diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 200ac85cc..3fc6120d2 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -125,31 +125,3 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } return nil } - -func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { - // work out who we are now sharing rooms with which we previously were not and notify them about the joining - // users keys: - changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) - if err != nil { - log.WithError(err).Error("OnJoinEvent: failed to work out changed users") - return - } - // TODO: f.e changed, wake up stream - for _, userID := range changed { - log.Infof("OnJoinEvent:Notify %s that %s should have device lists tracked", userID, *ev.StateKey()) - } -} - -func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { - // work out who we are no longer sharing any rooms with and notify them about the leaving user - _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) - if err != nil { - log.WithError(err).Error("OnLeaveEvent: failed to work out left users") - return - } - // TODO: f.e left, wake up stream - for _, userID := range left { - log.Infof("OnLeaveEvent:Notify %s that %s should no longer track device lists", userID, *ev.StateKey()) - } - -} diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index d8d0a298a..ca48c8300 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -38,7 +38,6 @@ type OutputRoomEventConsumer struct { rsConsumer *internal.ContinualConsumer db storage.Database notifier *sync.Notifier - keyChanges *OutputKeyChangeEventConsumer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -48,7 +47,6 @@ func NewOutputRoomEventConsumer( n *sync.Notifier, store storage.Database, rsAPI api.RoomserverInternalAPI, - keyChanges *OutputKeyChangeEventConsumer, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ @@ -63,7 +61,6 @@ func NewOutputRoomEventConsumer( db: store, notifier: n, rsAPI: rsAPI, - keyChanges: keyChanges, } consumer.ProcessMessage = s.onMessage @@ -182,26 +179,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) - s.notifyKeyChanges(&ev) - return nil } -func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.HeaderedEvent) { - membership, err := ev.Membership() - if err != nil { - return - } - switch membership { - case gomatrixserverlib.Join: - s.keyChanges.OnJoinEvent(ev) - case gomatrixserverlib.Ban: - fallthrough - case gomatrixserverlib.Leave: - s.keyChanges.OnLeaveEvent(ev) - } -} - func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) { if ev.Type() != gomatrixserverlib.MRoomMember { return sp, nil diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index c77c55412..43e2455b6 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -71,7 +71,7 @@ func AddPublicRoutes( } roomConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, notifier, syncDB, rsAPI, keyChangeConsumer, + cfg, consumer, notifier, syncDB, rsAPI, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer")