Remove notifs about key changes in syncapi (#1496)
The join/leave events themselves will wake up the right people so we needn't do it twice.
This commit is contained in:
parent
8b880be57e
commit
3e12f6e9c2
|
@ -125,31 +125,3 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
|
||||||
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
|
|
||||||
// users keys:
|
|
||||||
changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// TODO: f.e changed, wake up stream
|
|
||||||
for _, userID := range changed {
|
|
||||||
log.Infof("OnJoinEvent:Notify %s that %s should have device lists tracked", userID, *ev.StateKey())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
|
||||||
// work out who we are no longer sharing any rooms with and notify them about the leaving user
|
|
||||||
_, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// TODO: f.e left, wake up stream
|
|
||||||
for _, userID := range left {
|
|
||||||
log.Infof("OnLeaveEvent:Notify %s that %s should no longer track device lists", userID, *ev.StateKey())
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
@ -38,7 +38,6 @@ type OutputRoomEventConsumer struct {
|
||||||
rsConsumer *internal.ContinualConsumer
|
rsConsumer *internal.ContinualConsumer
|
||||||
db storage.Database
|
db storage.Database
|
||||||
notifier *sync.Notifier
|
notifier *sync.Notifier
|
||||||
keyChanges *OutputKeyChangeEventConsumer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||||
|
@ -48,7 +47,6 @@ func NewOutputRoomEventConsumer(
|
||||||
n *sync.Notifier,
|
n *sync.Notifier,
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
rsAPI api.RoomserverInternalAPI,
|
rsAPI api.RoomserverInternalAPI,
|
||||||
keyChanges *OutputKeyChangeEventConsumer,
|
|
||||||
) *OutputRoomEventConsumer {
|
) *OutputRoomEventConsumer {
|
||||||
|
|
||||||
consumer := internal.ContinualConsumer{
|
consumer := internal.ContinualConsumer{
|
||||||
|
@ -63,7 +61,6 @@ func NewOutputRoomEventConsumer(
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
keyChanges: keyChanges,
|
|
||||||
}
|
}
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
||||||
|
@ -182,26 +179,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
|
|
||||||
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
|
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
|
||||||
|
|
||||||
s.notifyKeyChanges(&ev)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.HeaderedEvent) {
|
|
||||||
membership, err := ev.Membership()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
switch membership {
|
|
||||||
case gomatrixserverlib.Join:
|
|
||||||
s.keyChanges.OnJoinEvent(ev)
|
|
||||||
case gomatrixserverlib.Ban:
|
|
||||||
fallthrough
|
|
||||||
case gomatrixserverlib.Leave:
|
|
||||||
s.keyChanges.OnLeaveEvent(ev)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
|
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
|
||||||
if ev.Type() != gomatrixserverlib.MRoomMember {
|
if ev.Type() != gomatrixserverlib.MRoomMember {
|
||||||
return sp, nil
|
return sp, nil
|
||||||
|
|
|
@ -71,7 +71,7 @@ func AddPublicRoutes(
|
||||||
}
|
}
|
||||||
|
|
||||||
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
||||||
cfg, consumer, notifier, syncDB, rsAPI, keyChangeConsumer,
|
cfg, consumer, notifier, syncDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err = roomConsumer.Start(); err != nil {
|
if err = roomConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start room server consumer")
|
logrus.WithError(err).Panicf("failed to start room server consumer")
|
||||||
|
|
Loading…
Reference in a new issue