Fix position update

This commit is contained in:
Neil Alexander 2021-01-08 14:39:49 +00:00
parent 7d3356f563
commit 8fdcd97108
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 4 additions and 4 deletions

View file

@ -85,7 +85,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.RoomID,
}).Info("received data from client API server")
pduPos, err := s.db.UpsertAccountData(
streamPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type,
)
if err != nil {
@ -96,8 +96,8 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
s.streams.AccountDataStreamProvider.Advance(pduPos)
s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: pduPos})
s.streams.AccountDataStreamProvider.Advance(streamPos)
s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos})
return nil
}

View file

@ -180,7 +180,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
case <-userStreamListener.GetNotifyChannel(syncReq.Since):
syncReq.Log.Debugln("Responding to sync after wake-up")
currentPos = userStreamListener.GetSyncPosition()
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
}
} else {
syncReq.Log.Debugln("Responding to sync immediately")