diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 4958f2216..630791d09 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -95,8 +95,9 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - s.stream.Advance(streamPos) - s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos}) + if s.stream.Advance(streamPos) { + s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos}) + } return nil } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index bd538eff2..65215f3fe 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -90,8 +90,9 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro return err } - s.stream.Advance(streamPos) - s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) + if s.stream.Advance(streamPos) { + s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) + } return nil } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 6e774b5b4..98a58ce66 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -105,12 +105,13 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - s.stream.Advance(streamPos) - s.notifier.OnNewSendToDevice( - output.UserID, - []string{output.DeviceID}, - types.StreamingToken{SendToDevicePosition: streamPos}, - ) + if s.stream.Advance(streamPos) { + s.notifier.OnNewSendToDevice( + output.UserID, + []string{output.DeviceID}, + types.StreamingToken{SendToDevicePosition: streamPos}, + ) + } return nil } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 3edf6675d..87fc018d3 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -101,8 +101,9 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error ) } - s.stream.Advance(typingPos) - s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) + if s.stream.Advance(typingPos) { + s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) + } return nil } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index af7b280fa..f13a63397 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -122,9 +122,10 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er Partition: msg.Partition, } - s.stream.Advance(posUpdate) - for userID := range queryRes.UserIDsToCount { - s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) + if s.stream.Advance(posUpdate) { + for userID := range queryRes.UserIDsToCount { + s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) + } } return nil diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 1d47b73a6..a546f3a84 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -186,8 +186,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( return err } - s.pduStream.Advance(pduPos) - s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) + if s.pduStream.Advance(pduPos) { + s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) + } return nil } @@ -226,8 +227,9 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( return err } - s.pduStream.Advance(pduPos) - s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) + if s.pduStream.Advance(pduPos) { + s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) + } return nil } @@ -283,8 +285,9 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( return nil } - s.inviteStream.Advance(pduPos) - s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) + if s.inviteStream.Advance(pduPos) { + s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) + } return nil } @@ -303,8 +306,9 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( } // Notify any active sync requests that the invite has been retired. - s.inviteStream.Advance(pduPos) - s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) + if s.inviteStream.Advance(pduPos) { + s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) + } return nil } @@ -324,8 +328,9 @@ func (s *OutputRoomEventConsumer) onNewPeek( // tell the notifier about the new peek so it knows to wake up new devices // TODO: This only works because the peeks table is reusing the same // index as PDUs, but we should fix this - s.pduStream.Advance(sp) - s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) + if s.pduStream.Advance(sp) { + s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) + } return nil } @@ -345,8 +350,9 @@ func (s *OutputRoomEventConsumer) onRetirePeek( // tell the notifier about the new peek so it knows to wake up new devices // TODO: This only works because the peeks table is reusing the same // index as PDUs, but we should fix this - s.pduStream.Advance(sp) - s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) + if s.pduStream.Advance(sp) { + s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) + } return nil } diff --git a/syncapi/streams/template_pstream.go b/syncapi/streams/template_pstream.go index 265e22a20..eeb4245a8 100644 --- a/syncapi/streams/template_pstream.go +++ b/syncapi/streams/template_pstream.go @@ -19,13 +19,16 @@ func (p *PartitionedStreamProvider) Setup() { func (p *PartitionedStreamProvider) Advance( latest types.LogPosition, -) { +) bool { p.latestMutex.Lock() defer p.latestMutex.Unlock() if latest.IsAfter(&p.latest) { p.latest = latest + return true } + + return false } func (p *PartitionedStreamProvider) LatestPosition( diff --git a/syncapi/streams/template_stream.go b/syncapi/streams/template_stream.go index 15074cc10..36fc880cf 100644 --- a/syncapi/streams/template_stream.go +++ b/syncapi/streams/template_stream.go @@ -19,13 +19,16 @@ func (p *StreamProvider) Setup() { func (p *StreamProvider) Advance( latest types.StreamPosition, -) { +) bool { p.latestMutex.Lock() defer p.latestMutex.Unlock() if latest > p.latest { p.latest = latest + return true } + + return false } func (p *StreamProvider) LatestPosition( diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 24b453a80..cd1cc0003 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -28,8 +28,8 @@ type StreamProvider interface { Setup() // Advance will update the latest position of the stream based on - // an update and will wake callers waiting on StreamNotifyAfter. - Advance(latest StreamPosition) + // an update. It returns true if the position advanced or false otherwise. + Advance(latest StreamPosition) bool // CompleteSync will update the response to include all updates as needed // for a complete sync. It will always return immediately. @@ -46,7 +46,7 @@ type StreamProvider interface { type PartitionedStreamProvider interface { Setup() - Advance(latest LogPosition) + Advance(latest LogPosition) bool CompleteSync(ctx context.Context, req *SyncRequest) LogPosition IncrementalSync(ctx context.Context, req *SyncRequest, from, to LogPosition) LogPosition LatestPosition(ctx context.Context) LogPosition