From 883f3b5ee7063ca06f3a14bd7e1c5b326a570d5c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 12 Jan 2021 15:15:18 +0000 Subject: [PATCH] Pass through all updated stream positions to notifier, since multiple may have changed --- syncapi/consumers/clientapi.go | 12 ++--- syncapi/consumers/eduserver_receipts.go | 12 ++--- syncapi/consumers/eduserver_sendtodevice.go | 12 ++--- syncapi/consumers/eduserver_typing.go | 12 +++-- syncapi/consumers/keychange.go | 11 +++-- syncapi/consumers/roomserver.go | 53 ++++++++++----------- syncapi/syncapi.go | 12 ++--- 7 files changed, 63 insertions(+), 61 deletions(-) diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 630791d09..1c091e3ff 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -24,7 +24,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/syncapi/streams" log "github.com/sirupsen/logrus" ) @@ -32,7 +32,7 @@ import ( type OutputClientDataConsumer struct { clientAPIConsumer *internal.ContinualConsumer db storage.Database - stream types.StreamProvider + streams *streams.Streams notifier *notifier.Notifier } @@ -42,7 +42,7 @@ func NewOutputClientDataConsumer( kafkaConsumer sarama.Consumer, store storage.Database, notifier *notifier.Notifier, - stream types.StreamProvider, + streams *streams.Streams, ) *OutputClientDataConsumer { consumer := internal.ContinualConsumer{ @@ -55,7 +55,7 @@ func NewOutputClientDataConsumer( clientAPIConsumer: &consumer, db: store, notifier: notifier, - stream: stream, + streams: streams, } consumer.ProcessMessage = s.onMessage @@ -95,8 +95,8 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - if s.stream.Advance(streamPos) { - s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos}) + if s.streams.AccountDataStreamProvider.Advance(streamPos) { + s.notifier.OnNewAccountData(string(msg.Key), s.streams.Latest(context.Background())) } return nil diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 65215f3fe..3bf322c8d 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -24,7 +24,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/syncapi/streams" log "github.com/sirupsen/logrus" ) @@ -32,7 +32,7 @@ import ( type OutputReceiptEventConsumer struct { receiptConsumer *internal.ContinualConsumer db storage.Database - stream types.StreamProvider + streams *streams.Streams notifier *notifier.Notifier } @@ -43,7 +43,7 @@ func NewOutputReceiptEventConsumer( kafkaConsumer sarama.Consumer, store storage.Database, notifier *notifier.Notifier, - stream types.StreamProvider, + streams *streams.Streams, ) *OutputReceiptEventConsumer { consumer := internal.ContinualConsumer{ @@ -57,7 +57,7 @@ func NewOutputReceiptEventConsumer( receiptConsumer: &consumer, db: store, notifier: notifier, - stream: stream, + streams: streams, } consumer.ProcessMessage = s.onMessage @@ -90,8 +90,8 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro return err } - if s.stream.Advance(streamPos) { - s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) + if s.streams.ReceiptStreamProvider.Advance(streamPos) { + s.notifier.OnNewReceipt(output.RoomID, s.streams.Latest(context.Background())) } return nil diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 98a58ce66..ce1e8dead 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -24,7 +24,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" @@ -35,7 +35,7 @@ type OutputSendToDeviceEventConsumer struct { sendToDeviceConsumer *internal.ContinualConsumer db storage.Database serverName gomatrixserverlib.ServerName // our server name - stream types.StreamProvider + streams *streams.Streams notifier *notifier.Notifier } @@ -46,7 +46,7 @@ func NewOutputSendToDeviceEventConsumer( kafkaConsumer sarama.Consumer, store storage.Database, notifier *notifier.Notifier, - stream types.StreamProvider, + streams *streams.Streams, ) *OutputSendToDeviceEventConsumer { consumer := internal.ContinualConsumer{ @@ -61,7 +61,7 @@ func NewOutputSendToDeviceEventConsumer( db: store, serverName: cfg.Matrix.ServerName, notifier: notifier, - stream: stream, + streams: streams, } consumer.ProcessMessage = s.onMessage @@ -105,11 +105,11 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - if s.stream.Advance(streamPos) { + if s.streams.SendToDeviceStreamProvider.Advance(streamPos) { s.notifier.OnNewSendToDevice( output.UserID, []string{output.DeviceID}, - types.StreamingToken{SendToDevicePosition: streamPos}, + s.streams.Latest(context.Background()), ) } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 87fc018d3..a5281f208 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -15,6 +15,7 @@ package consumers import ( + "context" "encoding/json" "github.com/Shopify/sarama" @@ -24,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" log "github.com/sirupsen/logrus" ) @@ -32,7 +34,7 @@ import ( type OutputTypingEventConsumer struct { typingConsumer *internal.ContinualConsumer eduCache *cache.EDUCache - stream types.StreamProvider + streams *streams.Streams notifier *notifier.Notifier } @@ -44,7 +46,7 @@ func NewOutputTypingEventConsumer( store storage.Database, eduCache *cache.EDUCache, notifier *notifier.Notifier, - stream types.StreamProvider, + streams *streams.Streams, ) *OutputTypingEventConsumer { consumer := internal.ContinualConsumer{ @@ -58,7 +60,7 @@ func NewOutputTypingEventConsumer( typingConsumer: &consumer, eduCache: eduCache, notifier: notifier, - stream: stream, + streams: streams, } consumer.ProcessMessage = s.onMessage @@ -101,8 +103,8 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error ) } - if s.stream.Advance(typingPos) { - s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) + if s.streams.TypingStreamProvider.Advance(typingPos) { + s.notifier.OnNewTyping(output.Event.RoomID, s.streams.Latest(context.Background())) } return nil diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index f13a63397..e72d3af2f 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -25,6 +25,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -35,7 +36,7 @@ type OutputKeyChangeEventConsumer struct { keyChangeConsumer *internal.ContinualConsumer db storage.Database notifier *notifier.Notifier - stream types.PartitionedStreamProvider + streams *streams.Streams serverName gomatrixserverlib.ServerName // our server name rsAPI roomserverAPI.RoomserverInternalAPI keyAPI api.KeyInternalAPI @@ -53,7 +54,7 @@ func NewOutputKeyChangeEventConsumer( rsAPI roomserverAPI.RoomserverInternalAPI, store storage.Database, notifier *notifier.Notifier, - stream types.PartitionedStreamProvider, + streams *streams.Streams, ) *OutputKeyChangeEventConsumer { consumer := internal.ContinualConsumer{ @@ -72,7 +73,7 @@ func NewOutputKeyChangeEventConsumer( partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, notifier: notifier, - stream: stream, + streams: streams, } consumer.ProcessMessage = s.onMessage @@ -122,9 +123,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er Partition: msg.Partition, } - if s.stream.Advance(posUpdate) { + if s.streams.DeviceListStreamProvider.Advance(posUpdate) { for userID := range queryRes.UserIDsToCount { - s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) + s.notifier.OnNewKeyChange(s.streams.Latest(context.Background()), userID, output.UserID) } } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index a546f3a84..1b254d3e9 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -32,13 +33,12 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { - cfg *config.SyncAPI - rsAPI api.RoomserverInternalAPI - rsConsumer *internal.ContinualConsumer - db storage.Database - pduStream types.StreamProvider - inviteStream types.StreamProvider - notifier *notifier.Notifier + cfg *config.SyncAPI + rsAPI api.RoomserverInternalAPI + rsConsumer *internal.ContinualConsumer + db storage.Database + streams *streams.Streams + notifier *notifier.Notifier } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -47,7 +47,7 @@ func NewOutputRoomEventConsumer( kafkaConsumer sarama.Consumer, store storage.Database, notifier *notifier.Notifier, - pduStream types.StreamProvider, + streams *streams.Streams, inviteStream types.StreamProvider, rsAPI api.RoomserverInternalAPI, ) *OutputRoomEventConsumer { @@ -59,13 +59,12 @@ func NewOutputRoomEventConsumer( PartitionStore: store, } s := &OutputRoomEventConsumer{ - cfg: cfg, - rsConsumer: &consumer, - db: store, - notifier: notifier, - pduStream: pduStream, - inviteStream: inviteStream, - rsAPI: rsAPI, + cfg: cfg, + rsConsumer: &consumer, + db: store, + notifier: notifier, + streams: streams, + rsAPI: rsAPI, } consumer.ProcessMessage = s.onMessage @@ -186,8 +185,8 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( return err } - if s.pduStream.Advance(pduPos) { - s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) + if s.streams.PDUStreamProvider.Advance(pduPos) { + s.notifier.OnNewEvent(ev, ev.RoomID(), nil, s.streams.Latest(ctx)) } return nil @@ -227,8 +226,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( return err } - if s.pduStream.Advance(pduPos) { - s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) + if s.streams.PDUStreamProvider.Advance(pduPos) { + s.notifier.OnNewEvent(ev, ev.RoomID(), nil, s.streams.Latest(ctx)) } return nil @@ -285,8 +284,8 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( return nil } - if s.inviteStream.Advance(pduPos) { - s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) + if s.streams.InviteStreamProvider.Advance(pduPos) { + s.notifier.OnNewInvite(s.streams.Latest(ctx), *msg.Event.StateKey()) } return nil @@ -306,8 +305,8 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( } // Notify any active sync requests that the invite has been retired. - if s.inviteStream.Advance(pduPos) { - s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) + if s.streams.InviteStreamProvider.Advance(pduPos) { + s.notifier.OnNewInvite(s.streams.Latest(ctx), msg.TargetUserID) } return nil @@ -328,8 +327,8 @@ func (s *OutputRoomEventConsumer) onNewPeek( // tell the notifier about the new peek so it knows to wake up new devices // TODO: This only works because the peeks table is reusing the same // index as PDUs, but we should fix this - if s.pduStream.Advance(sp) { - s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) + if s.streams.PDUStreamProvider.Advance(sp) { + s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, s.streams.Latest(ctx)) } return nil @@ -350,8 +349,8 @@ func (s *OutputRoomEventConsumer) onRetirePeek( // tell the notifier about the new peek so it knows to wake up new devices // TODO: This only works because the peeks table is reusing the same // index as PDUs, but we should fix this - if s.pduStream.Advance(sp) { - s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) + if s.streams.PDUStreamProvider.Advance(sp) { + s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, s.streams.Latest(ctx)) } return nil diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 4a09940d9..c82fe53ad 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -64,14 +64,14 @@ func AddPublicRoutes( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), - consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, + consumer, keyAPI, rsAPI, syncDB, notifier, streams, ) if err = keyChangeConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start key change consumer") } roomConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, syncDB, notifier, streams.PDUStreamProvider, + cfg, consumer, syncDB, notifier, streams, streams.InviteStreamProvider, rsAPI, ) if err = roomConsumer.Start(); err != nil { @@ -79,28 +79,28 @@ func AddPublicRoutes( } clientConsumer := consumers.NewOutputClientDataConsumer( - cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider, + cfg, consumer, syncDB, notifier, streams, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider, + cfg, consumer, syncDB, eduCache, notifier, streams, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider, + cfg, consumer, syncDB, notifier, streams, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider, + cfg, consumer, syncDB, notifier, streams, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start receipts consumer")