diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go index 9e8c2367e..53cef9cbd 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go @@ -1,6 +1,7 @@ package storage import ( + "context" "database/sql" "github.com/matrix-org/gomatrixserverlib" @@ -8,7 +9,7 @@ import ( const inviteEventsSchema = ` CREATE TABLE IF NOT EXISTS syncapi_invite_events ( - id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_output_room_event_id_seq'), + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), event_id TEXT NOT NULL, room_id TEXT NOT NULL, target_user_id TEXT NOT NULL, @@ -51,10 +52,13 @@ func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { } func (s *inviteEventsStatements) insertInviteEvent( - inviteEvent gomatrixserverlib.Event, + ctx context.Context, inviteEvent gomatrixserverlib.Event, ) (streamPos int64, err error) { - err = s.insertInviteEventStmt.QueryRow( - inviteEvent.RoomID(), inviteEvent.EventID, *inviteEvent.StateKey(), + err = s.insertInviteEventStmt.QueryRowContext( + ctx, + inviteEvent.RoomID(), + inviteEvent.EventID(), + *inviteEvent.StateKey(), inviteEvent.JSON(), ).Scan(&streamPos) return @@ -63,10 +67,10 @@ func (s *inviteEventsStatements) insertInviteEvent( // selectInviteEventsInRange returns a map of room ID to invite event for the // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) selectInviteEventsInRange( - targetUserID string, startPos, endPos int64, + ctx context.Context, targetUserID string, startPos, endPos int64, ) (map[string]gomatrixserverlib.Event, error) { - rows, err := s.selectInviteEventsInRangeStmt.Query( - targetUserID, startPos, endPos, + rows, err := s.selectInviteEventsInRangeStmt.QueryContext( + ctx, targetUserID, startPos, endPos, ) if err != nil { return nil, err diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 660a3074d..0d5236fa9 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -30,11 +30,16 @@ const outputRoomEventsSchema = ` CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; -- Stores output room events received from the roomserver. +CREATE SEQUENCE IF NOT EXISTS syncapi_output_room_event_id_seq CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- An incrementing ID which denotes the position in the log that this event resides at. -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. -- This isn't a problem for us since we just want to order by this field. +<<<<<<< HEAD id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), +======= + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_output_room_event_id_seq'), +>>>>>>> 960083b099c96b25b0c81ea602b372470e9cf889 -- The event ID for the event event_id TEXT NOT NULL, -- The 'room_id' key for the event. diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index a26b14c0e..9fc65ad0b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -47,32 +47,32 @@ type SyncServerDatabase struct { accountData accountDataStatements events outputRoomEventsStatements roomstate currentRoomStateStatements + invites inviteEventsStatements } // NewSyncServerDatabase creates a new sync server database func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { - var db *sql.DB + var d SyncServerDatabase var err error - if db, err = sql.Open("postgres", dataSourceName); err != nil { + if d.db, err = sql.Open("postgres", dataSourceName); err != nil { return nil, err } - partitions := common.PartitionOffsetStatements{} - if err = partitions.Prepare(db, "syncapi"); err != nil { + if err = d.partitions.Prepare(d.db, "syncapi"); err != nil { return nil, err } - accountData := accountDataStatements{} - if err = accountData.prepare(db); err != nil { + if err = d.accountData.prepare(d.db); err != nil { return nil, err } - events := outputRoomEventsStatements{} - if err = events.prepare(db); err != nil { + if err = d.events.prepare(d.db); err != nil { return nil, err } - state := currentRoomStateStatements{} - if err := state.prepare(db); err != nil { + if err := d.roomstate.prepare(d.db); err != nil { return nil, err } - return &SyncServerDatabase{db, partitions, accountData, events, state}, nil + if err := d.invites.prepare(d.db); err != nil { + return nil, err + } + return &d, nil } // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. @@ -260,7 +260,7 @@ func (d *SyncServerDatabase) IncrementalSync( } // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, userID, fromPos, toPos, res); err != nil { return nil, err } @@ -322,7 +322,7 @@ func (d *SyncServerDatabase) CompleteSync( res.Rooms.Join[roomID] = *jr } - if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil { return nil, err } @@ -365,7 +365,11 @@ func (d *SyncServerDatabase) UpsertAccountData( } func (d *SyncServerDatabase) addInvitesToResponse( - ctx context.Context, txn *sql.Tx, userID string, res *types.Response) error { + ctx context.Context, txn *sql.Tx, + userID string, + _, _ types.StreamPosition, + res *types.Response, +) error { // Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark. roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "invite") if err != nil {