Pass through all updated stream positions to notifier, since multiple may have changed

This commit is contained in:
Neil Alexander 2021-01-12 15:15:18 +00:00
parent d5ec531ddb
commit 883f3b5ee7
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
7 changed files with 63 additions and 61 deletions

View file

@ -24,7 +24,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/streams"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -32,7 +32,7 @@ import (
type OutputClientDataConsumer struct { type OutputClientDataConsumer struct {
clientAPIConsumer *internal.ContinualConsumer clientAPIConsumer *internal.ContinualConsumer
db storage.Database db storage.Database
stream types.StreamProvider streams *streams.Streams
notifier *notifier.Notifier notifier *notifier.Notifier
} }
@ -42,7 +42,7 @@ func NewOutputClientDataConsumer(
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, streams *streams.Streams,
) *OutputClientDataConsumer { ) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{ consumer := internal.ContinualConsumer{
@ -55,7 +55,7 @@ func NewOutputClientDataConsumer(
clientAPIConsumer: &consumer, clientAPIConsumer: &consumer,
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, streams: streams,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -95,8 +95,8 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data") }).Panicf("could not save account data")
} }
if s.stream.Advance(streamPos) { if s.streams.AccountDataStreamProvider.Advance(streamPos) {
s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos}) s.notifier.OnNewAccountData(string(msg.Key), s.streams.Latest(context.Background()))
} }
return nil return nil

View file

@ -24,7 +24,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/streams"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -32,7 +32,7 @@ import (
type OutputReceiptEventConsumer struct { type OutputReceiptEventConsumer struct {
receiptConsumer *internal.ContinualConsumer receiptConsumer *internal.ContinualConsumer
db storage.Database db storage.Database
stream types.StreamProvider streams *streams.Streams
notifier *notifier.Notifier notifier *notifier.Notifier
} }
@ -43,7 +43,7 @@ func NewOutputReceiptEventConsumer(
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, streams *streams.Streams,
) *OutputReceiptEventConsumer { ) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{ consumer := internal.ContinualConsumer{
@ -57,7 +57,7 @@ func NewOutputReceiptEventConsumer(
receiptConsumer: &consumer, receiptConsumer: &consumer,
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, streams: streams,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -90,8 +90,8 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
return err return err
} }
if s.stream.Advance(streamPos) { if s.streams.ReceiptStreamProvider.Advance(streamPos) {
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) s.notifier.OnNewReceipt(output.RoomID, s.streams.Latest(context.Background()))
} }
return nil return nil

View file

