diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 865fb5b57..1d47b73a6 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -33,12 +32,13 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { - cfg *config.SyncAPI - rsAPI api.RoomserverInternalAPI - rsConsumer *internal.ContinualConsumer - db storage.Database - streams *streams.Streams - notifier *notifier.Notifier + cfg *config.SyncAPI + rsAPI api.RoomserverInternalAPI + rsConsumer *internal.ContinualConsumer + db storage.Database + pduStream types.StreamProvider + inviteStream types.StreamProvider + notifier *notifier.Notifier } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -47,7 +47,8 @@ func NewOutputRoomEventConsumer( kafkaConsumer sarama.Consumer, store storage.Database, notifier *notifier.Notifier, - streams *streams.Streams, + pduStream types.StreamProvider, + inviteStream types.StreamProvider, rsAPI api.RoomserverInternalAPI, ) *OutputRoomEventConsumer { @@ -58,12 +59,13 @@ func NewOutputRoomEventConsumer( PartitionStore: store, } s := &OutputRoomEventConsumer{ - cfg: cfg, - rsConsumer: &consumer, - db: store, - notifier: notifier, - streams: streams, - rsAPI: rsAPI, + cfg: cfg, + rsConsumer: &consumer, + db: store, + notifier: notifier, + pduStream: pduStream, + inviteStream: inviteStream, + rsAPI: rsAPI, } consumer.ProcessMessage = s.onMessage @@ -184,7 +186,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( return err } - s.streams.PDUStreamProvider.Advance(pduPos) + s.pduStream.Advance(pduPos) s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) return nil @@ -224,7 +226,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( return err } - s.streams.PDUStreamProvider.Advance(pduPos) + s.pduStream.Advance(pduPos) s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) return nil @@ -281,7 +283,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( return nil } - s.streams.InviteStreamProvider.Advance(pduPos) + s.inviteStream.Advance(pduPos) s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) return nil @@ -301,7 +303,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( } // Notify any active sync requests that the invite has been retired. - s.streams.InviteStreamProvider.Advance(pduPos) + s.inviteStream.Advance(pduPos) s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) return nil @@ -322,7 +324,7 @@ func (s *OutputRoomEventConsumer) onNewPeek( // tell the notifier about the new peek so it knows to wake up new devices // TODO: This only works because the peeks table is reusing the same // index as PDUs, but we should fix this - s.streams.PDUStreamProvider.Advance(sp) + s.pduStream.Advance(sp) s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) return nil @@ -343,7 +345,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek( // tell the notifier about the new peek so it knows to wake up new devices // TODO: This only works because the peeks table is reusing the same // index as PDUs, but we should fix this - s.streams.PDUStreamProvider.Advance(sp) + s.pduStream.Advance(sp) s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) return nil diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index a52cd202e..4a09940d9 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -71,7 +71,8 @@ func AddPublicRoutes( } roomConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, syncDB, notifier, streams, rsAPI, + cfg, consumer, syncDB, notifier, streams.PDUStreamProvider, + streams.InviteStreamProvider, rsAPI, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer")