Handle cases where the room does not exist

This commit is contained in:
Neil Alexander 2022-02-01 13:31:45 +00:00
parent e0a485c50d
commit b4c136a9c4
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
5 changed files with 30 additions and 14 deletions

View file

@ -151,7 +151,7 @@ func (r *Inputer) processRoomEventUsingUpdater(
if err != nil {
return false, fmt.Errorf("r.DB.RoomInfo: %w", err)
}
updater, err := r.DB.GetRoomUpdater(ctx, *roomInfo)
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
if err != nil {
return true, fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
}

View file

@ -89,7 +89,7 @@ type Database interface {
// Opens and returns a room updater, which locks the room and opens a transaction.
// The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error.
// If this returns an error then no further action is required.
GetRoomUpdater(ctx context.Context, roomInfo types.RoomInfo) (*shared.RoomUpdater, error)
GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error)
// Look up event references for the latest events in the room and the current state snapshot.
// Returns the latest events, the current state and the maximum depth of the latest events plus 1.
// Returns an error if there was a problem talking to the database.

View file

@ -12,7 +12,7 @@ import (
type RoomUpdater struct {
transaction
d *Database
roomInfo types.RoomInfo
roomInfo *types.RoomInfo
latestEvents []types.StateAtEventAndReference
lastEventIDSent string
currentStateSnapshotNID types.StateSnapshotNID
@ -25,7 +25,22 @@ func rollback(txn *sql.Tx) {
txn.Rollback() // nolint: errcheck
}
func NewRoomUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo types.RoomInfo) (*RoomUpdater, error) {
func NewRoomUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo *types.RoomInfo) (*RoomUpdater, error) {
// If the roomInfo is nil then that means that the room doesn't exist
// yet, so we can't do `SelectLatestEventsNIDsForUpdate` because that
// would involve locking a row on the table that doesn't exist. Instead
// we will just run with a normal database transaction. It'll either
// succeed, processing a create event which creates the room, or it won't.
if roomInfo == nil {
tx, err := d.DB.Begin()
if err != nil {
return nil, fmt.Errorf("d.DB.Begin: %w", err)
}
return &RoomUpdater{
transaction{ctx, tx}, d, nil, nil, "", 0,
}, nil
}
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomInfo.RoomNID)
if err != nil {
@ -84,7 +99,7 @@ func (u *RoomUpdater) StoreEvent(
ctx context.Context, event *gomatrixserverlib.Event,
authEventNIDs []types.EventNID, isRejected bool,
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
return u.d.storeEvent(ctx, u.txn, event, authEventNIDs, isRejected)
return u.d.storeEvent(ctx, u.txn, u, event, authEventNIDs, isRejected)
}
func (u *RoomUpdater) AddState(

View file

@ -42,7 +42,7 @@ type Database struct {
MembershipTable tables.Membership
PublishedTable tables.Published
RedactionsTable tables.Redactions
GetRoomUpdaterFn func(ctx context.Context, roomInfo types.RoomInfo) (*RoomUpdater, error)
GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error)
}
func (d *Database) SupportsConcurrentRoomInputs() bool {
@ -476,7 +476,7 @@ func (d *Database) MembershipUpdater(
}
func (d *Database) GetRoomUpdater(
ctx context.Context, roomInfo types.RoomInfo,
ctx context.Context, roomInfo *types.RoomInfo,
) (*RoomUpdater, error) {
if d.GetRoomUpdaterFn != nil {
return d.GetRoomUpdaterFn(ctx, roomInfo)
@ -497,11 +497,11 @@ func (d *Database) StoreEvent(
ctx context.Context, event *gomatrixserverlib.Event,
authEventNIDs []types.EventNID, isRejected bool,
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
return d.storeEvent(ctx, nil, event, authEventNIDs, isRejected)
return d.storeEvent(ctx, nil, nil, event, authEventNIDs, isRejected)
}
func (d *Database) storeEvent(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event,
ctx context.Context, txn *sql.Tx, updater *RoomUpdater, event *gomatrixserverlib.Event,
authEventNIDs []types.EventNID, isRejected bool,
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
var (
@ -589,7 +589,6 @@ func (d *Database) storeEvent(
// that there's a row-level lock on the latest room events (well,
// on Postgres at least).
var roomInfo *types.RoomInfo
var updater *RoomUpdater
if prevEvents := event.PrevEvents(); len(prevEvents) > 0 {
roomInfo, err = d.RoomInfo(ctx, event.RoomID())
if err != nil {
@ -603,9 +602,11 @@ func (d *Database) storeEvent(
// function only does SELECTs though so the created txn (at this point) is just a read txn like
// any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater
// to do writes however then this will need to go inside `Writer.Do`.
updater, err = d.GetRoomUpdater(ctx, *roomInfo)
if err != nil {
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
if updater == nil {
updater, err = d.GetRoomUpdater(ctx, roomInfo)
if err != nil {
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err)
}
}
// Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents
// and EndTransaction in a writer then it's possible for a new write txn to be made between the two

View file

@ -202,7 +202,7 @@ func (d *Database) SupportsConcurrentRoomInputs() bool {
}
func (d *Database) GetRoomUpdater(
ctx context.Context, roomInfo types.RoomInfo,
ctx context.Context, roomInfo *types.RoomInfo,
) (*shared.RoomUpdater, error) {
// TODO: Do not use transactions. We should be holding open this transaction but we cannot have
// multiple write transactions on sqlite. The code will perform additional