From 0291be9e7faefe8c7dbdc4eef658e88f06ddacb2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 8 Jan 2021 16:04:02 +0000 Subject: [PATCH] Review comments @Kegsay --- syncapi/consumers/clientapi.go | 9 ++++----- syncapi/consumers/eduserver_receipts.go | 9 ++++----- syncapi/consumers/eduserver_sendtodevice.go | 9 ++++----- syncapi/consumers/eduserver_typing.go | 9 ++++----- syncapi/consumers/keychange.go | 9 ++++----- syncapi/consumers/roomserver.go | 5 ++--- syncapi/notifier/notifier.go | 4 ++++ syncapi/storage/interface.go | 8 ++++---- syncapi/storage/shared/syncserver.go | 8 ++++---- syncapi/streams/stream_accountdata.go | 2 +- syncapi/streams/stream_invite.go | 2 +- syncapi/streams/stream_pdu.go | 4 +++- syncapi/streams/stream_receipt.go | 2 +- syncapi/syncapi.go | 12 ++++++------ 14 files changed, 46 insertions(+), 46 deletions(-) diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index b5cc47318..4958f2216 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" log "github.com/sirupsen/logrus" ) @@ -33,7 +32,7 @@ import ( type OutputClientDataConsumer struct { clientAPIConsumer *internal.ContinualConsumer db storage.Database - streams *streams.Streams + stream types.StreamProvider notifier *notifier.Notifier } @@ -43,7 +42,7 @@ func NewOutputClientDataConsumer( kafkaConsumer sarama.Consumer, store storage.Database, notifier *notifier.Notifier, - streams *streams.Streams, + stream types.StreamProvider, ) *OutputClientDataConsumer { consumer := internal.ContinualConsumer{ @@ -56,7 +55,7 @@ func NewOutputClientDataConsumer( clientAPIConsumer: &consumer, db: store, notifier: notifier, - streams: streams, + stream: stream, } consumer.ProcessMessage = s.onMessage @@ -96,7 +95,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - s.streams.AccountDataStreamProvider.Advance(streamPos) + s.stream.Advance(streamPos) s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos}) return nil diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index bd3ec9793..bd538eff2 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" log "github.com/sirupsen/logrus" ) @@ -33,7 +32,7 @@ import ( type OutputReceiptEventConsumer struct { receiptConsumer *internal.ContinualConsumer db storage.Database - streams *streams.Streams + stream types.StreamProvider notifier *notifier.Notifier } @@ -44,7 +43,7 @@ func NewOutputReceiptEventConsumer( kafkaConsumer sarama.Consumer, store storage.Database, notifier *notifier.Notifier, - streams *streams.Streams, + stream types.StreamProvider, ) *OutputReceiptEventConsumer { consumer := internal.ContinualConsumer{ @@ -58,7 +57,7 @@ func NewOutputReceiptEventConsumer( receiptConsumer: &consumer, db: store, notifier: notifier, - streams: streams, + stream: stream, } consumer.ProcessMessage = s.onMessage @@ -91,7 +90,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro return err } - s.streams.ReceiptStreamProvider.Advance(streamPos) + s.stream.Advance(streamPos) s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) return nil diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index a22b66795..6e774b5b4 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -36,7 +35,7 @@ type OutputSendToDeviceEventConsumer struct { sendToDeviceConsumer *internal.ContinualConsumer db storage.Database serverName gomatrixserverlib.ServerName // our server name - streams *streams.Streams + stream types.StreamProvider notifier *notifier.Notifier } @@ -47,7 +46,7 @@ func NewOutputSendToDeviceEventConsumer( kafkaConsumer sarama.Consumer, store storage.Database, notifier *notifier.Notifier, - streams *streams.Streams, + stream types.StreamProvider, ) *OutputSendToDeviceEventConsumer { consumer := internal.ContinualConsumer{ @@ -62,7 +61,7 @@ func NewOutputSendToDeviceEventConsumer( db: store, serverName: cfg.Matrix.ServerName, notifier: notifier, - streams: streams, + stream: stream, } consumer.ProcessMessage = s.onMessage @@ -106,7 +105,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - s.streams.SendToDeviceStreamProvider.Advance(streamPos) + s.stream.Advance(streamPos) s.notifier.OnNewSendToDevice( output.UserID, []string{output.DeviceID}, diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index a8a67d653..3edf6675d 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" log "github.com/sirupsen/logrus" ) @@ -33,7 +32,7 @@ import ( type OutputTypingEventConsumer struct { typingConsumer *internal.ContinualConsumer eduCache *cache.EDUCache - streams *streams.Streams + stream types.StreamProvider notifier *notifier.Notifier } @@ -45,7 +44,7 @@ func NewOutputTypingEventConsumer( store storage.Database, eduCache *cache.EDUCache, notifier *notifier.Notifier, - streams *streams.Streams, + stream types.StreamProvider, ) *OutputTypingEventConsumer { consumer := internal.ContinualConsumer{ @@ -59,7 +58,7 @@ func NewOutputTypingEventConsumer( typingConsumer: &consumer, eduCache: eduCache, notifier: notifier, - streams: streams, + stream: stream, } consumer.ProcessMessage = s.onMessage @@ -102,7 +101,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error ) } - s.streams.TypingStreamProvider.Advance(typingPos) + s.stream.Advance(typingPos) s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) return nil diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 919b7c94d..af7b280fa 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -25,7 +25,6 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -36,7 +35,7 @@ type OutputKeyChangeEventConsumer struct { keyChangeConsumer *internal.ContinualConsumer db storage.Database notifier *notifier.Notifier - streams *streams.Streams + stream types.PartitionedStreamProvider serverName gomatrixserverlib.ServerName // our server name rsAPI roomserverAPI.RoomserverInternalAPI keyAPI api.KeyInternalAPI @@ -54,7 +53,7 @@ func NewOutputKeyChangeEventConsumer( rsAPI roomserverAPI.RoomserverInternalAPI, store storage.Database, notifier *notifier.Notifier, - streams *streams.Streams, + stream types.PartitionedStreamProvider, ) *OutputKeyChangeEventConsumer { consumer := internal.ContinualConsumer{ @@ -73,7 +72,7 @@ func NewOutputKeyChangeEventConsumer( partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, notifier: notifier, - streams: streams, + stream: stream, } consumer.ProcessMessage = s.onMessage @@ -123,7 +122,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er Partition: msg.Partition, } - s.streams.DeviceListStreamProvider.Advance(posUpdate) + s.stream.Advance(posUpdate) for userID := range queryRes.UserIDsToCount { s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index dbd9b3aba..865fb5b57 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -301,7 +301,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( } // Notify any active sync requests that the invite has been retired. - // Invites share the same stream counter as PDUs s.streams.InviteStreamProvider.Advance(pduPos) s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) @@ -324,7 +323,7 @@ func (s *OutputRoomEventConsumer) onNewPeek( // TODO: This only works because the peeks table is reusing the same // index as PDUs, but we should fix this s.streams.PDUStreamProvider.Advance(sp) - s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp}) + s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) return nil } @@ -345,7 +344,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek( // TODO: This only works because the peeks table is reusing the same // index as PDUs, but we should fix this s.streams.PDUStreamProvider.Advance(sp) - s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp}) + s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) return nil } diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 6c4d4a0ca..d853cc0e4 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -136,10 +136,12 @@ func (n *Notifier) OnNewAccountData( func (n *Notifier) OnNewPeek( roomID, userID, deviceID string, + posUpdate types.StreamingToken, ) { n.streamLock.Lock() defer n.streamLock.Unlock() + n.currPos.ApplyUpdates(posUpdate) n.addPeekingDevice(roomID, userID, deviceID) // we don't wake up devices here given the roomserver consumer will do this shortly afterwards @@ -148,10 +150,12 @@ func (n *Notifier) OnNewPeek( func (n *Notifier) OnRetirePeek( roomID, userID, deviceID string, + posUpdate types.StreamingToken, ) { n.streamLock.Lock() defer n.streamLock.Unlock() + n.currPos.ApplyUpdates(posUpdate) n.removePeekingDevice(roomID, userID, deviceID) // we don't wake up devices here given the roomserver consumer will do this shortly afterwards diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 45d724372..d66e99640 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -29,10 +29,10 @@ import ( type Database interface { internal.PartitionStorer - MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error) - MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error) - MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error) - MaxStreamTokenForAccountData(ctx context.Context) (types.StreamPosition, error) + MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) + MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) + MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) + MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 85d1a095f..ebb996739 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -61,7 +61,7 @@ func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) { }) } -func (d *Database) MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error) { +func (d *Database) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) { id, err := d.OutputEvents.SelectMaxEventID(ctx, nil) if err != nil { return 0, fmt.Errorf("d.OutputEvents.SelectMaxEventID: %w", err) @@ -69,7 +69,7 @@ func (d *Database) MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosit return types.StreamPosition(id), nil } -func (d *Database) MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error) { +func (d *Database) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) { id, err := d.Receipts.SelectMaxReceiptID(ctx, nil) if err != nil { return 0, fmt.Errorf("d.Receipts.SelectMaxReceiptID: %w", err) @@ -77,7 +77,7 @@ func (d *Database) MaxStreamTokenForReceipts(ctx context.Context) (types.StreamP return types.StreamPosition(id), nil } -func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error) { +func (d *Database) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) { id, err := d.Invites.SelectMaxInviteID(ctx, nil) if err != nil { return 0, fmt.Errorf("d.Invites.SelectMaxInviteID: %w", err) @@ -85,7 +85,7 @@ func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPo return types.StreamPosition(id), nil } -func (d *Database) MaxStreamTokenForAccountData(ctx context.Context) (types.StreamPosition, error) { +func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) { id, err := d.AccountData.SelectMaxAccountDataID(ctx, nil) if err != nil { return 0, fmt.Errorf("d.Invites.SelectMaxAccountDataID: %w", err) diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 666414f1e..aa7f0937d 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -19,7 +19,7 @@ func (p *AccountDataStreamProvider) Setup() { p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := p.DB.MaxStreamTokenForAccountData(context.Background()) + id, err := p.DB.MaxStreamPositionForAccountData(context.Background()) if err != nil { panic(err) } diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 7d5df1931..10a0dda86 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -16,7 +16,7 @@ func (p *InviteStreamProvider) Setup() { p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := p.DB.MaxStreamTokenForInvites(context.Background()) + id, err := p.DB.MaxStreamPositionForInvites(context.Background()) if err != nil { panic(err) } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 08d1eb3b0..016c182e8 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -18,7 +18,7 @@ func (p *PDUStreamProvider) Setup() { p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := p.DB.MaxStreamTokenForPDUs(context.Background()) + id, err := p.DB.MaxStreamPositionForPDUs(context.Background()) if err != nil { panic(err) } @@ -33,6 +33,8 @@ func (p *PDUStreamProvider) CompleteSync( to := p.LatestPosition(ctx) // Get the current sync position which we will base the sync response on. + // For complete syncs, we want to start at the most recent events and work + // backwards, so that we show the most recent events in the room. r := types.Range{ From: to, To: 0, diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index bba47a877..259d07bd4 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -16,7 +16,7 @@ type ReceiptStreamProvider struct { func (p *ReceiptStreamProvider) Setup() { p.StreamProvider.Setup() - id, err := p.DB.MaxStreamTokenForReceipts(context.Background()) + id, err := p.DB.MaxStreamPositionForReceipts(context.Background()) if err != nil { panic(err) } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 7addcb9bb..a52cd202e 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -57,14 +57,14 @@ func AddPublicRoutes( streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache) notifier := notifier.NewNotifier(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { - logrus.WithError(err).Panicf("failed to load notifier") + logrus.WithError(err).Panicf("failed to load notifier ") } requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), - consumer, keyAPI, rsAPI, syncDB, notifier, streams, + consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) if err = keyChangeConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start key change consumer") @@ -78,28 +78,28 @@ func AddPublicRoutes( } clientConsumer := consumers.NewOutputClientDataConsumer( - cfg, consumer, syncDB, notifier, streams, + cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - cfg, consumer, syncDB, eduCache, notifier, streams, + cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - cfg, consumer, syncDB, notifier, streams, + cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - cfg, consumer, syncDB, notifier, streams, + cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start receipts consumer")