Room consumer parameters

This commit is contained in:
Neil Alexander 2021-01-08 16:12:51 +00:00
parent 0291be9e7f
commit ac4189ae81
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 24 additions and 21 deletions

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -33,12 +32,13 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server. // OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
cfg *config.SyncAPI cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer rsConsumer *internal.ContinualConsumer
db storage.Database db storage.Database
streams *streams.Streams pduStream types.StreamProvider
notifier *notifier.Notifier inviteStream types.StreamProvider
notifier *notifier.Notifier
} }
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -47,7 +47,8 @@ func NewOutputRoomEventConsumer(
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
streams *streams.Streams, pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
@ -58,12 +59,13 @@ func NewOutputRoomEventConsumer(
PartitionStore: store, PartitionStore: store,
} }
s := &OutputRoomEventConsumer{ s := &OutputRoomEventConsumer{
cfg: cfg, cfg: cfg,
rsConsumer: &consumer, rsConsumer: &consumer,
db: store, db: store,
notifier: notifier, notifier: notifier,
streams: streams, pduStream: pduStream,
rsAPI: rsAPI, inviteStream: inviteStream,
rsAPI: rsAPI,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -184,7 +186,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err return err
} }
s.streams.PDUStreamProvider.Advance(pduPos) s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil return nil
@ -224,7 +226,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err return err
} }
s.streams.PDUStreamProvider.Advance(pduPos) s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil return nil
@ -281,7 +283,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
return nil return nil
} }
s.streams.InviteStreamProvider.Advance(pduPos) s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
return nil return nil
@ -301,7 +303,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
} }
// Notify any active sync requests that the invite has been retired. // Notify any active sync requests that the invite has been retired.
s.streams.InviteStreamProvider.Advance(pduPos) s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
return nil return nil
@ -322,7 +324,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
// tell the notifier about the new peek so it knows to wake up new devices // tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same // TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this // index as PDUs, but we should fix this
s.streams.PDUStreamProvider.Advance(sp) s.pduStream.Advance(sp)
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil return nil
@ -343,7 +345,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
// tell the notifier about the new peek so it knows to wake up new devices // tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same // TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this // index as PDUs, but we should fix this
s.streams.PDUStreamProvider.Advance(sp) s.pduStream.Advance(sp)
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil return nil

View file

@ -71,7 +71,8 @@ func AddPublicRoutes(
} }
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, syncDB, notifier, streams, rsAPI, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI,
) )
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer") logrus.WithError(err).Panicf("failed to start room server consumer")