diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index f8d0d6a3f..b5cc47318 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -85,7 +85,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": output.RoomID, }).Info("received data from client API server") - pduPos, err := s.db.UpsertAccountData( + streamPos, err := s.db.UpsertAccountData( context.TODO(), string(msg.Key), output.RoomID, output.Type, ) if err != nil { @@ -96,8 +96,8 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - s.streams.AccountDataStreamProvider.Advance(pduPos) - s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: pduPos}) + s.streams.AccountDataStreamProvider.Advance(streamPos) + s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos}) return nil } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index cedd433b1..384fc25ca 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -180,7 +180,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. case <-userStreamListener.GetNotifyChannel(syncReq.Since): syncReq.Log.Debugln("Responding to sync after wake-up") - currentPos = userStreamListener.GetSyncPosition() + currentPos.ApplyUpdates(userStreamListener.GetSyncPosition()) } } else { syncReq.Log.Debugln("Responding to sync immediately")