Compare commits

...

6 commits

Author SHA1 Message Date
Neil Alexander d08d319a9d
Use the writer to get the room updater 2022-09-12 11:11:38 +01:00
Neil Alexander 0d74b296f1
Simplify 2022-09-12 11:08:14 +01:00
Neil Alexander 1b49a9deff
Try that again 2022-09-12 11:07:19 +01:00
Neil Alexander 35f417627d
Try that again 2022-09-12 11:00:00 +01:00
Neil Alexander ee4d8d6de7
Ensure that the previous events update is guarded by the writer 2022-09-12 10:54:36 +01:00
Neil Alexander b5f7d7adcb
Tweak storeEvent 2022-09-12 10:50:29 +01:00
2 changed files with 36 additions and 51 deletions

View file

@ -105,14 +105,12 @@ func (u *RoomUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID {
// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer // StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer
func (u *RoomUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error { func (u *RoomUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
for _, ref := range previousEventReferences { for _, ref := range previousEventReferences {
if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err) return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err)
} }
} }
return nil return nil
})
} }
func (u *RoomUpdater) Events( func (u *RoomUpdater) Events(

View file

@ -574,13 +574,6 @@ func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, e
func (d *Database) StoreEvent( func (d *Database) StoreEvent(
ctx context.Context, event *gomatrixserverlib.Event, ctx context.Context, event *gomatrixserverlib.Event,
authEventNIDs []types.EventNID, isRejected bool, authEventNIDs []types.EventNID, isRejected bool,
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
return d.storeEvent(ctx, nil, event, authEventNIDs, isRejected)
}
func (d *Database) storeEvent(
ctx context.Context, updater *RoomUpdater, event *gomatrixserverlib.Event,
authEventNIDs []types.EventNID, isRejected bool,
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { ) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
var ( var (
roomNID types.RoomNID roomNID types.RoomNID
@ -592,15 +585,6 @@ func (d *Database) storeEvent(
redactedEventID string redactedEventID string
err error err error
) )
var txn *sql.Tx
if updater != nil && updater.txn != nil {
txn = updater.txn
}
// First writer is with a database-provided transaction, so that NIDs are assigned
// globally outside of the updater context, to help avoid races.
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
// TODO: Here we should aim to have two different code paths for new rooms
// vs existing ones.
// Get the default room version. If the client doesn't supply a room_version // Get the default room version. If the client doesn't supply a room_version
// then we will use our configured default to create the room. // then we will use our configured default to create the room.
@ -610,26 +594,25 @@ func (d *Database) storeEvent(
// room. // room.
var roomVersion gomatrixserverlib.RoomVersion var roomVersion gomatrixserverlib.RoomVersion
if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil { if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil {
return fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err) return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err)
} }
// First writer is with a database-provided transaction, so that NIDs are assigned
// globally outside of the updater context, to help avoid races.
err = d.Writer.Do(nil, nil, func(txn *sql.Tx) error {
if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil { if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil {
return fmt.Errorf("d.assignRoomNID: %w", err) return fmt.Errorf("d.assignRoomNID: %w", err)
} }
if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil { if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil {
return fmt.Errorf("d.assignEventTypeNID: %w", err) return fmt.Errorf("d.assignEventTypeNID: %w", err)
} }
eventStateKey := event.StateKey()
// Assigned a numeric ID for the state_key if there is one present. // Assigned a numeric ID for the state_key if there is one present.
// Otherwise set the numeric ID for the state_key to 0. // Otherwise set the numeric ID for the state_key to 0.
if eventStateKey != nil { if eventStateKey := event.StateKey(); eventStateKey != nil {
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil { if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil {
return fmt.Errorf("d.assignStateKeyNID: %w", err) return fmt.Errorf("d.assignStateKeyNID: %w", err)
} }
} }
return nil return nil
}) })
if err != nil { if err != nil {
@ -637,7 +620,7 @@ func (d *Database) storeEvent(
} }
// Second writer is using the database-provided transaction, probably from the // Second writer is using the database-provided transaction, probably from the
// room updater, for easy roll-back if required. // room updater, for easy roll-back if required.
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if eventNID, stateNID, err = d.EventsTable.InsertEvent( if eventNID, stateNID, err = d.EventsTable.InsertEvent(
ctx, ctx,
txn, txn,
@ -688,23 +671,27 @@ func (d *Database) storeEvent(
// any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater // any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater
// to do writes however then this will need to go inside `Writer.Do`. // to do writes however then this will need to go inside `Writer.Do`.
succeeded := false succeeded := false
if updater == nil {
var roomInfo *types.RoomInfo var roomInfo *types.RoomInfo
roomInfo, err = d.roomInfo(ctx, txn, event.RoomID()) var updater *RoomUpdater
roomInfo, err = d.roomInfo(ctx, nil, event.RoomID())
if err != nil { if err != nil {
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err) return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
} }
if roomInfo == nil && len(prevEvents) > 0 { if roomInfo == nil && len(prevEvents) > 0 {
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID()) return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
} }
if err = d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
updater, err = d.GetRoomUpdater(ctx, roomInfo) updater, err = d.GetRoomUpdater(ctx, roomInfo)
if err != nil { if err != nil {
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err) return fmt.Errorf("GetRoomUpdater: %w", err)
} }
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err) defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
}
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil { if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err) return fmt.Errorf("updater.StorePreviousEvents: %w", err)
}
return nil
}); err != nil {
return 0, 0, types.StateAtEvent{}, nil, "", err
} }
succeeded = true succeeded = true
} }