From 6ade9ed7a43797d40518dc3a147065e3c38683e6 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Thu, 23 Feb 2023 14:28:11 +0100 Subject: [PATCH] Pull out creation of several NIDs; Add more caching --- internal/caching/cache_eventstatekeys.go | 23 +++ internal/caching/cache_roomservernids.go | 9 + internal/caching/caches.go | 23 ++- internal/caching/impl_ristretto.go | 18 ++ roomserver/internal/helpers/helpers_test.go | 13 +- roomserver/internal/input/input_events.go | 35 +++- .../internal/perform/perform_backfill.go | 21 +- roomserver/storage/interface.go | 29 +-- roomserver/storage/shared/storage.go | 190 +++++++++++------- 9 files changed, 248 insertions(+), 113 deletions(-) diff --git a/internal/caching/cache_eventstatekeys.go b/internal/caching/cache_eventstatekeys.go index 05580ab05..634af5422 100644 --- a/internal/caching/cache_eventstatekeys.go +++ b/internal/caching/cache_eventstatekeys.go @@ -7,6 +7,7 @@ import "github.com/matrix-org/dendrite/roomserver/types" type EventStateKeyCache interface { GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool) StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string) + GetEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, bool) } func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool) { @@ -15,4 +16,26 @@ func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (strin func (c Caches) StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string) { c.RoomServerStateKeys.Set(eventStateKeyNID, eventStateKey) + c.RoomServerStateKeyNIDs.Set(eventStateKey, eventStateKeyNID) +} + +func (c Caches) GetEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, bool) { + if eventStateKey == "" { + return 1, true // 1 is the empty statekey as per the default value in the database + } + return c.RoomServerStateKeyNIDs.Get(eventStateKey) +} + +type EventTypeCache interface { + GetEventTypeKey(eventType string) (types.EventTypeNID, bool) + StoreEventTypeKey(eventTypeNID types.EventTypeNID, eventType string) +} + +func (c Caches) StoreEventTypeKey(eventTypeNID types.EventTypeNID, eventType string) { + c.RoomServerEventTypeNIDs.Set(eventType, eventTypeNID) + c.RoomServerEventTypes.Set(eventTypeNID, eventType) +} + +func (c Caches) GetEventTypeKey(eventType string) (types.EventTypeNID, bool) { + return c.RoomServerEventTypeNIDs.Get(eventType) } diff --git a/internal/caching/cache_roomservernids.go b/internal/caching/cache_roomservernids.go index 88a5b28bc..734a3a04f 100644 --- a/internal/caching/cache_roomservernids.go +++ b/internal/caching/cache_roomservernids.go @@ -9,19 +9,28 @@ type RoomServerCaches interface { RoomVersionCache RoomServerEventsCache EventStateKeyCache + EventTypeCache } // RoomServerNIDsCache contains the subset of functions needed for // a roomserver NID cache. type RoomServerNIDsCache interface { GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) + // StoreRoomServerRoomID stores roomNID -> roomID and roomID -> roomNID StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) + GetRoomServerRoomNID(roomID string) (types.RoomNID, bool) } func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) { return c.RoomServerRoomIDs.Get(roomNID) } +// StoreRoomServerRoomID stores roomNID -> roomID and roomID -> roomNID func (c Caches) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) { + c.RoomServerRoomNIDs.Set(roomID, roomNID) c.RoomServerRoomIDs.Set(roomNID, roomID) } + +func (c Caches) GetRoomServerRoomNID(roomID string) (types.RoomNID, bool) { + return c.RoomServerRoomNIDs.Get(roomID) +} diff --git a/internal/caching/caches.go b/internal/caching/caches.go index 78c9ab7ee..479920466 100644 --- a/internal/caching/caches.go +++ b/internal/caching/caches.go @@ -23,16 +23,19 @@ import ( // different implementations as long as they satisfy the Cache // interface. type Caches struct { - RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version - ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys - RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID - RoomServerRoomIDs Cache[types.RoomNID, string] // room NID -> room ID - RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event - RoomServerStateKeys Cache[types.EventStateKeyNID, string] // event NID -> event state key - FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU - FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU - SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response - LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID + RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version + ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys + RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID + RoomServerRoomIDs Cache[types.RoomNID, string] // room NID -> room ID + RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event + RoomServerStateKeys Cache[types.EventStateKeyNID, string] // eventStateKey NID -> event state key + RoomServerStateKeyNIDs Cache[string, types.EventStateKeyNID] // event state key -> eventStateKey NID + RoomServerEventTypeNIDs Cache[string, types.EventTypeNID] // eventType -> eventType NID + RoomServerEventTypes Cache[types.EventTypeNID, string] // eventType NID -> eventType + FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU + FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU + SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response + LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID } // Cache is the interface that an implementation must satisfy. diff --git a/internal/caching/impl_ristretto.go b/internal/caching/impl_ristretto.go index 49292d0dc..fca93afd1 100644 --- a/internal/caching/impl_ristretto.go +++ b/internal/caching/impl_ristretto.go @@ -40,6 +40,9 @@ const ( spaceSummaryRoomsCache lazyLoadingCache eventStateKeyCache + eventTypeCache + eventTypeNIDCache + eventStateKeyNIDCache ) func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches { @@ -105,6 +108,21 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm Prefix: eventStateKeyCache, MaxAge: maxAge, }, + RoomServerStateKeyNIDs: &RistrettoCachePartition[string, types.EventStateKeyNID]{ // eventStateKey -> eventStateKey NID + cache: cache, + Prefix: eventStateKeyNIDCache, + MaxAge: maxAge, + }, + RoomServerEventTypeNIDs: &RistrettoCachePartition[string, types.EventTypeNID]{ // eventType -> eventType NID + cache: cache, + Prefix: eventTypeCache, + MaxAge: maxAge, + }, + RoomServerEventTypes: &RistrettoCachePartition[types.EventTypeNID, string]{ // eventType NID -> eventType + cache: cache, + Prefix: eventTypeNIDCache, + MaxAge: maxAge, + }, FederationPDUs: &RistrettoCostedCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{ // queue NID -> PDU &RistrettoCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{ cache: cache, diff --git a/roomserver/internal/helpers/helpers_test.go b/roomserver/internal/helpers/helpers_test.go index aa5c30e44..62730df1f 100644 --- a/roomserver/internal/helpers/helpers_test.go +++ b/roomserver/internal/helpers/helpers_test.go @@ -38,7 +38,18 @@ func TestIsInvitePendingWithoutNID(t *testing.T) { var authNIDs []types.EventNID for _, x := range room.Events() { - evNID, _, _, _, _, err := db.StoreEvent(context.Background(), x.Event, authNIDs, false) + roomNID, err := db.GetOrCreateRoomNID(context.Background(), x.Unwrap()) + assert.NoError(t, err) + assert.Greater(t, roomNID, types.RoomNID(0)) + + eventTypeNID, err := db.GetOrCreateEventTypeNID(context.Background(), x.Type()) + assert.NoError(t, err) + assert.Greater(t, eventTypeNID, types.EventTypeNID(0)) + + eventStateKeyNID, err := db.GetOrCreateEventStateKeyNID(context.Background(), x.StateKey()) + assert.NoError(t, err) + + evNID, _, _, _, err := db.StoreEvent(context.Background(), x.Event, roomNID, eventTypeNID, eventStateKeyNID, authNIDs, false) assert.NoError(t, err) authNIDs = append(authNIDs, evNID) } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 6e90892b1..fe35efb27 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -332,8 +332,23 @@ func (r *Inputer) processRoomEvent( } } + roomNID, err := r.DB.GetOrCreateRoomNID(ctx, event) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err) + } + + eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, event.Type()) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err) + } + + eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey()) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err) + } + // Store the event. - _, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected) + _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected) if err != nil { return fmt.Errorf("updater.StoreEvent: %w", err) } @@ -568,6 +583,7 @@ func (r *Inputer) processStateBefore( // we've failed to retrieve the auth chain altogether (in which case // an error is returned) or we've successfully retrieved them all and // they are now in the database. +// nolint: gocyclo func (r *Inputer) fetchAuthEvents( ctx context.Context, logger *logrus.Entry, @@ -674,8 +690,23 @@ nextAuthEvent: logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID()) } + roomNID, err := r.DB.GetOrCreateRoomNID(ctx, authEvent) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err) + } + + eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, authEvent.Type()) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err) + } + + eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey()) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err) + } + // Finally, store the event in the database. - eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected) + eventNID, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected) if err != nil { return fmt.Errorf("updater.StoreEvent: %w", err) } diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index caa939a12..3a3a049db 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -605,9 +605,28 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs authNids[i] = nid.EventNID i++ } + + roomNID, err = db.GetOrCreateRoomNID(ctx, ev.Unwrap()) + if err != nil { + logrus.WithError(err).Error("failed to get or create roomNID") + continue + } + + eventTypeNID, err := db.GetOrCreateEventTypeNID(ctx, ev.Type()) + if err != nil { + logrus.WithError(err).Error("failed to get or create eventType NID") + continue + } + + eventStateKeyNID, err := db.GetOrCreateEventStateKeyNID(ctx, ev.StateKey()) + if err != nil { + logrus.WithError(err).Error("failed to get or create eventStateKey NID") + continue + } + var redactedEventID string var redactionEvent *gomatrixserverlib.Event - eventNID, roomNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false) + eventNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), roomNID, eventTypeNID, eventStateKeyNID, authNids, false) if err != nil { logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event") continue diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index b84f086bf..88ec56670 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -74,10 +74,7 @@ type Database interface { SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error) BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error) // Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error. - StoreEvent( - ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, - isRejected bool, - ) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) + StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) // Look up the state entries for a list of string event IDs // Returns an error if the there is an error talking to the database // Returns a types.MissingEventError if the event IDs aren't in the database. @@ -182,24 +179,11 @@ type Database interface { GetMembershipForHistoryVisibility( ctx context.Context, userNID types.EventStateKeyNID, info *types.RoomInfo, eventIDs ...string, ) (map[string]*gomatrixserverlib.HeaderedEvent, error) + GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (types.RoomNID, error) + GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) + GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error) } -/* - RoomsTable - EventsTable - EventJSONTable - MembershipTable - StateSnapshotTable - StateBlockTable - EventTypesTable - EventStateKeysTable - - - PublishedTable? - RoomAliases? - -*/ - type RoomDatabase interface { // RoomInfo returns room information for the given room ID, or nil if there is no room. RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error) @@ -207,7 +191,7 @@ type RoomDatabase interface { IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error) MissingAuthPrevEvents(ctx context.Context, e *gomatrixserverlib.Event) (missingAuth, missingPrev []string, err error) // Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error. - StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) + StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error) StateEntriesForEventIDs(ctx context.Context, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error) @@ -224,4 +208,7 @@ type RoomDatabase interface { LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error) EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error) + GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (types.RoomNID, error) + GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) + GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error) } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index e904676d7..7eb4567f9 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -112,16 +112,31 @@ func (d *Database) eventStateKeyNIDs( ) (map[string]types.EventStateKeyNID, error) { result := make(map[string]types.EventStateKeyNID) eventStateKeys = util.UniqueStrings(eventStateKeys) - nids, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, txn, eventStateKeys) - if err != nil { - return nil, err + // first ask the cache about these keys + fetchEventStateKeys := make([]string, 0, len(eventStateKeys)) + for _, eventStateKey := range eventStateKeys { + eventStateKeyNID, ok := d.Cache.GetEventStateKeyNID(eventStateKey) + if ok { + result[eventStateKey] = eventStateKeyNID + continue + } + fetchEventStateKeys = append(fetchEventStateKeys, eventStateKey) } - for eventStateKey, nid := range nids { - result[eventStateKey] = nid + + if len(fetchEventStateKeys) > 0 { + nids, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, txn, fetchEventStateKeys) + if err != nil { + return nil, err + } + for eventStateKey, nid := range nids { + result[eventStateKey] = nid + } } + // We received some nids, but are still missing some, work out which and create them if len(eventStateKeys) > len(result) { var nid types.EventStateKeyNID + var err error err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { for _, eventStateKey := range eventStateKeys { if _, ok := result[eventStateKey]; ok { @@ -629,73 +644,71 @@ func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, e return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID) } -func (d *Database) StoreEvent( - ctx context.Context, event *gomatrixserverlib.Event, - authEventNIDs []types.EventNID, isRejected bool, -) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { - return d.storeEvent(ctx, nil, event, authEventNIDs, isRejected) +// GetOrCreateRoomNID gets or creates a new roomNID for the given event +func (d *Database) GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (roomNID types.RoomNID, err error) { + // Get the default room version. If the client doesn't supply a room_version + // then we will use our configured default to create the room. + // https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-createroom + // Note that the below logic depends on the m.room.create event being the + // first event that is persisted to the database when creating or joining a + // room. + var roomVersion gomatrixserverlib.RoomVersion + if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil { + return 0, fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err) + } + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion) + if err != nil { + return err + } + return nil + }) + return roomNID, err } -func (d *Database) storeEvent( - ctx context.Context, updater *RoomUpdater, event *gomatrixserverlib.Event, - authEventNIDs []types.EventNID, isRejected bool, -) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { - var ( - roomNID types.RoomNID - eventTypeNID types.EventTypeNID - eventStateKeyNID types.EventStateKeyNID - eventNID types.EventNID - stateNID types.StateSnapshotNID - redactionEvent *gomatrixserverlib.Event - redactedEventID string - err error - ) - var txn *sql.Tx - if updater != nil && updater.txn != nil { - txn = updater.txn - } - // First writer is with a database-provided transaction, so that NIDs are assigned - // globally outside of the updater context, to help avoid races. +func (d *Database) GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - // TODO: Here we should aim to have two different code paths for new rooms - // vs existing ones. - - // Get the default room version. If the client doesn't supply a room_version - // then we will use our configured default to create the room. - // https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-createroom - // Note that the below logic depends on the m.room.create event being the - // first event that is persisted to the database when creating or joining a - // room. - var roomVersion gomatrixserverlib.RoomVersion - if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil { - return fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err) - } - - if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil { - return fmt.Errorf("d.assignRoomNID: %w", err) - } - - if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil { + if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, eventType); err != nil { return fmt.Errorf("d.assignEventTypeNID: %w", err) } + return nil + }) + return eventTypeNID, err +} - eventStateKey := event.StateKey() - // Assigned a numeric ID for the state_key if there is one present. - // Otherwise set the numeric ID for the state_key to 0. - if eventStateKey != nil { - if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil { - return fmt.Errorf("d.assignStateKeyNID: %w", err) - } +func (d *Database) GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (eventStateKeyNID types.EventStateKeyNID, err error) { + if eventStateKey == nil { + return 0, nil + } + + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil { + return fmt.Errorf("d.assignStateKeyNID: %w", err) } - return nil }) if err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err) + return 0, err } + + return eventStateKeyNID, nil +} + +func (d *Database) StoreEvent( + ctx context.Context, event *gomatrixserverlib.Event, + roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, + authEventNIDs []types.EventNID, isRejected bool, +) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { + var ( + eventNID types.EventNID + stateNID types.StateSnapshotNID + redactionEvent *gomatrixserverlib.Event + redactedEventID string + err error + ) // Second writer is using the database-provided transaction, probably from the // room updater, for easy roll-back if required. - err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if eventNID, stateNID, err = d.EventsTable.InsertEvent( ctx, txn, @@ -731,7 +744,7 @@ func (d *Database) storeEvent( return nil }) if err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err) + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err) } // We should attempt to update the previous events table with any @@ -746,28 +759,28 @@ func (d *Database) storeEvent( // any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater // to do writes however then this will need to go inside `Writer.Do`. succeeded := false - if updater == nil { - var roomInfo *types.RoomInfo - roomInfo, err = d.roomInfo(ctx, txn, event.RoomID()) - if err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err) - } - if roomInfo == nil && len(prevEvents) > 0 { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID()) - } - updater, err = d.GetRoomUpdater(ctx, roomInfo) - if err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err) - } - defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err) + var roomInfo *types.RoomInfo + roomInfo, err = d.roomInfo(ctx, nil, event.RoomID()) + if err != nil { + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err) } + if roomInfo == nil && len(prevEvents) > 0 { + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID()) + } + var updater *RoomUpdater + updater, err = d.GetRoomUpdater(ctx, roomInfo) + if err != nil { + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err) + } + defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err) + if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err) + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err) } succeeded = true } - return eventNID, roomNID, types.StateAtEvent{ + return eventNID, types.StateAtEvent{ BeforeStateSnapshotNID: stateNID, StateEntry: types.StateEntry{ StateKeyTuple: types.StateKeyTuple{ @@ -819,6 +832,10 @@ func (d *Database) MissingAuthPrevEvents( func (d *Database) assignRoomNID( ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion, ) (types.RoomNID, error) { + roomNID, ok := d.Cache.GetRoomServerRoomNID(roomID) + if ok { + return roomNID, nil + } // Check if we already have a numeric ID in the database. roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID) if err == sql.ErrNoRows { @@ -829,12 +846,20 @@ func (d *Database) assignRoomNID( roomNID, err = d.RoomsTable.SelectRoomNID(ctx, txn, roomID) } } - return roomNID, err + if err != nil { + return 0, err + } + d.Cache.StoreRoomServerRoomID(roomNID, roomID) + return roomNID, nil } func (d *Database) assignEventTypeNID( ctx context.Context, txn *sql.Tx, eventType string, ) (types.EventTypeNID, error) { + eventTypeNID, ok := d.Cache.GetEventTypeKey(eventType) + if ok { + return eventTypeNID, nil + } // Check if we already have a numeric ID in the database. eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType) if err == sql.ErrNoRows { @@ -845,12 +870,20 @@ func (d *Database) assignEventTypeNID( eventTypeNID, err = d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType) } } - return eventTypeNID, err + if err != nil { + return 0, err + } + d.Cache.StoreEventTypeKey(eventTypeNID, eventType) + return eventTypeNID, nil } func (d *Database) assignStateKeyNID( ctx context.Context, txn *sql.Tx, eventStateKey string, ) (types.EventStateKeyNID, error) { + eventStateKeyNID, ok := d.Cache.GetEventStateKeyNID(eventStateKey) + if ok { + return eventStateKeyNID, nil + } // Check if we already have a numeric ID in the database. eventStateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey) if err == sql.ErrNoRows { @@ -861,6 +894,7 @@ func (d *Database) assignStateKeyNID( eventStateKeyNID, err = d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey) } } + d.Cache.StoreEventStateKey(eventStateKeyNID, eventStateKey) return eventStateKeyNID, err }