Only initiate wakeups if the stream advanced

This commit is contained in:
Neil Alexander 2021-01-12 15:06:46 +00:00
parent e8695f1ee6
commit d5ec531ddb
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
9 changed files with 49 additions and 32 deletions

View file

@ -95,8 +95,9 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos})
if s.stream.Advance(streamPos) {
s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos})
}
return nil
}

View file

@ -90,8 +90,9 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
return err
}
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
if s.stream.Advance(streamPos) {
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
}
return nil
}

View file

@ -105,12 +105,13 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err
}
s.stream.Advance(streamPos)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},
types.StreamingToken{SendToDevicePosition: streamPos},
)
if s.stream.Advance(streamPos) {
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},
types.StreamingToken{SendToDevicePosition: streamPos},
)
}
return nil
}

View file

@ -101,8 +101,9 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
)
}
s.stream.Advance(typingPos)
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
if s.stream.Advance(typingPos) {
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
}
return nil
}

View file

@ -122,9 +122,10 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
Partition: msg.Partition,
}
s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
if s.stream.Advance(posUpdate) {
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
}
return nil

View file

@ -186,8 +186,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err
}
s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
if s.pduStream.Advance(pduPos) {
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
}
return nil
}
@ -226,8 +227,9 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err
}
s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
if s.pduStream.Advance(pduPos) {
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
}
return nil
}
@ -283,8 +285,9 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
return nil
}
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
if s.inviteStream.Advance(pduPos) {
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
}
return nil
}
@ -303,8 +306,9 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}
// Notify any active sync requests that the invite has been retired.
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
if s.inviteStream.Advance(pduPos) {
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
}
return nil
}
@ -324,8 +328,9 @@ func (s *OutputRoomEventConsumer) onNewPeek(
// tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
if s.pduStream.Advance(sp) {
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
}
return nil
}
@ -345,8 +350,9 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
// tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
if s.pduStream.Advance(sp) {
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
}
return nil
}

View file

@ -19,13 +19,16 @@ func (p *PartitionedStreamProvider) Setup() {
func (p *PartitionedStreamProvider) Advance(
latest types.LogPosition,
) {
) bool {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
if latest.IsAfter(&p.latest) {
p.latest = latest
return true
}
return false
}
func (p *PartitionedStreamProvider) LatestPosition(

View file

@ -19,13 +19,16 @@ func (p *StreamProvider) Setup() {
func (p *StreamProvider) Advance(
latest types.StreamPosition,
) {
) bool {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
if latest > p.latest {
p.latest = latest
return true
}
return false
}
func (p *StreamProvider) LatestPosition(

View file

@ -28,8 +28,8 @@ type StreamProvider interface {
Setup()
// Advance will update the latest position of the stream based on
// an update and will wake callers waiting on StreamNotifyAfter.
Advance(latest StreamPosition)
// an update. It returns true if the position advanced or false otherwise.
Advance(latest StreamPosition) bool
// CompleteSync will update the response to include all updates as needed
// for a complete sync. It will always return immediately.
@ -46,7 +46,7 @@ type StreamProvider interface {
type PartitionedStreamProvider interface {
Setup()
Advance(latest LogPosition)
Advance(latest LogPosition) bool
CompleteSync(ctx context.Context, req *SyncRequest) LogPosition
IncrementalSync(ctx context.Context, req *SyncRequest, from, to LogPosition) LogPosition
LatestPosition(ctx context.Context) LogPosition