This commit is contained in:
Kegan Dougal 2020-08-07 16:57:04 +01:00
parent cd720d3ba4
commit acb5728cfa
3 changed files with 22 additions and 7 deletions

View file

@ -204,6 +204,16 @@ func TestUpdateNoPrevID(t *testing.T) {
if err != nil {
t.Fatalf("Update returned an error: %s", err)
}
// At this point we show have this device list marked as stale and not store the keys or emitted anything
if !db.staleUsers[event.UserID] {
t.Errorf("%s not marked as stale", event.UserID)
}
if len(producer.events) > 0 {
t.Errorf("Update incorrect emitted %d device change events", len(producer.events))
}
if len(db.storedKeys) > 0 {
t.Errorf("Update incorrect stored %d device change events", len(db.storedKeys))
}
t.Log("waiting for /users/devices to be called...")
wg.Wait()
// wait a bit for db to be updated...

View file

@ -46,6 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information.
// nolint:gocyclo
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
userID string, res *types.Response, from, to types.StreamingToken,
@ -68,22 +69,20 @@ func DeviceListCatchup(
var partition int32
var offset int64
partition = -1
offset = sarama.OffsetOldest
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
logOffset := from.Log(DeviceListLogName)
if logOffset != nil {
partition = logOffset.Partition
offset = logOffset.Offset
} else {
partition = -1
offset = sarama.OffsetOldest
}
var toOffset int64
toOffset = sarama.OffsetNewest
toLog := to.Log(DeviceListLogName)
if toLog != nil {
if toLog != nil && toLog.Offset > 0 {
toOffset = toLog.Offset
} else {
toOffset = sarama.OffsetNewest
}
var queryRes api.QueryKeyChangesResponse
keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
@ -96,6 +95,10 @@ func DeviceListCatchup(
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
return hasNew, nil
}
util.GetLogger(ctx).Debugf(
"QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v",
partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs,
)
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
userSet[userID] = true

View file

@ -176,12 +176,14 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken)
}
ret.Positions[i] = other.Positions[i]
}
ret.logs = make(map[string]*LogPosition)
for name := range t.logs {
otherLog := other.Log(name)
if otherLog == nil {
continue
}
t.logs[name] = otherLog
copy := *otherLog
ret.logs[name] = &copy
}
return ret
}