diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index a3efd8d58..4b789e56d 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -28,26 +28,82 @@ import ( type Database interface { common.PartitionStorer + // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) + // Events lookups a list of event by their event ID. + // Returns a list of events matching the requested IDs found in the database. + // If an event is not found in the database then it will be omitted from the list. + // Returns an error if there was a problem talking with the database. + // Does not include any transaction IDs in the returned events. Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) + // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races + // when generating the sync stream position for this event. Returns the sync stream position for the inserted event. + // Returns an error if there was a problem inserting this event. WriteEvent(context.Context, *gomatrixserverlib.HeaderedEvent, []gomatrixserverlib.HeaderedEvent, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error) + // GetStateEvent returns the Matrix state event of a given type for a given room with a given state key + // If no event could be found, returns nil + // If there was an issue during the retrieval, returns an error GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) + // GetStateEventsForRoom fetches the state events for a given room. + // Returns an empty slice if no state events could be found for this room. + // Returns an error if there was an issue with the retrieval. GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) + // SyncPosition returns the latest positions for syncing. SyncPosition(ctx context.Context) (types.PaginationToken, error) + // IncrementalSync returns all the data needed in order to create an incremental + // sync response for the given user. Events returned will include any client + // transaction IDs associated with the given device. These transaction IDs come + // from when the device sent the event via an API that included a transaction + // ID. IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) + // CompleteSync returns a complete /sync API response for the given user. CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error) + // GetAccountDataInRange returns all account data for a given user inserted or + // updated between two given positions + // Returns a map following the format data[roomID] = []dataTypes + // If no data is retrieved, returns an empty map + // If there was an issue with the retrieval, returns an error GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) + // UpsertAccountData keeps track of new or updated account data, by saving the type + // of the new/updated data, and the user ID and room ID the data is related to (empty) + // room ID means the data isn't specific to any room) + // If no data with the given type, user ID and room ID exists in the database, + // creates a new row, else update the existing one + // Returns an error if there was an issue with the upsert UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error) + // AddInviteEvent stores a new invite event for a user. + // If the invite was successfully stored this returns the stream ID it was stored at. + // Returns an error if there was a problem communicating with the database. AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error) + // RetireInviteEvent removes an old invite event from the database. + // Returns an error if there was a problem communicating with the database. RetireInviteEvent(ctx context.Context, inviteEventID string) error + // SetTypingTimeoutCallback sets a callback function that is called right after + // a user is removed from the typing user list due to timeout. SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) + // AddTypingUser adds a typing user to the typing cache. + // Returns the newly calculated sync position for typing notifications. AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition + // RemoveTypingUser removes a typing user from the typing cache. + // Returns the newly calculated sync position for typing notifications. RemoveTypingUser(userID, roomID string) types.StreamPosition + // GetEventsInRange retrieves all of the events on a given ordering using the + // given extremities and limit. GetEventsInRange(ctx context.Context, from, to *types.PaginationToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) + // EventPositionInTopology returns the depth of the given event. EventPositionInTopology(ctx context.Context, eventID string) (types.StreamPosition, error) + // EventsAtTopologicalPosition returns all of the events matching a given + // position in the topology of a given room. EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error) + // BackwardExtremitiesForRoom returns the event IDs of all of the backward + // extremities we know of for a given room. BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) + // MaxTopologicalPosition returns the highest topological position for a given room. MaxTopologicalPosition(ctx context.Context, roomID string) (types.StreamPosition, error) + // StreamEventsToEvents converts streamEvent to Event. If device is non-nil and + // matches the streamevent.transactionID device then the transaction ID gets + // added to the unsigned section of the output event. StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent + // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) } diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 9d61ccfc3..ef9707397 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -93,16 +93,10 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er return &d, nil } -// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { return d.roomstate.selectJoinedUsers(ctx) } -// Events lookups a list of event by their event ID. -// Returns a list of events matching the requested IDs found in the database. -// If an event is not found in the database then it will be omitted from the list. -// Returns an error if there was a problem talking with the database. -// Does not include any transaction IDs in the returned events. func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) { streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) if err != nil { @@ -148,9 +142,6 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx return nil } -// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races -// when generating the sync stream position for this event. Returns the sync stream position for the inserted event. -// Returns an error if there was a problem inserting this event. func (d *SyncServerDatasource) WriteEvent( ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, @@ -221,18 +212,12 @@ func (d *SyncServerDatasource) updateRoomState( return nil } -// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key -// If no event could be found, returns nil -// If there was an issue during the retrieval, returns an error func (d *SyncServerDatasource) GetStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.HeaderedEvent, error) { return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) } -// GetStateEventsForRoom fetches the state events for a given room. -// Returns an empty slice if no state events could be found for this room. -// Returns an error if there was an issue with the retrieval. func (d *SyncServerDatasource) GetStateEventsForRoom( ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter, ) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) { @@ -243,8 +228,6 @@ func (d *SyncServerDatasource) GetStateEventsForRoom( return } -// GetEventsInRange retrieves all of the events on a given ordering using the -// given extremities and limit. func (d *SyncServerDatasource) GetEventsInRange( ctx context.Context, from, to *types.PaginationToken, @@ -306,29 +289,22 @@ func (d *SyncServerDatasource) GetEventsInRange( return } -// SyncPosition returns the latest positions for syncing. func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.PaginationToken, error) { return d.syncPositionTx(ctx, nil) } -// BackwardExtremitiesForRoom returns the event IDs of all of the backward -// extremities we know of for a given room. func (d *SyncServerDatasource) BackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (backwardExtremities []string, err error) { return d.backwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID) } -// MaxTopologicalPosition returns the highest topological position for a given -// room. func (d *SyncServerDatasource) MaxTopologicalPosition( ctx context.Context, roomID string, ) (types.StreamPosition, error) { return d.topology.selectMaxPositionInTopology(ctx, roomID) } -// EventsAtTopologicalPosition returns all of the events matching a given -// position in the topology of a given room. func (d *SyncServerDatasource) EventsAtTopologicalPosition( ctx context.Context, roomID string, pos types.StreamPosition, ) ([]types.StreamEvent, error) { @@ -346,7 +322,6 @@ func (d *SyncServerDatasource) EventPositionInTopology( return d.topology.selectPositionInTopology(ctx, eventID) } -// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { return d.syncStreamPositionTx(ctx, nil) } @@ -511,11 +486,6 @@ func (d *SyncServerDatasource) addEDUDeltaToResponse( return } -// IncrementalSync returns all the data needed in order to create an incremental -// sync response for the given user. Events returned will include any client -// transaction IDs associated with the given device. These transaction IDs come -// from when the device sent the event via an API that included a transaction -// ID. func (d *SyncServerDatasource) IncrementalSync( ctx context.Context, device authtypes.Device, @@ -645,7 +615,6 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( return res, toPos, joinedRoomIDs, err } -// CompleteSync returns a complete /sync API response for the given user. func (d *SyncServerDatasource) CompleteSync( ctx context.Context, userID string, numRecentEventsPerRoom int, ) (*types.Response, error) { @@ -677,11 +646,6 @@ var txReadOnlySnapshot = sql.TxOptions{ ReadOnly: true, } -// GetAccountDataInRange returns all account data for a given user inserted or -// updated between two given positions -// Returns a map following the format data[roomID] = []dataTypes -// If no data is retrieved, returns an empty map -// If there was an issue with the retrieval, returns an error func (d *SyncServerDatasource) GetAccountDataInRange( ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter, @@ -689,29 +653,18 @@ func (d *SyncServerDatasource) GetAccountDataInRange( return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) } -// UpsertAccountData keeps track of new or updated account data, by saving the type -// of the new/updated data, and the user ID and room ID the data is related to (empty) -// room ID means the data isn't specific to any room) -// If no data with the given type, user ID and room ID exists in the database, -// creates a new row, else update the existing one -// Returns an error if there was an issue with the upsert func (d *SyncServerDatasource) UpsertAccountData( ctx context.Context, userID, roomID, dataType string, ) (types.StreamPosition, error) { return d.accountData.insertAccountData(ctx, userID, roomID, dataType) } -// AddInviteEvent stores a new invite event for a user. -// If the invite was successfully stored this returns the stream ID it was stored at. -// Returns an error if there was a problem communicating with the database. func (d *SyncServerDatasource) AddInviteEvent( ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, ) (types.StreamPosition, error) { return d.invites.insertInviteEvent(ctx, inviteEvent) } -// RetireInviteEvent removes an old invite event from the database. -// Returns an error if there was a problem communicating with the database. func (d *SyncServerDatasource) RetireInviteEvent( ctx context.Context, inviteEventID string, ) error { @@ -725,16 +678,12 @@ func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallback d.eduCache.SetTimeoutCallback(fn) } -// AddTypingUser adds a typing user to the typing cache. -// Returns the newly calculated sync position for typing notifications. func (d *SyncServerDatasource) AddTypingUser( userID, roomID string, expireTime *time.Time, ) types.StreamPosition { return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime)) } -// RemoveTypingUser removes a typing user from the typing cache. -// Returns the newly calculated sync position for typing notifications. func (d *SyncServerDatasource) RemoveTypingUser( userID, roomID string, ) types.StreamPosition { @@ -1073,9 +1022,6 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom( return s, nil } -// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and -// matches the streamevent.transactionID device then the transaction ID gets -// added to the unsigned section of the output event. func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent { out := make([]gomatrixserverlib.HeaderedEvent, len(in)) for i := 0; i < len(in); i++ {