From af332f24aef1365a41b6854f591ebcf0dda7c24d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Jan 2021 15:36:53 +0000 Subject: [PATCH] Add some more wakeups --- syncapi/consumers/eduserver_sendtodevice.go | 8 +------- syncapi/consumers/keychange.go | 10 ++++------ syncapi/consumers/roomserver.go | 8 ++++---- 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index c9e9732e8..5dc684036 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -97,13 +97,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - //s.notifier.OnNewSendToDevice( - // output.UserID, - // []string{output.DeviceID}, - // types.StreamingToken{SendToDevicePosition: streamPos}, - //) - - _ = streamPos + s.db.SendToDeviceStream().Advance(streamPos) return nil } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index cec89eedb..12a3ef9d9 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -110,14 +110,12 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } // make sure we get our own key updates too! queryRes.UserIDsToCount[output.UserID] = 1 - posUpdate := types.StreamingToken{ - DeviceListPosition: types.LogPosition{ - Offset: msg.Offset, - Partition: msg.Partition, - }, + posUpdate := types.LogPosition{ + Offset: msg.Offset, + Partition: msg.Partition, } - _ = posUpdate + s.db.DeviceListStream().Advance(posUpdate) //for userID := range queryRes.UserIDsToCount { // s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index b720f24f8..b4c7e6e52 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -271,8 +271,8 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( return nil } - _ = pduPos - //s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) + s.db.InviteStream().Advance(pduPos) + return nil } @@ -291,8 +291,8 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( // Notify any active sync requests that the invite has been retired. // Invites share the same stream counter as PDUs - _ = pduPos - //s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) + s.db.InviteStream().Advance(pduPos) + return nil }