Recheck device lists when join/leave events come in

This commit is contained in:
Kegan Dougal 2020-07-30 15:05:29 +01:00
parent a2174d3294
commit b35ca2d35c
2 changed files with 32 additions and 9 deletions

View file

@ -35,6 +35,7 @@ type OutputRoomEventConsumer struct {
rsConsumer *internal.ContinualConsumer rsConsumer *internal.ContinualConsumer
db storage.Database db storage.Database
notifier *sync.Notifier notifier *sync.Notifier
keyChanges *OutputKeyChangeEventConsumer
} }
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -44,6 +45,7 @@ func NewOutputRoomEventConsumer(
n *sync.Notifier, n *sync.Notifier,
store storage.Database, store storage.Database,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
keyChanges *OutputKeyChangeEventConsumer,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{ consumer := internal.ContinualConsumer{
@ -56,6 +58,7 @@ func NewOutputRoomEventConsumer(
db: store, db: store,
notifier: n, notifier: n,
rsAPI: rsAPI, rsAPI: rsAPI,
keyChanges: keyChanges,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -160,9 +163,29 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
} }
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifyKeyChanges(&ev)
return nil return nil
} }
func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.HeaderedEvent) {
if ev.Type() != gomatrixserverlib.MRoomMember || ev.StateKey() == nil {
return
}
membership, err := ev.Membership()
if err != nil {
return
}
switch membership {
case gomatrixserverlib.Join:
s.keyChanges.OnJoinEvent(ev)
case gomatrixserverlib.Ban:
fallthrough
case gomatrixserverlib.Leave:
s.keyChanges.OnLeaveEvent(ev)
}
}
func (s *OutputRoomEventConsumer) onNewInviteEvent( func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent, ctx context.Context, msg api.OutputNewInviteEvent,
) error { ) error {

View file

@ -64,8 +64,16 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
consumer, notifier, keyAPI, currentStateAPI, syncDB,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")
}
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, notifier, syncDB, rsAPI, cfg, consumer, notifier, syncDB, rsAPI, keyChangeConsumer,
) )
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer") logrus.WithError(err).Panicf("failed to start room server consumer")
@ -92,13 +100,5 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start send-to-device consumer") logrus.WithError(err).Panicf("failed to start send-to-device consumer")
} }
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
consumer, notifier, keyAPI, currentStateAPI, syncDB,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")
}
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
} }