|
|
@ -574,13 +574,6 @@ func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, e
|
|
|
|
func (d *Database) StoreEvent(
|
|
|
|
func (d *Database) StoreEvent(
|
|
|
|
ctx context.Context, event *gomatrixserverlib.Event,
|
|
|
|
ctx context.Context, event *gomatrixserverlib.Event,
|
|
|
|
authEventNIDs []types.EventNID, isRejected bool,
|
|
|
|
authEventNIDs []types.EventNID, isRejected bool,
|
|
|
|
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
|
|
|
|
|
|
|
return d.storeEvent(ctx, nil, event, authEventNIDs, isRejected)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (d *Database) storeEvent(
|
|
|
|
|
|
|
|
ctx context.Context, updater *RoomUpdater, event *gomatrixserverlib.Event,
|
|
|
|
|
|
|
|
authEventNIDs []types.EventNID, isRejected bool,
|
|
|
|
|
|
|
|
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
|
|
|
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
|
|
|
var (
|
|
|
|
var (
|
|
|
|
roomNID types.RoomNID
|
|
|
|
roomNID types.RoomNID
|
|
|
@ -592,15 +585,6 @@ func (d *Database) storeEvent(
|
|
|
|
redactedEventID string
|
|
|
|
redactedEventID string
|
|
|
|
err error
|
|
|
|
err error
|
|
|
|
)
|
|
|
|
)
|
|
|
|
var txn *sql.Tx
|
|
|
|
|
|
|
|
if updater != nil && updater.txn != nil {
|
|
|
|
|
|
|
|
txn = updater.txn
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// First writer is with a database-provided transaction, so that NIDs are assigned
|
|
|
|
|
|
|
|
// globally outside of the updater context, to help avoid races.
|
|
|
|
|
|
|
|
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
|
|
|
|
|
|
|
// TODO: Here we should aim to have two different code paths for new rooms
|
|
|
|
|
|
|
|
// vs existing ones.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Get the default room version. If the client doesn't supply a room_version
|
|
|
|
// Get the default room version. If the client doesn't supply a room_version
|
|
|
|
// then we will use our configured default to create the room.
|
|
|
|
// then we will use our configured default to create the room.
|
|
|
@ -610,26 +594,25 @@ func (d *Database) storeEvent(
|
|
|
|
// room.
|
|
|
|
// room.
|
|
|
|
var roomVersion gomatrixserverlib.RoomVersion
|
|
|
|
var roomVersion gomatrixserverlib.RoomVersion
|
|
|
|
if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil {
|
|
|
|
if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil {
|
|
|
|
return fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err)
|
|
|
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// First writer is with a database-provided transaction, so that NIDs are assigned
|
|
|
|
|
|
|
|
// globally outside of the updater context, to help avoid races.
|
|
|
|
|
|
|
|
err = d.Writer.Do(nil, nil, func(txn *sql.Tx) error {
|
|
|
|
if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil {
|
|
|
|
if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil {
|
|
|
|
return fmt.Errorf("d.assignRoomNID: %w", err)
|
|
|
|
return fmt.Errorf("d.assignRoomNID: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil {
|
|
|
|
if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil {
|
|
|
|
return fmt.Errorf("d.assignEventTypeNID: %w", err)
|
|
|
|
return fmt.Errorf("d.assignEventTypeNID: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
eventStateKey := event.StateKey()
|
|
|
|
|
|
|
|
// Assigned a numeric ID for the state_key if there is one present.
|
|
|
|
// Assigned a numeric ID for the state_key if there is one present.
|
|
|
|
// Otherwise set the numeric ID for the state_key to 0.
|
|
|
|
// Otherwise set the numeric ID for the state_key to 0.
|
|
|
|
if eventStateKey != nil {
|
|
|
|
if eventStateKey := event.StateKey(); eventStateKey != nil {
|
|
|
|
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil {
|
|
|
|
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil {
|
|
|
|
return fmt.Errorf("d.assignStateKeyNID: %w", err)
|
|
|
|
return fmt.Errorf("d.assignStateKeyNID: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
@ -637,7 +620,7 @@ func (d *Database) storeEvent(
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Second writer is using the database-provided transaction, probably from the
|
|
|
|
// Second writer is using the database-provided transaction, probably from the
|
|
|
|
// room updater, for easy roll-back if required.
|
|
|
|
// room updater, for easy roll-back if required.
|
|
|
|
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
|
|
|
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
|
|
|
if eventNID, stateNID, err = d.EventsTable.InsertEvent(
|
|
|
|
if eventNID, stateNID, err = d.EventsTable.InsertEvent(
|
|
|
|
ctx,
|
|
|
|
ctx,
|
|
|
|
txn,
|
|
|
|
txn,
|
|
|
@ -688,23 +671,27 @@ func (d *Database) storeEvent(
|
|
|
|
// any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater
|
|
|
|
// any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater
|
|
|
|
// to do writes however then this will need to go inside `Writer.Do`.
|
|
|
|
// to do writes however then this will need to go inside `Writer.Do`.
|
|
|
|
succeeded := false
|
|
|
|
succeeded := false
|
|
|
|
if updater == nil {
|
|
|
|
|
|
|
|
var roomInfo *types.RoomInfo
|
|
|
|
var roomInfo *types.RoomInfo
|
|
|
|
roomInfo, err = d.roomInfo(ctx, txn, event.RoomID())
|
|
|
|
var updater *RoomUpdater
|
|
|
|
|
|
|
|
roomInfo, err = d.roomInfo(ctx, nil, event.RoomID())
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
|
|
|
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if roomInfo == nil && len(prevEvents) > 0 {
|
|
|
|
if roomInfo == nil && len(prevEvents) > 0 {
|
|
|
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
|
|
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
|
|
|
|
updater, err = d.GetRoomUpdater(ctx, roomInfo)
|
|
|
|
updater, err = d.GetRoomUpdater(ctx, roomInfo)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err)
|
|
|
|
return fmt.Errorf("GetRoomUpdater: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
|
|
|
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
|
|
|
|
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
|
|
|
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
|
|
|
|
return fmt.Errorf("updater.StorePreviousEvents: %w", err)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}); err != nil {
|
|
|
|
|
|
|
|
return 0, 0, types.StateAtEvent{}, nil, "", err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
succeeded = true
|
|
|
|
succeeded = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|