From 8fdcd971084021c8385ebabb48d4860ef769b75b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 8 Jan 2021 14:39:49 +0000 Subject: [PATCH] Fix position update --- syncapi/consumers/clientapi.go | 6 +++--- syncapi/sync/requestpool.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index f8d0d6a3f..b5cc47318 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -85,7 +85,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": output.RoomID, }).Info("received data from client API server") - pduPos, err := s.db.UpsertAccountData( + streamPos, err := s.db.UpsertAccountData( context.TODO(), string(msg.Key), output.RoomID, output.Type, ) if err != nil { @@ -96,8 +96,8 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - s.streams.AccountDataStreamProvider.Advance(pduPos) - s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: pduPos}) + s.streams.AccountDataStreamProvider.Advance(streamPos) + s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos}) return nil } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index cedd433b1..384fc25ca 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -180,7 +180,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. case <-userStreamListener.GetNotifyChannel(syncReq.Since): syncReq.Log.Debugln("Responding to sync after wake-up") - currentPos = userStreamListener.GetSyncPosition() + currentPos.ApplyUpdates(userStreamListener.GetSyncPosition()) } } else { syncReq.Log.Debugln("Responding to sync immediately")