diff --git a/eduserver/cache/cache.go b/eduserver/cache/cache.go index dd535a6d2..f637d7c97 100644 --- a/eduserver/cache/cache.go +++ b/eduserver/cache/cache.go @@ -113,19 +113,6 @@ func (t *EDUCache) AddTypingUser( return t.GetLatestSyncPosition() } -// AddSendToDeviceMessage increases the sync position for -// send-to-device updates. -// Returns the sync position before update, as the caller -// will use this to record the current stream position -// at the time that the send-to-device message was sent. -func (t *EDUCache) AddSendToDeviceMessage() int64 { - t.Lock() - defer t.Unlock() - latestSyncPosition := t.latestSyncPosition - t.latestSyncPosition++ - return latestSyncPosition -} - // addUser with mutex lock & replace the previous timer. // Returns the latest typing sync position after update. func (t *EDUCache) addUser( diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 3765fe23f..7948ecdf5 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" log "github.com/sirupsen/logrus" ) @@ -30,6 +31,7 @@ import ( type OutputClientDataConsumer struct { clientAPIConsumer *internal.ContinualConsumer db storage.Database + streams *streams.Streams } // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. @@ -37,6 +39,7 @@ func NewOutputClientDataConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, store storage.Database, + streams *streams.Streams, ) *OutputClientDataConsumer { consumer := internal.ContinualConsumer{ @@ -48,6 +51,7 @@ func NewOutputClientDataConsumer( s := &OutputClientDataConsumer{ clientAPIConsumer: &consumer, db: store, + streams: streams, } consumer.ProcessMessage = s.onMessage diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index fd1f806ce..68b388acb 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" log "github.com/sirupsen/logrus" ) @@ -30,6 +31,7 @@ import ( type OutputReceiptEventConsumer struct { receiptConsumer *internal.ContinualConsumer db storage.Database + streams *streams.Streams } // NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer. @@ -38,6 +40,7 @@ func NewOutputReceiptEventConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, store storage.Database, + streams *streams.Streams, ) *OutputReceiptEventConsumer { consumer := internal.ContinualConsumer{ @@ -50,6 +53,7 @@ func NewOutputReceiptEventConsumer( s := &OutputReceiptEventConsumer{ receiptConsumer: &consumer, db: store, + streams: streams, } consumer.ProcessMessage = s.onMessage @@ -82,7 +86,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro return err } - s.db.TypingStream().Advance(streamPos) + s.streams.TypingStreamProvider.Advance(streamPos) return nil } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 5dc684036..10b31f10a 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" @@ -33,6 +34,7 @@ type OutputSendToDeviceEventConsumer struct { sendToDeviceConsumer *internal.ContinualConsumer db storage.Database serverName gomatrixserverlib.ServerName // our server name + streams *streams.Streams } // NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer. @@ -41,6 +43,7 @@ func NewOutputSendToDeviceEventConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, store storage.Database, + streams *streams.Streams, ) *OutputSendToDeviceEventConsumer { consumer := internal.ContinualConsumer{ @@ -54,6 +57,7 @@ func NewOutputSendToDeviceEventConsumer( sendToDeviceConsumer: &consumer, db: store, serverName: cfg.Matrix.ServerName, + streams: streams, } consumer.ProcessMessage = s.onMessage @@ -97,7 +101,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - s.db.SendToDeviceStream().Advance(streamPos) + s.streams.SendToDeviceStreamProvider.Advance(streamPos) return nil } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 585ddf6c9..853d3845e 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -19,9 +19,11 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" log "github.com/sirupsen/logrus" ) @@ -29,7 +31,8 @@ import ( // OutputTypingEventConsumer consumes events that originated in the EDU server. type OutputTypingEventConsumer struct { typingConsumer *internal.ContinualConsumer - db storage.Database + eduCache *cache.EDUCache + streams *streams.Streams } // NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. @@ -38,6 +41,8 @@ func NewOutputTypingEventConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, store storage.Database, + eduCache *cache.EDUCache, + streams *streams.Streams, ) *OutputTypingEventConsumer { consumer := internal.ContinualConsumer{ @@ -49,7 +54,8 @@ func NewOutputTypingEventConsumer( s := &OutputTypingEventConsumer{ typingConsumer: &consumer, - db: store, + eduCache: eduCache, + streams: streams, } consumer.ProcessMessage = s.onMessage @@ -59,10 +65,11 @@ func NewOutputTypingEventConsumer( // Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { - s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { - s.db.TypingStream().Advance(types.StreamPosition(latestSyncPosition)) - }) - + /* + s.eduCache.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { + s.eduCache.TypingStream().Advance(types.StreamPosition(latestSyncPosition)) + }) + */ return s.typingConsumer.Start() } @@ -83,12 +90,16 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error var typingPos types.StreamPosition typingEvent := output.Event if typingEvent.Typing { - typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime) + typingPos = types.StreamPosition( + s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime), + ) } else { - typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) + typingPos = types.StreamPosition( + s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID), + ) } - s.db.TypingStream().Advance(typingPos) + s.streams.TypingStreamProvider.Advance(typingPos) return nil } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 12a3ef9d9..9f58051e8 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -33,6 +34,7 @@ import ( type OutputKeyChangeEventConsumer struct { keyChangeConsumer *internal.ContinualConsumer db storage.Database + streams *streams.Streams serverName gomatrixserverlib.ServerName // our server name rsAPI roomserverAPI.RoomserverInternalAPI keyAPI api.KeyInternalAPI @@ -49,6 +51,7 @@ func NewOutputKeyChangeEventConsumer( keyAPI api.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, store storage.Database, + streams *streams.Streams, ) *OutputKeyChangeEventConsumer { consumer := internal.ContinualConsumer{ @@ -66,6 +69,7 @@ func NewOutputKeyChangeEventConsumer( rsAPI: rsAPI, partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, + streams: streams, } consumer.ProcessMessage = s.onMessage @@ -115,7 +119,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er Partition: msg.Partition, } - s.db.DeviceListStream().Advance(posUpdate) + s.streams.DeviceListStreamProvider.Advance(posUpdate) //for userID := range queryRes.UserIDsToCount { // s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index b4c7e6e52..9de359dd2 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -35,6 +36,7 @@ type OutputRoomEventConsumer struct { rsAPI api.RoomserverInternalAPI rsConsumer *internal.ContinualConsumer db storage.Database + streams *streams.Streams } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -42,6 +44,7 @@ func NewOutputRoomEventConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, store storage.Database, + streams *streams.Streams, rsAPI api.RoomserverInternalAPI, ) *OutputRoomEventConsumer { @@ -55,6 +58,7 @@ func NewOutputRoomEventConsumer( cfg: cfg, rsConsumer: &consumer, db: store, + streams: streams, rsAPI: rsAPI, } consumer.ProcessMessage = s.onMessage @@ -176,7 +180,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( return err } - s.db.PDUStream().Advance(pduPos) + s.streams.PDUStreamProvider.Advance(pduPos) return nil } @@ -215,7 +219,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( return err } - s.db.PDUStream().Advance(pduPos) + s.streams.PDUStreamProvider.Advance(pduPos) return nil } @@ -271,7 +275,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( return nil } - s.db.InviteStream().Advance(pduPos) + s.streams.InviteStreamProvider.Advance(pduPos) return nil } @@ -291,7 +295,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( // Notify any active sync requests that the invite has been retired. // Invites share the same stream counter as PDUs - s.db.InviteStream().Advance(pduPos) + s.streams.InviteStreamProvider.Advance(pduPos) return nil } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 93372f7a2..14af66c2f 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -16,11 +16,10 @@ package storage import ( "context" - "time" + "database/sql" eduAPI "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" @@ -31,14 +30,25 @@ import ( type Database interface { internal.PartitionStorer - PDUStream() types.StreamProvider - PDUTopology() types.TopologyProvider - TypingStream() types.StreamProvider - ReceiptStream() types.StreamProvider - InviteStream() types.StreamProvider - SendToDeviceStream() types.StreamProvider - AccountDataStream() types.StreamProvider - DeviceListStream() types.StreamLogProvider + ReadOnlySnapshot(ctx context.Context) (*sql.Tx, error) + + MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error) + MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error) + MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error) + + CurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) + GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) + GetStateDeltas(ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) + RoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) + + RecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) + + GetBackwardTopologyPos(ctx context.Context, txn *sql.Tx, events []types.StreamEvent) (types.TopologyToken, error) + PositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) + + InviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) + PeeksInRange(ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) + RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) @@ -95,15 +105,6 @@ type Database interface { // DeletePeek deletes all peeks for a given room by a given user // Returns an error if there was a problem communicating with the database. DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error) - // SetTypingTimeoutCallback sets a callback function that is called right after - // a user is removed from the typing user list due to timeout. - SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) - // AddTypingUser adds a typing user to the typing cache. - // Returns the newly calculated sync position for typing notifications. - AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition - // RemoveTypingUser removes a typing user from the typing cache. - // Returns the newly calculated sync position for typing notifications. - RemoveTypingUser(userID, roomID string) types.StreamPosition // GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit. GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) // GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit. @@ -118,8 +119,6 @@ type Database interface { // matches the streamevent.transactionID device then the transaction ID gets // added to the unsigned section of the output event. StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent - // AddSendToDevice increases the EDU position in the cache and returns the stream position. - AddSendToDevice() types.StreamPosition // SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns three lists: // - "events": a list of send-to-device events that should be included in the sync // - "changes": a list of send-to-device events that should be updated in the database by diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 51c21cb6f..51840304c 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -20,12 +20,10 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" - userapi "github.com/matrix-org/dendrite/userapi/api" ) // SyncServerDatasource represents a sync server datasource which manages @@ -39,7 +37,7 @@ type SyncServerDatasource struct { // NewDatabase creates a new sync server database // nolint:gocyclo -func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInternalAPI) (*SyncServerDatasource, error) { +func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error if d.db, err = sqlutil.Open(dbProperties); err != nil { @@ -107,8 +105,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInter Filter: filter, SendToDevice: sendToDevice, Receipts: receipts, - EDUCache: cache.New(), } - d.Database.ConfigureProviders(userAPI) return &d, nil } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 83fc49b2e..c0e593464 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -19,12 +19,10 @@ import ( "database/sql" "encoding/json" "fmt" - "time" eduAPI "github.com/matrix-org/dendrite/eduserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -49,74 +47,70 @@ type Database struct { SendToDevice tables.SendToDevice Filter tables.Filter Receipts tables.Receipts - EDUCache *cache.EDUCache - - PDUStreamProvider types.StreamProvider - PDUTopologyProvider types.TopologyProvider - TypingStreamProvider types.StreamProvider - ReceiptStreamProvider types.StreamProvider - InviteStreamProvider types.StreamProvider - SendToDeviceStreamProvider types.StreamProvider - AccountDataStreamProvider types.StreamProvider - DeviceListStreamProvider types.StreamLogProvider } -// ConfigureProviders creates instances of the various -// stream and topology providers provided by the storage -// packages. -func (d *Database) ConfigureProviders(userAPI userapi.UserInternalAPI) { - d.PDUStreamProvider = &PDUStreamProvider{StreamProvider{DB: d}} - d.TypingStreamProvider = &TypingStreamProvider{StreamProvider{DB: d}} - d.ReceiptStreamProvider = &ReceiptStreamProvider{StreamProvider{DB: d}} - d.InviteStreamProvider = &InviteStreamProvider{StreamProvider{DB: d}} - d.SendToDeviceStreamProvider = &SendToDeviceStreamProvider{StreamProvider{DB: d}} - d.AccountDataStreamProvider = &AccountDataStreamProvider{ - StreamProvider: StreamProvider{DB: d}, - userAPI: userAPI, +func (d *Database) ReadOnlySnapshot(ctx context.Context) (*sql.Tx, error) { + return d.DB.BeginTx(ctx, &sql.TxOptions{ + // Set the isolation level so that we see a snapshot of the database. + // In PostgreSQL repeatable read transactions will see a snapshot taken + // at the first query, and since the transaction is read-only it can't + // run into any serialisation errors. + // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }) +} + +func (d *Database) MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error) { + id, err := d.OutputEvents.SelectMaxEventID(ctx, nil) + if err != nil { + return 0, fmt.Errorf("d.OutputEvents.SelectMaxEventID: %w", err) } - d.DeviceListStreamProvider = &DeviceListStreamProvider{StreamLogProvider{DB: d}} - - d.PDUStreamProvider.Setup() - d.TypingStreamProvider.Setup() - d.ReceiptStreamProvider.Setup() - d.InviteStreamProvider.Setup() - d.SendToDeviceStreamProvider.Setup() - d.AccountDataStreamProvider.Setup() - d.DeviceListStreamProvider.Setup() - - d.PDUTopologyProvider = &PDUTopologyProvider{DB: d} + return types.StreamPosition(id), nil } -func (d *Database) PDUStream() types.StreamProvider { - return d.PDUStreamProvider +func (d *Database) MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error) { + id, err := d.Receipts.SelectMaxReceiptID(ctx, nil) + if err != nil { + return 0, fmt.Errorf("d.Receipts.SelectMaxReceiptID: %w", err) + } + return types.StreamPosition(id), nil } -func (d *Database) PDUTopology() types.TopologyProvider { - return d.PDUTopologyProvider +func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error) { + id, err := d.Invites.SelectMaxInviteID(ctx, nil) + if err != nil { + return 0, fmt.Errorf("d.Invites.SelectMaxInviteID: %w", err) + } + return types.StreamPosition(id), nil } -func (d *Database) TypingStream() types.StreamProvider { - return d.TypingStreamProvider +func (d *Database) CurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { + return d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilterPart) } -func (d *Database) ReceiptStream() types.StreamProvider { - return d.ReceiptStreamProvider +func (d *Database) RoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) { + return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, membership) } -func (d *Database) InviteStream() types.StreamProvider { - return d.InviteStreamProvider +func (d *Database) RecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { + return d.OutputEvents.SelectRecentEvents(ctx, txn, roomID, r, limit, chronologicalOrder, onlySyncEvents) } -func (d *Database) SendToDeviceStream() types.StreamProvider { - return d.SendToDeviceStreamProvider +func (d *Database) PositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) { + return d.Topology.SelectPositionInTopology(ctx, txn, eventID) } -func (d *Database) AccountDataStream() types.StreamProvider { - return d.AccountDataStreamProvider +func (d *Database) InviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) { + return d.Invites.SelectInviteEventsInRange(ctx, txn, targetUserID, r) } -func (d *Database) DeviceListStream() types.StreamLogProvider { - return d.DeviceListStreamProvider +func (d *Database) PeeksInRange(ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) { + return d.Peeks.SelectPeeksInRange(ctx, txn, userID, deviceID, r) +} + +func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) { + return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) } // Events lookups a list of event by their event ID. @@ -166,6 +160,7 @@ func (d *Database) GetEventsInStreamingRange( return events, err } +/* func (d *Database) AddTypingUser( userID, roomID string, expireTime *time.Time, ) types.StreamPosition { @@ -178,13 +173,16 @@ func (d *Database) RemoveTypingUser( return types.StreamPosition(d.EDUCache.RemoveUser(userID, roomID)) } -func (d *Database) AddSendToDevice() types.StreamPosition { - return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage()) -} - func (d *Database) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { d.EDUCache.SetTimeoutCallback(fn) } +*/ + +/* +func (d *Database) AddSendToDevice() types.StreamPosition { + return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage()) +} +*/ func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { return d.CurrentRoomState.SelectJoinedUsers(ctx) @@ -552,7 +550,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. -func (d *Database) getBackwardTopologyPos( +func (d *Database) GetBackwardTopologyPos( ctx context.Context, txn *sql.Tx, events []types.StreamEvent, ) (types.TopologyToken, error) { @@ -576,7 +574,7 @@ func (d *Database) addRoomDeltaToResponse( device *userapi.Device, txn *sql.Tx, r types.Range, - delta stateDelta, + delta types.StateDelta, numRecentEventsPerRoom int, res *types.Response, ) error { @@ -740,11 +738,11 @@ func (d *Database) fetchMissingStateEvents( // the user has new membership events. // A list of joined room IDs is also returned in case the caller needs it. // nolint:gocyclo -func (d *Database) getStateDeltas( +func (d *Database) GetStateDeltas( ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, -) ([]stateDelta, []string, error) { +) ([]types.StateDelta, []string, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -753,7 +751,7 @@ func (d *Database) getStateDeltas( // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. // - Get all CURRENTLY joined rooms, and add them to 'joined' block. - var deltas []stateDelta + var deltas []types.StateDelta // get all the state events ever (i.e. for all available rooms) between these two positions stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) @@ -784,10 +782,10 @@ func (d *Database) getStateDeltas( state[peek.RoomID] = s } if !peek.Deleted { - deltas = append(deltas, stateDelta{ - membership: gomatrixserverlib.Peek, - stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]), - roomID: peek.RoomID, + deltas = append(deltas, types.StateDelta{ + Membership: gomatrixserverlib.Peek, + StateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]), + RoomID: peek.RoomID, }) } } @@ -812,11 +810,11 @@ func (d *Database) getStateDeltas( continue // we'll add this room in when we do joined rooms } - deltas = append(deltas, stateDelta{ - membership: membership, - membershipPos: ev.StreamPosition, - stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), - roomID: roomID, + deltas = append(deltas, types.StateDelta{ + Membership: membership, + MembershipPos: ev.StreamPosition, + StateEvents: d.StreamEventsToEvents(device, stateStreamEvents), + RoomID: roomID, }) break } @@ -829,10 +827,10 @@ func (d *Database) getStateDeltas( return nil, nil, err } for _, joinedRoomID := range joinedRoomIDs { - deltas = append(deltas, stateDelta{ - membership: gomatrixserverlib.Join, - stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), - roomID: joinedRoomID, + deltas = append(deltas, types.StateDelta{ + Membership: gomatrixserverlib.Join, + StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), + RoomID: joinedRoomID, }) } @@ -844,13 +842,13 @@ func (d *Database) getStateDeltas( // Fetches full state for all joined rooms and uses selectStateInRange to get // updates for other rooms. // nolint:gocyclo -func (d *Database) getStateDeltasForFullStateSync( +func (d *Database) GetStateDeltasForFullStateSync( ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, -) ([]stateDelta, []string, error) { +) ([]types.StateDelta, []string, error) { // Use a reasonable initial capacity - deltas := make(map[string]stateDelta) + deltas := make(map[string]types.StateDelta) peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r) if err != nil { @@ -864,10 +862,10 @@ func (d *Database) getStateDeltasForFullStateSync( if stateErr != nil { return nil, nil, stateErr } - deltas[peek.RoomID] = stateDelta{ - membership: gomatrixserverlib.Peek, - stateEvents: d.StreamEventsToEvents(device, s), - roomID: peek.RoomID, + deltas[peek.RoomID] = types.StateDelta{ + Membership: gomatrixserverlib.Peek, + StateEvents: d.StreamEventsToEvents(device, s), + RoomID: peek.RoomID, } } } @@ -886,11 +884,11 @@ func (d *Database) getStateDeltasForFullStateSync( for _, ev := range stateStreamEvents { if membership := getMembershipFromEvent(ev.Event, userID); membership != "" { if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above. - deltas[roomID] = stateDelta{ - membership: membership, - membershipPos: ev.StreamPosition, - stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), - roomID: roomID, + deltas[roomID] = types.StateDelta{ + Membership: membership, + MembershipPos: ev.StreamPosition, + StateEvents: d.StreamEventsToEvents(device, stateStreamEvents), + RoomID: roomID, } } @@ -910,15 +908,15 @@ func (d *Database) getStateDeltasForFullStateSync( if stateErr != nil { return nil, nil, stateErr } - deltas[joinedRoomID] = stateDelta{ - membership: gomatrixserverlib.Join, - stateEvents: d.StreamEventsToEvents(device, s), - roomID: joinedRoomID, + deltas[joinedRoomID] = types.StateDelta{ + Membership: gomatrixserverlib.Join, + StateEvents: d.StreamEventsToEvents(device, s), + RoomID: joinedRoomID, } } // Create a response array. - result := make([]stateDelta, len(deltas)) + result := make([]types.StateDelta, len(deltas)) i := 0 for _, delta := range deltas { result[i] = delta @@ -1057,15 +1055,6 @@ func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { return membership } -type stateDelta struct { - roomID string - stateEvents []*gomatrixserverlib.HeaderedEvent - membership string - // The PDU stream position of the latest membership event for this user, if applicable. - // Can be 0 if there is no membership event in this delta. - membershipPos types.StreamPosition -} - // StoreReceipt stores user receipts func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { diff --git a/syncapi/storage/shared/topology_pdu.go b/syncapi/storage/shared/topology_pdu.go deleted file mode 100644 index 80387f98d..000000000 --- a/syncapi/storage/shared/topology_pdu.go +++ /dev/null @@ -1,31 +0,0 @@ -package shared - -import ( - "context" - - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" -) - -type PDUTopologyProvider struct { - DB *Database -} - -func (p *PDUTopologyProvider) TopologyRange(ctx context.Context, res *types.Response, roomID string, from, to types.TopologyToken, filter gomatrixserverlib.EventFilter) { - backwardOrdering := from.Depth > to.Depth || from.PDUPosition > to.PDUPosition - - events, err := p.DB.GetEventsInTopologicalRange(ctx, &from, &to, roomID, filter.Limit, backwardOrdering) - if err != nil { - return - } - - _ = events -} - -func (p *PDUTopologyProvider) TopologyLatestPosition(ctx context.Context, roomID string) types.TopologyToken { - token, err := p.DB.MaxTopologicalPosition(ctx, roomID) - if err != nil { - return types.TopologyToken{} - } - return token -} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 5efaf86fe..7abe8dd00 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -21,12 +21,10 @@ import ( // Import the sqlite3 package _ "github.com/mattn/go-sqlite3" - "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" - userapi "github.com/matrix-org/dendrite/userapi/api" ) // SyncServerDatasource represents a sync server datasource which manages @@ -41,7 +39,7 @@ type SyncServerDatasource struct { // NewDatabase creates a new sync server database // nolint: gocyclo -func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInternalAPI) (*SyncServerDatasource, error) { +func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error if d.db, err = sqlutil.Open(dbProperties); err != nil { @@ -51,7 +49,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInter if err = d.prepare(dbProperties); err != nil { return nil, err } - d.ConfigureProviders(userAPI) return &d, nil } @@ -121,7 +118,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er Filter: filter, SendToDevice: sendToDevice, Receipts: receipts, - EDUCache: cache.New(), } return nil } diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index d0efa57d8..15386c338 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -22,16 +22,15 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" - userapi "github.com/matrix-org/dendrite/userapi/api" ) // NewSyncServerDatasource opens a database connection. -func NewSyncServerDatasource(dbProperties *config.DatabaseOptions, userAPI userapi.UserInternalAPI) (Database, error) { +func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties, userAPI) + return sqlite3.NewDatabase(dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties, userAPI) + return postgres.NewDatabase(dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/syncapi/storage/shared/stream_accountdata.go b/syncapi/streams/stream_accountdata.go similarity index 99% rename from syncapi/storage/shared/stream_accountdata.go rename to syncapi/streams/stream_accountdata.go index 5502f5f93..169dc8b02 100644 --- a/syncapi/storage/shared/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -1,4 +1,4 @@ -package shared +package streams import ( "context" diff --git a/syncapi/storage/shared/stream_invite.go b/syncapi/streams/stream_invite.go similarity index 84% rename from syncapi/storage/shared/stream_invite.go rename to syncapi/streams/stream_invite.go index ef4c5e4b2..351348369 100644 --- a/syncapi/storage/shared/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -1,4 +1,4 @@ -package shared +package streams import ( "context" @@ -16,12 +16,11 @@ func (p *InviteStreamProvider) Setup() { p.latestMutex.Lock() defer p.latestMutex.Unlock() - latest, err := p.DB.Invites.SelectMaxInviteID(context.Background(), nil) + id, err := p.DB.MaxStreamTokenForInvites(context.Background()) if err != nil { return } - - p.latest = types.StreamPosition(latest) + p.latest = id } func (p *InviteStreamProvider) CompleteSync( @@ -41,7 +40,7 @@ func (p *InviteStreamProvider) IncrementalSync( To: to, } - invites, retiredInvites, err := p.DB.Invites.SelectInviteEventsInRange( + invites, retiredInvites, err := p.DB.InviteEventsInRange( ctx, nil, req.Device.UserID, r, ) if err != nil { diff --git a/syncapi/storage/shared/stream_pdu.go b/syncapi/streams/stream_pdu.go similarity index 82% rename from syncapi/storage/shared/stream_pdu.go rename to syncapi/streams/stream_pdu.go index e77ff063b..42e2b053b 100644 --- a/syncapi/storage/shared/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -1,4 +1,4 @@ -package shared +package streams import ( "context" @@ -14,27 +14,17 @@ type PDUStreamProvider struct { StreamProvider } -var txReadOnlySnapshot = sql.TxOptions{ - // Set the isolation level so that we see a snapshot of the database. - // In PostgreSQL repeatable read transactions will see a snapshot taken - // at the first query, and since the transaction is read-only it can't - // run into any serialisation errors. - // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, -} - func (p *PDUStreamProvider) Setup() { p.StreamProvider.Setup() p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := p.DB.OutputEvents.SelectMaxEventID(context.Background(), nil) + id, err := p.DB.MaxStreamTokenForPDUs(context.Background()) if err != nil { return } - p.latest = types.StreamPosition(id) + p.latest = id } func (p *PDUStreamProvider) CompleteSync( @@ -47,7 +37,7 @@ func (p *PDUStreamProvider) CompleteSync( // a consistent view of the database throughout. This does have the unfortunate side-effect that all // the matrixy logic resides in this function, but it's better to not hide the fact that this is // being done in a transaction. - txn, err := p.DB.DB.BeginTx(ctx, &txReadOnlySnapshot) + txn, err := p.DB.ReadOnlySnapshot(ctx) if err != nil { return to } @@ -61,8 +51,7 @@ func (p *PDUStreamProvider) CompleteSync( } // Extract room state and recent events for all rooms the user is joined to. - var joinedRoomIDs []string - joinedRoomIDs, err = p.DB.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, req.Device.UserID, gomatrixserverlib.Join) + joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, txn, req.Device.UserID, gomatrixserverlib.Join) if err != nil { return to } @@ -83,7 +72,7 @@ func (p *PDUStreamProvider) CompleteSync( } // Add peeked rooms. - peeks, err := p.DB.Peeks.SelectPeeksInRange(ctx, txn, req.Device.UserID, req.Device.ID, r) + peeks, err := p.DB.PeeksInRange(ctx, txn, req.Device.UserID, req.Device.ID, r) if err != nil { return to } @@ -101,7 +90,6 @@ func (p *PDUStreamProvider) CompleteSync( } succeeded = true - return p.LatestPosition(ctx) } @@ -120,18 +108,18 @@ func (p *PDUStreamProvider) IncrementalSync( var err error //var events []types.StreamEvent - var stateDeltas []stateDelta + var stateDeltas []types.StateDelta var joinedRooms []string // TODO: use filter provided in request stateFilter := gomatrixserverlib.DefaultStateFilter() if req.WantFullState { - if stateDeltas, joinedRooms, err = p.DB.getStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { return } } else { - if stateDeltas, joinedRooms, err = p.DB.getStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { return } } @@ -155,56 +143,56 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( device *userapi.Device, txn *sql.Tx, r types.Range, - delta stateDelta, + delta types.StateDelta, numRecentEventsPerRoom int, res *types.Response, ) error { - if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { + if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave // in a single /sync request // This is all "okay" assuming history_visibility == "shared" which it is by default. - r.To = delta.membershipPos + r.To = delta.MembershipPos } - recentStreamEvents, limited, err := p.DB.OutputEvents.SelectRecentEvents( - ctx, txn, delta.roomID, r, + recentStreamEvents, limited, err := p.DB.RecentEvents( + ctx, txn, delta.RoomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { return err } recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) - delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - prevBatch, err := p.DB.getBackwardTopologyPos(ctx, txn, recentStreamEvents) + delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back + prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, txn, recentStreamEvents) if err != nil { return err } // XXX: should we ever get this far if we have no recent events or state in this room? // in practice we do for peeks, but possibly not joins? - if len(recentEvents) == 0 && len(delta.stateEvents) == 0 { + if len(recentEvents) == 0 && len(delta.StateEvents) == 0 { return nil } - switch delta.membership { + switch delta.Membership { case gomatrixserverlib.Join: jr := types.NewJoinResponse() jr.Timeline.PrevBatch = &prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = limited - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[delta.roomID] = *jr + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.RoomID] = *jr case gomatrixserverlib.Peek: jr := types.NewJoinResponse() jr.Timeline.PrevBatch = &prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = limited - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Peek[delta.roomID] = *jr + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Peek[delta.RoomID] = *jr case gomatrixserverlib.Leave: fallthrough // transitions to leave are the same as ban case gomatrixserverlib.Ban: @@ -214,8 +202,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( lr.Timeline.PrevBatch = &prevBatch lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Leave[delta.roomID] = *lr + lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.RoomID] = *lr } return nil @@ -229,7 +217,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( numRecentEventsPerRoom int, device *userapi.Device, ) (jr *types.JoinResponse, err error) { var stateEvents []*gomatrixserverlib.HeaderedEvent - stateEvents, err = p.DB.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter) + stateEvents, err = p.DB.CurrentState(ctx, txn, roomID, stateFilter) if err != nil { return } @@ -237,7 +225,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent var limited bool - recentStreamEvents, limited, err = p.DB.OutputEvents.SelectRecentEvents( + recentStreamEvents, limited, err = p.DB.RecentEvents( ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { @@ -276,7 +264,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( var prevBatch *types.TopologyToken if len(recentStreamEvents) > 0 { var backwardTopologyPos, backwardStreamPos types.StreamPosition - backwardTopologyPos, backwardStreamPos, err = p.DB.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) + backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) if err != nil { return } diff --git a/syncapi/storage/shared/stream_receipt.go b/syncapi/streams/stream_receipt.go similarity index 90% rename from syncapi/storage/shared/stream_receipt.go rename to syncapi/streams/stream_receipt.go index 1ad2d141f..cd345c312 100644 --- a/syncapi/storage/shared/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -1,4 +1,4 @@ -package shared +package streams import ( "context" @@ -16,12 +16,11 @@ type ReceiptStreamProvider struct { func (p *ReceiptStreamProvider) Setup() { p.StreamProvider.Setup() - latest, err := p.DB.Receipts.SelectMaxReceiptID(context.Background(), nil) + id, err := p.DB.MaxStreamTokenForReceipts(context.Background()) if err != nil { return } - - p.latest = types.StreamPosition(latest) + p.latest = id } func (p *ReceiptStreamProvider) CompleteSync( @@ -43,7 +42,7 @@ func (p *ReceiptStreamProvider) IncrementalSync( } } - lastPos, receipts, err := p.DB.Receipts.SelectRoomReceiptsAfter(ctx, joinedRooms, from) + lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from) if err != nil { return to //fmt.Errorf("unable to select receipts for rooms: %w", err) } diff --git a/syncapi/storage/shared/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go similarity index 98% rename from syncapi/storage/shared/stream_sendtodevice.go rename to syncapi/streams/stream_sendtodevice.go index ef3ccf2da..cd60dd53c 100644 --- a/syncapi/storage/shared/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -1,4 +1,4 @@ -package shared +package streams import ( "context" diff --git a/syncapi/storage/shared/stream_typing.go b/syncapi/streams/stream_typing.go similarity index 88% rename from syncapi/storage/shared/stream_typing.go rename to syncapi/streams/stream_typing.go index 767d8ca50..8100c0d92 100644 --- a/syncapi/storage/shared/stream_typing.go +++ b/syncapi/streams/stream_typing.go @@ -1,15 +1,17 @@ -package shared +package streams import ( "context" "encoding/json" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) type TypingStreamProvider struct { StreamProvider + EDUCache *cache.EDUCache } func (p *TypingStreamProvider) CompleteSync( @@ -35,7 +37,7 @@ func (p *TypingStreamProvider) IncrementalSync( jr := req.Response.Rooms.Join[roomID] - if users, updated := p.DB.EDUCache.GetTypingUsersIfUpdatedAfter( + if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter( roomID, int64(from), ); updated { ev := gomatrixserverlib.ClientEvent{ diff --git a/syncapi/storage/shared/streamlog_devicelist.go b/syncapi/streams/streamlog_devicelist.go similarity index 96% rename from syncapi/storage/shared/streamlog_devicelist.go rename to syncapi/streams/streamlog_devicelist.go index 844c640c3..2f67e8da5 100644 --- a/syncapi/storage/shared/streamlog_devicelist.go +++ b/syncapi/streams/streamlog_devicelist.go @@ -1,4 +1,4 @@ -package shared +package streams import ( "context" diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go new file mode 100644 index 000000000..966d0f2ae --- /dev/null +++ b/syncapi/streams/streams.go @@ -0,0 +1,50 @@ +package streams + +import ( + "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" +) + +type Streams struct { + PDUStreamProvider types.StreamProvider + PDUTopologyProvider types.TopologyProvider + TypingStreamProvider types.StreamProvider + ReceiptStreamProvider types.StreamProvider + InviteStreamProvider types.StreamProvider + SendToDeviceStreamProvider types.StreamProvider + AccountDataStreamProvider types.StreamProvider + DeviceListStreamProvider types.StreamLogProvider +} + +func NewSyncStreamProviders( + d storage.Database, userAPI userapi.UserInternalAPI, + eduCache *cache.EDUCache, +) *Streams { + streams := &Streams{ + PDUStreamProvider: &PDUStreamProvider{StreamProvider{DB: d}}, + TypingStreamProvider: &TypingStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + EDUCache: eduCache, + }, + ReceiptStreamProvider: &ReceiptStreamProvider{StreamProvider{DB: d}}, + InviteStreamProvider: &InviteStreamProvider{StreamProvider{DB: d}}, + SendToDeviceStreamProvider: &SendToDeviceStreamProvider{StreamProvider{DB: d}}, + AccountDataStreamProvider: &AccountDataStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + userAPI: userAPI, + }, + DeviceListStreamProvider: &DeviceListStreamProvider{StreamLogProvider{DB: d}}, + } + + streams.PDUStreamProvider.Setup() + streams.TypingStreamProvider.Setup() + streams.ReceiptStreamProvider.Setup() + streams.InviteStreamProvider.Setup() + streams.SendToDeviceStreamProvider.Setup() + streams.AccountDataStreamProvider.Setup() + streams.DeviceListStreamProvider.Setup() + + return streams +} diff --git a/syncapi/storage/shared/stream.go b/syncapi/streams/template_stream.go similarity index 94% rename from syncapi/storage/shared/stream.go rename to syncapi/streams/template_stream.go index 76c5acf38..e85529f62 100644 --- a/syncapi/storage/shared/stream.go +++ b/syncapi/streams/template_stream.go @@ -1,14 +1,15 @@ -package shared +package streams import ( "context" "sync" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" ) type StreamProvider struct { - DB *Database + DB storage.Database latest types.StreamPosition latestMutex sync.RWMutex update *sync.Cond diff --git a/syncapi/storage/shared/streamlog.go b/syncapi/streams/template_streamlog.go similarity index 94% rename from syncapi/storage/shared/streamlog.go rename to syncapi/streams/template_streamlog.go index a3d8e2a53..39566f8b1 100644 --- a/syncapi/storage/shared/streamlog.go +++ b/syncapi/streams/template_streamlog.go @@ -1,14 +1,15 @@ -package shared +package streams import ( "context" "sync" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" ) type StreamLogProvider struct { - DB *Database + DB storage.Database latest types.LogPosition latestMutex sync.RWMutex update *sync.Cond diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 2827271fd..ff84d688a 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" @@ -39,19 +40,13 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db storage.Database - cfg *config.SyncAPI - userAPI userapi.UserInternalAPI - keyAPI keyapi.KeyInternalAPI - rsAPI roomserverAPI.RoomserverInternalAPI - lastseen sync.Map - pduStream types.StreamProvider - typingStream types.StreamProvider - receiptStream types.StreamProvider - sendToDeviceStream types.StreamProvider - inviteStream types.StreamProvider - accountDataStream types.StreamProvider - deviceListStream types.StreamLogProvider + db storage.Database + cfg *config.SyncAPI + userAPI userapi.UserInternalAPI + keyAPI keyapi.KeyInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI + lastseen sync.Map + streams *streams.Streams } // NewRequestPool makes a new RequestPool @@ -59,21 +54,16 @@ func NewRequestPool( db storage.Database, cfg *config.SyncAPI, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, + streams *streams.Streams, ) *RequestPool { rp := &RequestPool{ - db: db, - cfg: cfg, - userAPI: userAPI, - keyAPI: keyAPI, - rsAPI: rsAPI, - lastseen: sync.Map{}, - pduStream: db.PDUStream(), - typingStream: db.TypingStream(), - receiptStream: db.ReceiptStream(), - sendToDeviceStream: db.SendToDeviceStream(), - inviteStream: db.InviteStream(), - accountDataStream: db.AccountDataStream(), - deviceListStream: db.DeviceListStream(), + db: db, + cfg: cfg, + userAPI: userAPI, + keyAPI: keyAPI, + rsAPI: rsAPI, + lastseen: sync.Map{}, + streams: streams, } go rp.cleanLastSeen() return rp @@ -186,13 +176,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. case <-timer.C: // Timeout reached return giveup() - case <-rp.pduStream.NotifyAfter(waitctx, syncReq.Since.PDUPosition): - case <-rp.typingStream.NotifyAfter(waitctx, syncReq.Since.TypingPosition): - case <-rp.receiptStream.NotifyAfter(waitctx, syncReq.Since.ReceiptPosition): - case <-rp.inviteStream.NotifyAfter(waitctx, syncReq.Since.InvitePosition): - case <-rp.sendToDeviceStream.NotifyAfter(waitctx, syncReq.Since.SendToDevicePosition): - case <-rp.accountDataStream.NotifyAfter(waitctx, syncReq.Since.AccountDataPosition): - case <-rp.deviceListStream.NotifyAfter(waitctx, syncReq.Since.DeviceListPosition): + case <-rp.streams.PDUStreamProvider.NotifyAfter(waitctx, syncReq.Since.PDUPosition): + case <-rp.streams.TypingStreamProvider.NotifyAfter(waitctx, syncReq.Since.TypingPosition): + case <-rp.streams.ReceiptStreamProvider.NotifyAfter(waitctx, syncReq.Since.ReceiptPosition): + case <-rp.streams.InviteStreamProvider.NotifyAfter(waitctx, syncReq.Since.InvitePosition): + case <-rp.streams.SendToDeviceStreamProvider.NotifyAfter(waitctx, syncReq.Since.SendToDevicePosition): + case <-rp.streams.AccountDataStreamProvider.NotifyAfter(waitctx, syncReq.Since.AccountDataPosition): + case <-rp.streams.DeviceListStreamProvider.NotifyAfter(waitctx, syncReq.Since.DeviceListPosition): } syncReq.Log.Println("Responding to sync after wakeup") @@ -204,58 +194,65 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. if syncReq.Since.IsEmpty() { // Complete sync syncReq.Response.NextBatch = types.StreamingToken{ - PDUPosition: rp.pduStream.CompleteSync( + PDUPosition: rp.streams.PDUStreamProvider.CompleteSync( syncReq.Context, syncReq, ), - TypingPosition: rp.typingStream.CompleteSync( + TypingPosition: rp.streams.TypingStreamProvider.CompleteSync( syncReq.Context, syncReq, ), - ReceiptPosition: rp.receiptStream.CompleteSync( + ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync( syncReq.Context, syncReq, ), - InvitePosition: rp.inviteStream.CompleteSync( + InvitePosition: rp.streams.InviteStreamProvider.CompleteSync( syncReq.Context, syncReq, ), - SendToDevicePosition: rp.sendToDeviceStream.CompleteSync( + SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync( syncReq.Context, syncReq, ), - AccountDataPosition: rp.accountDataStream.CompleteSync( + AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync( syncReq.Context, syncReq, ), - DeviceListPosition: rp.deviceListStream.CompleteSync( + DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync( syncReq.Context, syncReq, ), } } else { // Incremental sync syncReq.Response.NextBatch = types.StreamingToken{ - PDUPosition: rp.pduStream.IncrementalSync( + PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync( syncReq.Context, syncReq, - syncReq.Since.PDUPosition, rp.pduStream.LatestPosition(syncReq.Context), + syncReq.Since.PDUPosition, // from + rp.streams.PDUStreamProvider.LatestPosition(syncReq.Context), // to ), - TypingPosition: rp.typingStream.IncrementalSync( + TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync( syncReq.Context, syncReq, - syncReq.Since.TypingPosition, rp.typingStream.LatestPosition(syncReq.Context), + syncReq.Since.TypingPosition, // from + rp.streams.TypingStreamProvider.LatestPosition(syncReq.Context), // to ), - ReceiptPosition: rp.receiptStream.IncrementalSync( - syncReq.Context, syncReq, syncReq.Since.ReceiptPosition, - rp.receiptStream.LatestPosition(syncReq.Context), + ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.ReceiptPosition, // from + rp.streams.ReceiptStreamProvider.LatestPosition(syncReq.Context), // to ), - InvitePosition: rp.inviteStream.IncrementalSync( - syncReq.Context, syncReq, syncReq.Since.InvitePosition, - rp.inviteStream.LatestPosition(syncReq.Context), + InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.InvitePosition, // from + rp.streams.InviteStreamProvider.LatestPosition(syncReq.Context), // to ), - SendToDevicePosition: rp.sendToDeviceStream.IncrementalSync( - syncReq.Context, syncReq, syncReq.Since.SendToDevicePosition, - rp.sendToDeviceStream.LatestPosition(syncReq.Context), + SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.SendToDevicePosition, // from + rp.streams.SendToDeviceStreamProvider.LatestPosition(syncReq.Context), // to ), - AccountDataPosition: rp.accountDataStream.IncrementalSync( - syncReq.Context, syncReq, syncReq.Since.AccountDataPosition, - rp.accountDataStream.LatestPosition(syncReq.Context), + AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.AccountDataPosition, // from + rp.streams.AccountDataStreamProvider.LatestPosition(syncReq.Context), // to ), - DeviceListPosition: rp.deviceListStream.IncrementalSync( - syncReq.Context, syncReq, syncReq.Since.DeviceListPosition, - rp.db.DeviceListStream().LatestPosition(syncReq.Context), + DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.DeviceListPosition, // from + rp.streams.DeviceListStreamProvider.LatestPosition(syncReq.Context), // to ), } } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 0b79eb91b..420b94ab6 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -18,6 +18,7 @@ import ( "github.com/gorilla/mux" "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/eduserver/cache" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" @@ -28,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/sync" ) @@ -43,51 +45,54 @@ func AddPublicRoutes( ) { consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) - syncDB, err := storage.NewSyncServerDatasource(&cfg.Database, userAPI) + syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") } - requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI) + eduCache := cache.New() + streams := streams.NewSyncStreamProviders(syncDB, userAPI, eduCache) + + requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), - consumer, keyAPI, rsAPI, syncDB, + consumer, keyAPI, rsAPI, syncDB, streams, ) if err = keyChangeConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start key change consumer") } roomConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, syncDB, rsAPI, + cfg, consumer, syncDB, streams, rsAPI, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") } clientConsumer := consumers.NewOutputClientDataConsumer( - cfg, consumer, syncDB, + cfg, consumer, syncDB, streams, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - cfg, consumer, syncDB, + cfg, consumer, syncDB, eduCache, streams, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - cfg, consumer, syncDB, + cfg, consumer, syncDB, streams, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - cfg, consumer, syncDB, + cfg, consumer, syncDB, streams, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start receipts consumer") diff --git a/syncapi/types/types.go b/syncapi/types/types.go index a9a4d40c1..412a6439d 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -35,6 +35,15 @@ var ( ErrInvalidSyncTokenLen = fmt.Errorf("Sync token has an invalid length") ) +type StateDelta struct { + RoomID string + StateEvents []*gomatrixserverlib.HeaderedEvent + Membership string + // The PDU stream position of the latest membership event for this user, if applicable. + // Can be 0 if there is no membership event in this delta. + MembershipPos StreamPosition +} + // StreamPosition represents the offset in the sync stream a client is at. type StreamPosition int64