From acb5728cfa66da4eea4abcceafc766af9afd6a58 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 7 Aug 2020 16:57:04 +0100 Subject: [PATCH] Bugfixes --- keyserver/internal/device_list_update_test.go | 10 ++++++++++ syncapi/internal/keychange.go | 15 +++++++++------ syncapi/types/types.go | 4 +++- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/keyserver/internal/device_list_update_test.go b/keyserver/internal/device_list_update_test.go index 266a78f97..50e427638 100644 --- a/keyserver/internal/device_list_update_test.go +++ b/keyserver/internal/device_list_update_test.go @@ -204,6 +204,16 @@ func TestUpdateNoPrevID(t *testing.T) { if err != nil { t.Fatalf("Update returned an error: %s", err) } + // At this point we show have this device list marked as stale and not store the keys or emitted anything + if !db.staleUsers[event.UserID] { + t.Errorf("%s not marked as stale", event.UserID) + } + if len(producer.events) > 0 { + t.Errorf("Update incorrect emitted %d device change events", len(producer.events)) + } + if len(db.storedKeys) > 0 { + t.Errorf("Update incorrect stored %d device change events", len(db.storedKeys)) + } t.Log("waiting for /users/devices to be called...") wg.Wait() // wait a bit for db to be updated... diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index b12c13f3f..e0379aafb 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -46,6 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, // DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // be already filled in with join/leave information. +// nolint:gocyclo func DeviceListCatchup( ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, res *types.Response, from, to types.StreamingToken, @@ -68,22 +69,20 @@ func DeviceListCatchup( var partition int32 var offset int64 + partition = -1 + offset = sarama.OffsetOldest // Extract partition/offset from sync token // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. logOffset := from.Log(DeviceListLogName) if logOffset != nil { partition = logOffset.Partition offset = logOffset.Offset - } else { - partition = -1 - offset = sarama.OffsetOldest } var toOffset int64 + toOffset = sarama.OffsetNewest toLog := to.Log(DeviceListLogName) - if toLog != nil { + if toLog != nil && toLog.Offset > 0 { toOffset = toLog.Offset - } else { - toOffset = sarama.OffsetNewest } var queryRes api.QueryKeyChangesResponse keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ @@ -96,6 +95,10 @@ func DeviceListCatchup( util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") return hasNew, nil } + util.GetLogger(ctx).Debugf( + "QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v", + partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs, + ) userSet := make(map[string]bool) for _, userID := range res.DeviceLists.Changed { userSet[userID] = true diff --git a/syncapi/types/types.go b/syncapi/types/types.go index a3299c7fa..f3324800f 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -176,12 +176,14 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) } ret.Positions[i] = other.Positions[i] } + ret.logs = make(map[string]*LogPosition) for name := range t.logs { otherLog := other.Log(name) if otherLog == nil { continue } - t.logs[name] = otherLog + copy := *otherLog + ret.logs[name] = © } return ret }