From b4c136a9c4872d9050284325b43322d4e389385f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 1 Feb 2022 13:31:45 +0000 Subject: [PATCH] Handle cases where the room does not exist --- roomserver/internal/input/input.go | 2 +- roomserver/storage/interface.go | 2 +- roomserver/storage/shared/room_updater.go | 21 ++++++++++++++++++--- roomserver/storage/shared/storage.go | 17 +++++++++-------- roomserver/storage/sqlite3/storage.go | 2 +- 5 files changed, 30 insertions(+), 14 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 8b3d47a44..c7b38f231 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -151,7 +151,7 @@ func (r *Inputer) processRoomEventUsingUpdater( if err != nil { return false, fmt.Errorf("r.DB.RoomInfo: %w", err) } - updater, err := r.DB.GetRoomUpdater(ctx, *roomInfo) + updater, err := r.DB.GetRoomUpdater(ctx, roomInfo) if err != nil { return true, fmt.Errorf("r.DB.GetRoomUpdater: %w", err) } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 11cc589d8..a9851e05b 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -89,7 +89,7 @@ type Database interface { // Opens and returns a room updater, which locks the room and opens a transaction. // The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error. // If this returns an error then no further action is required. - GetRoomUpdater(ctx context.Context, roomInfo types.RoomInfo) (*shared.RoomUpdater, error) + GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error) // Look up event references for the latest events in the room and the current state snapshot. // Returns the latest events, the current state and the maximum depth of the latest events plus 1. // Returns an error if there was a problem talking to the database. diff --git a/roomserver/storage/shared/room_updater.go b/roomserver/storage/shared/room_updater.go index 53d1075be..c3869f3de 100644 --- a/roomserver/storage/shared/room_updater.go +++ b/roomserver/storage/shared/room_updater.go @@ -12,7 +12,7 @@ import ( type RoomUpdater struct { transaction d *Database - roomInfo types.RoomInfo + roomInfo *types.RoomInfo latestEvents []types.StateAtEventAndReference lastEventIDSent string currentStateSnapshotNID types.StateSnapshotNID @@ -25,7 +25,22 @@ func rollback(txn *sql.Tx) { txn.Rollback() // nolint: errcheck } -func NewRoomUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo types.RoomInfo) (*RoomUpdater, error) { +func NewRoomUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo *types.RoomInfo) (*RoomUpdater, error) { + // If the roomInfo is nil then that means that the room doesn't exist + // yet, so we can't do `SelectLatestEventsNIDsForUpdate` because that + // would involve locking a row on the table that doesn't exist. Instead + // we will just run with a normal database transaction. It'll either + // succeed, processing a create event which creates the room, or it won't. + if roomInfo == nil { + tx, err := d.DB.Begin() + if err != nil { + return nil, fmt.Errorf("d.DB.Begin: %w", err) + } + return &RoomUpdater{ + transaction{ctx, tx}, d, nil, nil, "", 0, + }, nil + } + eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err := d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomInfo.RoomNID) if err != nil { @@ -84,7 +99,7 @@ func (u *RoomUpdater) StoreEvent( ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool, ) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { - return u.d.storeEvent(ctx, u.txn, event, authEventNIDs, isRejected) + return u.d.storeEvent(ctx, u.txn, u, event, authEventNIDs, isRejected) } func (u *RoomUpdater) AddState( diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 52b959228..b82f5984d 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -42,7 +42,7 @@ type Database struct { MembershipTable tables.Membership PublishedTable tables.Published RedactionsTable tables.Redactions - GetRoomUpdaterFn func(ctx context.Context, roomInfo types.RoomInfo) (*RoomUpdater, error) + GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error) } func (d *Database) SupportsConcurrentRoomInputs() bool { @@ -476,7 +476,7 @@ func (d *Database) MembershipUpdater( } func (d *Database) GetRoomUpdater( - ctx context.Context, roomInfo types.RoomInfo, + ctx context.Context, roomInfo *types.RoomInfo, ) (*RoomUpdater, error) { if d.GetRoomUpdaterFn != nil { return d.GetRoomUpdaterFn(ctx, roomInfo) @@ -497,11 +497,11 @@ func (d *Database) StoreEvent( ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool, ) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { - return d.storeEvent(ctx, nil, event, authEventNIDs, isRejected) + return d.storeEvent(ctx, nil, nil, event, authEventNIDs, isRejected) } func (d *Database) storeEvent( - ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, + ctx context.Context, txn *sql.Tx, updater *RoomUpdater, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool, ) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { var ( @@ -589,7 +589,6 @@ func (d *Database) storeEvent( // that there's a row-level lock on the latest room events (well, // on Postgres at least). var roomInfo *types.RoomInfo - var updater *RoomUpdater if prevEvents := event.PrevEvents(); len(prevEvents) > 0 { roomInfo, err = d.RoomInfo(ctx, event.RoomID()) if err != nil { @@ -603,9 +602,11 @@ func (d *Database) storeEvent( // function only does SELECTs though so the created txn (at this point) is just a read txn like // any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater // to do writes however then this will need to go inside `Writer.Do`. - updater, err = d.GetRoomUpdater(ctx, *roomInfo) - if err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err) + if updater == nil { + updater, err = d.GetRoomUpdater(ctx, roomInfo) + if err != nil { + return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err) + } } // Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents // and EndTransaction in a writer then it's possible for a new write txn to be made between the two diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index b38cf7d12..325c253b5 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -202,7 +202,7 @@ func (d *Database) SupportsConcurrentRoomInputs() bool { } func (d *Database) GetRoomUpdater( - ctx context.Context, roomInfo types.RoomInfo, + ctx context.Context, roomInfo *types.RoomInfo, ) (*shared.RoomUpdater, error) { // TODO: Do not use transactions. We should be holding open this transaction but we cannot have // multiple write transactions on sqlite. The code will perform additional