From b35ca2d35c328cd015b401eafbc937ca6184ac50 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 30 Jul 2020 15:05:29 +0100 Subject: [PATCH] Recheck device lists when join/leave events come in --- syncapi/consumers/roomserver.go | 23 +++++++++++++++++++++++ syncapi/syncapi.go | 18 +++++++++--------- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index da4a5366c..f8cdcd5c1 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -35,6 +35,7 @@ type OutputRoomEventConsumer struct { rsConsumer *internal.ContinualConsumer db storage.Database notifier *sync.Notifier + keyChanges *OutputKeyChangeEventConsumer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -44,6 +45,7 @@ func NewOutputRoomEventConsumer( n *sync.Notifier, store storage.Database, rsAPI api.RoomserverInternalAPI, + keyChanges *OutputKeyChangeEventConsumer, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ @@ -56,6 +58,7 @@ func NewOutputRoomEventConsumer( db: store, notifier: n, rsAPI: rsAPI, + keyChanges: keyChanges, } consumer.ProcessMessage = s.onMessage @@ -160,9 +163,29 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) + s.notifyKeyChanges(&ev) + return nil } +func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.HeaderedEvent) { + if ev.Type() != gomatrixserverlib.MRoomMember || ev.StateKey() == nil { + return + } + membership, err := ev.Membership() + if err != nil { + return + } + switch membership { + case gomatrixserverlib.Join: + s.keyChanges.OnJoinEvent(ev) + case gomatrixserverlib.Ban: + fallthrough + case gomatrixserverlib.Leave: + s.keyChanges.OnLeaveEvent(ev) + } +} + func (s *OutputRoomEventConsumer) onNewInviteEvent( ctx context.Context, msg api.OutputNewInviteEvent, ) error { diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 754cd5026..5198d59b1 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -64,8 +64,16 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) + keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( + cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), + consumer, notifier, keyAPI, currentStateAPI, syncDB, + ) + if err = keyChangeConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start key change consumer") + } + roomConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, notifier, syncDB, rsAPI, + cfg, consumer, notifier, syncDB, rsAPI, keyChangeConsumer, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") @@ -92,13 +100,5 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start send-to-device consumer") } - keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), - consumer, notifier, keyAPI, currentStateAPI, syncDB, - ) - if err = keyChangeConsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start key change consumer") - } - routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) }