@ -24,7 +24,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -35,7 +35,7 @@ type OutputSendToDeviceEventConsumer struct {
sendToDeviceConsumer *internal.ContinualConsumer sendToDeviceConsumer *internal.ContinualConsumer
db storage.Database db storage.Database
serverName gomatrixserverlib.ServerName // our server name serverName gomatrixserverlib.ServerName // our server name
stream types.StreamProvider streams *streams.Streams
notifier *notifier.Notifier notifier *notifier.Notifier
} }
@ -46,7 +46,7 @@ func NewOutputSendToDeviceEventConsumer(
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, streams *streams.Streams,
) *OutputSendToDeviceEventConsumer { ) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{ consumer := internal.ContinualConsumer{
@ -61,7 +61,7 @@ func NewOutputSendToDeviceEventConsumer(
db: store, db: store,
serverName: cfg.Matrix.ServerName, serverName: cfg.Matrix.ServerName,
notifier: notifier, notifier: notifier,
stream: stream, streams: streams,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -105,11 +105,11 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err return err
} }
if s.stream.Advance(streamPos) { if s.streams.SendToDeviceStreamProvider.Advance(streamPos) {
s.notifier.OnNewSendToDevice( s.notifier.OnNewSendToDevice(
output.UserID, output.UserID,
[]string{output.DeviceID}, []string{output.DeviceID},
types.StreamingToken{SendToDevicePosition: streamPos}, s.streams.Latest(context.Background()),
) )
} }

View file

@ -15,6 +15,7 @@
package consumers package consumers
import ( import (
"context"
"encoding/json" "encoding/json"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
@ -24,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -32,7 +34,7 @@ import (
type OutputTypingEventConsumer struct { type OutputTypingEventConsumer struct {
typingConsumer *internal.ContinualConsumer typingConsumer *internal.ContinualConsumer
eduCache *cache.EDUCache eduCache *cache.EDUCache
stream types.StreamProvider streams *streams.Streams
notifier *notifier.Notifier notifier *notifier.Notifier
} }
@ -44,7 +46,7 @@ func NewOutputTypingEventConsumer(
store storage.Database, store storage.Database,
eduCache *cache.EDUCache, eduCache *cache.EDUCache,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, streams *streams.Streams,
) *OutputTypingEventConsumer { ) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{ consumer := internal.ContinualConsumer{
@ -58,7 +60,7 @@ func NewOutputTypingEventConsumer(
typingConsumer: &consumer, typingConsumer: &consumer,
eduCache: eduCache, eduCache: eduCache,
notifier: notifier, notifier: notifier,
stream: stream, streams: streams,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -101,8 +103,8 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
) )
} }
if s.stream.Advance(typingPos) { if s.streams.TypingStreamProvider.Advance(typingPos) {
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) s.notifier.OnNewTyping(output.Event.RoomID, s.streams.Latest(context.Background()))
} }
return nil return nil

View file

@ -25,6 +25,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -35,7 +36,7 @@ type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer keyChangeConsumer *internal.ContinualConsumer
db storage.Database db storage.Database
notifier *notifier.Notifier notifier *notifier.Notifier
stream types.PartitionedStreamProvider streams *streams.Streams
serverName gomatrixserverlib.ServerName // our server name serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI rsAPI roomserverAPI.RoomserverInternalAPI
keyAPI api.KeyInternalAPI keyAPI api.KeyInternalAPI
@ -53,7 +54,7 @@ func NewOutputKeyChangeEventConsumer(
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.PartitionedStreamProvider, streams *streams.Streams,
) *OutputKeyChangeEventConsumer { ) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{ consumer := internal.ContinualConsumer{
@ -72,7 +73,7 @@ func NewOutputKeyChangeEventConsumer(
partitionToOffset: make(map[int32]int64), partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{}, partitionToOffsetMu: sync.Mutex{},
notifier: notifier, notifier: notifier,
stream: stream, streams: streams,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -122,9 +123,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
Partition: msg.Partition, Partition: msg.Partition,
} }
if s.stream.Advance(posUpdate) { if s.streams.DeviceListStreamProvider.Advance(posUpdate) {
for userID := range queryRes.UserIDsToCount { for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) s.notifier.OnNewKeyChange(s.streams.Latest(context.Background()), userID, output.UserID)
} }
} }

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -32,13 +33,12 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server. // OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
cfg *config.SyncAPI cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer rsConsumer *internal.ContinualConsumer
db storage.Database db storage.Database
pduStream types.StreamProvider streams *streams.Streams
inviteStream types.StreamProvider notifier *notifier.Notifier
notifier *notifier.Notifier
} }
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -47,7 +47,7 @@ func NewOutputRoomEventConsumer(
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
pduStream types.StreamProvider, streams *streams.Streams,
inviteStream types.StreamProvider, inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
@ -59,13 +59,12 @@ func NewOutputRoomEventConsumer(
PartitionStore: store, PartitionStore: store,
} }
s := &OutputRoomEventConsumer{ s := &OutputRoomEventConsumer{
cfg: cfg, cfg: cfg,
rsConsumer: &consumer, rsConsumer: &consumer,
db: store, db: store,
notifier: notifier, notifier: notifier,
pduStream: pduStream, streams: streams,
inviteStream: inviteStream, rsAPI: rsAPI,
rsAPI: rsAPI,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -186,8 +185,8 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err return err
} }
if s.pduStream.Advance(pduPos) { if s.streams.PDUStreamProvider.Advance(pduPos) {
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) s.notifier.OnNewEvent(ev, ev.RoomID(), nil, s.streams.Latest(ctx))
} }
return nil return nil
@ -227,8 +226,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err return err
} }
if s.pduStream.Advance(pduPos) { if s.streams.PDUStreamProvider.Advance(pduPos) {
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) s.notifier.OnNewEvent(ev, ev.RoomID(), nil, s.streams.Latest(ctx))
} }
return nil return nil
@ -285,8 +284,8 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
return nil return nil
} }
if s.inviteStream.Advance(pduPos) { if s.streams.InviteStreamProvider.Advance(pduPos) {
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) s.notifier.OnNewInvite(s.streams.Latest(ctx), *msg.Event.StateKey())
} }
return nil return nil
@ -306,8 +305,8 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
} }
// Notify any active sync requests that the invite has been retired. // Notify any active sync requests that the invite has been retired.
if s.inviteStream.Advance(pduPos) { if s.streams.InviteStreamProvider.Advance(pduPos) {
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) s.notifier.OnNewInvite(s.streams.Latest(ctx), msg.TargetUserID)
} }
return nil return nil
@ -328,8 +327,8 @@ func (s *OutputRoomEventConsumer) onNewPeek(
// tell the notifier about the new peek so it knows to wake up new devices // tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same // TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this // index as PDUs, but we should fix this
if s.pduStream.Advance(sp) { if s.streams.PDUStreamProvider.Advance(sp) {
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, s.streams.Latest(ctx))
} }
return nil return nil
@ -350,8 +349,8 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
// tell the notifier about the new peek so it knows to wake up new devices // tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same // TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this // index as PDUs, but we should fix this
if s.pduStream.Advance(sp) { if s.streams.PDUStreamProvider.Advance(sp) {
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, s.streams.Latest(ctx))
} }
return nil return nil

View file

@ -64,14 +64,14 @@ func AddPublicRoutes(
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, consumer, keyAPI, rsAPI, syncDB, notifier, streams,
) )
if err = keyChangeConsumer.Start(); err != nil { if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer") logrus.WithError(err).Panicf("failed to start key change consumer")
} }
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, syncDB, notifier, streams.PDUStreamProvider, cfg, consumer, syncDB, notifier, streams,
streams.InviteStreamProvider, rsAPI, streams.InviteStreamProvider, rsAPI,
) )
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
@ -79,28 +79,28 @@ func AddPublicRoutes(
} }
clientConsumer := consumers.NewOutputClientDataConsumer( clientConsumer := consumers.NewOutputClientDataConsumer(
cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider, cfg, consumer, syncDB, notifier, streams,
) )
if err = clientConsumer.Start(); err != nil { if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer") logrus.WithError(err).Panicf("failed to start client data consumer")
} }
typingConsumer := consumers.NewOutputTypingEventConsumer( typingConsumer := consumers.NewOutputTypingEventConsumer(
cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider, cfg, consumer, syncDB, eduCache, notifier, streams,
) )
if err = typingConsumer.Start(); err != nil { if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer") logrus.WithError(err).Panicf("failed to start typing consumer")
} }
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider, cfg, consumer, syncDB, notifier, streams,
) )
if err = sendToDeviceConsumer.Start(); err != nil { if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer") logrus.WithError(err).Panicf("failed to start send-to-device consumer")
} }
receiptConsumer := consumers.NewOutputReceiptEventConsumer( receiptConsumer := consumers.NewOutputReceiptEventConsumer(
cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider, cfg, consumer, syncDB, notifier, streams,
) )
if err = receiptConsumer.Start(); err != nil { if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer") logrus.WithError(err).Panicf("failed to start receipts consumer")