From ff22d71de67bf2ffedd64915a36c12c6cc926826 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 7 Jan 2021 14:57:56 +0000 Subject: [PATCH] Device list updates --- syncapi/internal/keychange.go | 21 ++- syncapi/internal/keychange_test.go | 24 ++-- syncapi/streams/stream_devicelist.go | 16 ++- syncapi/streams/stream_typing.go | 3 - syncapi/streams/streams.go | 5 + syncapi/sync/requestpool.go | 192 +++------------------------ syncapi/syncapi.go | 2 +- 7 files changed, 57 insertions(+), 206 deletions(-) diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index 3f901f498..e980437e1 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -49,8 +49,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, // nolint:gocyclo func DeviceListCatchup( ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, - userID string, res *types.Response, from, to types.StreamingToken, -) (hasNew bool, err error) { + userID string, res *types.Response, from, to types.LogPosition, +) (newPos types.LogPosition, hasNew bool, err error) { // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. newlyJoinedRooms := joinedRooms(res, userID) @@ -58,7 +58,7 @@ func DeviceListCatchup( if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { changed, left, err := TrackChangedUsers(ctx, rsAPI, userID, newlyJoinedRooms, newlyLeftRooms) if err != nil { - return false, err + return to, false, err } res.DeviceLists.Changed = changed res.DeviceLists.Left = left @@ -73,13 +73,13 @@ func DeviceListCatchup( offset = sarama.OffsetOldest // Extract partition/offset from sync token // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. - if !from.DeviceListPosition.IsEmpty() { - partition = from.DeviceListPosition.Partition - offset = from.DeviceListPosition.Offset + if !from.IsEmpty() { + partition = from.Partition + offset = from.Offset } var toOffset int64 toOffset = sarama.OffsetNewest - if toLog := to.DeviceListPosition; toLog.Partition == partition && toLog.Offset > 0 { + if toLog := to; toLog.Partition == partition && toLog.Offset > 0 { toOffset = toLog.Offset } var queryRes api.QueryKeyChangesResponse @@ -91,7 +91,7 @@ func DeviceListCatchup( if queryRes.Error != nil { // don't fail the catchup because we may have got useful information by tracking membership util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") - return hasNew, nil + return to, hasNew, nil } // QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user. var sharedUsersMap map[string]int @@ -128,13 +128,12 @@ func DeviceListCatchup( } } // set the new token - to.DeviceListPosition = types.LogPosition{ + to = types.LogPosition{ Partition: queryRes.Partition, Offset: queryRes.Offset, } - res.NextBatch.ApplyUpdates(to) - return hasNew, nil + return to, hasNew, nil } // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 9eaeda751..44c4a4dd3 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -16,12 +16,10 @@ import ( var ( syncingUser = "@alice:localhost" - emptyToken = types.StreamingToken{} - newestToken = types.StreamingToken{ - DeviceListPosition: types.LogPosition{ - Offset: sarama.OffsetNewest, - Partition: 0, - }, + emptyToken = types.LogPosition{} + newestToken = types.LogPosition{ + Offset: sarama.OffsetNewest, + Partition: 0, } ) @@ -180,7 +178,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { "!another:room": {syncingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -203,7 +201,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { "!another:room": {syncingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -226,7 +224,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -248,7 +246,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -307,7 +305,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { roomID: {syncingUser, existingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -335,7 +333,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { "!another:room": {syncingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -420,7 +418,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { "!another:room": {syncingUser}, }, } - hasNew, err := DeviceListCatchup( + _, hasNew, err := DeviceListCatchup( context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken, ) if err != nil { diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index 97e5da3cb..d50361ecb 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -3,18 +3,23 @@ package streams import ( "context" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/types" ) type DeviceListStreamProvider struct { PartitionedStreamProvider + rsAPI api.RoomserverInternalAPI + keyAPI keyapi.KeyInternalAPI } func (p *DeviceListStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.LogPosition { - return p.LatestPosition(ctx) + return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx)) } func (p *DeviceListStreamProvider) IncrementalSync( @@ -22,6 +27,15 @@ func (p *DeviceListStreamProvider) IncrementalSync( req *types.SyncRequest, from, to types.LogPosition, ) types.LogPosition { + var err error + to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) + if err != nil { + return to // nil, fmt.Errorf("internal.DeviceListCatchup: %w", err) + } + err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response) + if err != nil { + return to // res, fmt.Errorf("internal.DeviceOTKCounts: %w", err) + } return to } diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go index 779e4da5e..3a63b2677 100644 --- a/syncapi/streams/stream_typing.go +++ b/syncapi/streams/stream_typing.go @@ -18,9 +18,6 @@ func (p *TypingStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.StreamPosition { - // It isn't beneficial to send previous typing notifications - // after a complete sync, so just return the latest position - // and otherwise do nothing. return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) } diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index 43d041d91..07cb724cd 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -2,6 +2,8 @@ package streams import ( "github.com/matrix-org/dendrite/eduserver/cache" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -19,6 +21,7 @@ type Streams struct { func NewSyncStreamProviders( d storage.Database, userAPI userapi.UserInternalAPI, + rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, eduCache *cache.EDUCache, ) *Streams { streams := &Streams{ @@ -44,6 +47,8 @@ func NewSyncStreamProviders( }, DeviceListStreamProvider: &DeviceListStreamProvider{ PartitionedStreamProvider: PartitionedStreamProvider{DB: d}, + rsAPI: rsAPI, + keyAPI: keyAPI, }, } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 95f9fc157..c2f80b7f4 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -18,7 +18,6 @@ package sync import ( "context" - "fmt" "net" "net/http" "strings" @@ -185,10 +184,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. case <-rp.streams.DeviceListStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.DeviceListPosition): } - syncReq.Log.Println("Responding to sync after wakeup") + syncReq.Log.Debugln("Responding to sync after wakeup") waitcancel() } else { - syncReq.Log.Println("Responding to sync immediately") + syncReq.Log.Debugln("Responding to sync immediately") } if syncReq.Since.IsEmpty() { @@ -279,20 +278,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use JSON: jsonerror.InvalidArgumentValue("bad 'to' value"), } } - // work out room joins/leaves - /* - res, err := rp.db.IncrementalSync( - req.Context(), types.NewResponse(), *device, fromToken, toToken, 10, false, - ) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync") - return jsonerror.InternalServerError() - } - */ - res := types.NewResponse() - res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken) + syncReq, err := newSyncRequest(req, *device, rp.db) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info") + util.GetLogger(req.Context()).WithError(err).Error("newSyncRequest failed") + return jsonerror.InternalServerError() + } + rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), syncReq, fromToken.PDUPosition, toToken.PDUPosition) + _, _, err = internal.DeviceListCatchup( + req.Context(), rp.keyAPI, rp.rsAPI, syncReq.Device.UserID, + syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition, + ) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("Failed to DeviceListCatchup info") return jsonerror.InternalServerError() } return util.JSONResponse{ @@ -301,171 +298,12 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use Changed []string `json:"changed"` Left []string `json:"left"` }{ - Changed: res.DeviceLists.Changed, - Left: res.DeviceLists.Left, + Changed: syncReq.Response.DeviceLists.Changed, + Left: syncReq.Response.DeviceLists.Left, }, } } -// nolint:gocyclo -/* -func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (*types.Response, error) { - res := types.NewResponse() - - // TODO: handle ignored users - if req.since.IsEmpty() { - res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit) - if err != nil { - return res, fmt.Errorf("rp.db.CompleteSync: %w", err) - } - } else { - res, err = rp.db.IncrementalSync(req.ctx, res, req.device, req.since, latestPos, req.limit, req.wantFullState) - if err != nil { - return res, fmt.Errorf("rp.db.IncrementalSync: %w", err) - } - } - - accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead - res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition, &accountDataFilter) - if err != nil { - return res, fmt.Errorf("rp.appendAccountData: %w", err) - } - res, err = rp.appendDeviceLists(res, req.device.UserID, req.since, latestPos) - if err != nil { - return res, fmt.Errorf("rp.appendDeviceLists: %w", err) - } - err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res) - if err != nil { - return res, fmt.Errorf("internal.DeviceOTKCounts: %w", err) - } - - return res, err -} -*/ - -func (rp *RequestPool) appendDeviceLists( - data *types.Response, userID string, since, to types.StreamingToken, -) (*types.Response, error) { - _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to) - if err != nil { - return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err) - } - - return data, nil -} - -/* -func (rp *RequestPool) appendAccountData( - data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, - accountDataFilter *gomatrixserverlib.EventFilter, -) (*types.Response, error) { - // TODO: Account data doesn't have a sync position of its own, meaning that - // account data might be sent multiple time to the client if multiple account - // data keys were set between two message. This isn't a huge issue since the - // duplicate data doesn't represent a huge quantity of data, but an optimisation - // here would be making sure each data is sent only once to the client. - if req.since.IsEmpty() { - // If this is the initial sync, we don't need to check if a data has - // already been sent. Instead, we send the whole batch. - dataReq := &userapi.QueryAccountDataRequest{ - UserID: userID, - } - dataRes := &userapi.QueryAccountDataResponse{} - if err := rp.userAPI.QueryAccountData(req.ctx, dataReq, dataRes); err != nil { - return nil, err - } - for datatype, databody := range dataRes.GlobalAccountData { - data.AccountData.Events = append( - data.AccountData.Events, - gomatrixserverlib.ClientEvent{ - Type: datatype, - Content: gomatrixserverlib.RawJSON(databody), - }, - ) - } - for r, j := range data.Rooms.Join { - for datatype, databody := range dataRes.RoomAccountData[r] { - j.AccountData.Events = append( - j.AccountData.Events, - gomatrixserverlib.ClientEvent{ - Type: datatype, - Content: gomatrixserverlib.RawJSON(databody), - }, - ) - data.Rooms.Join[r] = j - } - } - return data, nil - } - - r := types.Range{ - From: req.since.PDUPosition, - To: currentPos, - } - // If both positions are the same, it means that the data was saved after the - // latest room event. In that case, we need to decrement the old position as - // results are exclusive of Low. - if r.Low() == r.High() { - r.From-- - } - - // Sync is not initial, get all account data since the latest sync - dataTypes, err := rp.db.GetAccountDataInRange( - req.ctx, userID, r, accountDataFilter, - ) - if err != nil { - return nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err) - } - - if len(dataTypes) == 0 { - // TODO: this fixes the sytest but is it the right thing to do? - dataTypes[""] = []string{"m.push_rules"} - } - - // Iterate over the rooms - for roomID, dataTypes := range dataTypes { - // Request the missing data from the database - for _, dataType := range dataTypes { - dataReq := userapi.QueryAccountDataRequest{ - UserID: userID, - RoomID: roomID, - DataType: dataType, - } - dataRes := userapi.QueryAccountDataResponse{} - err = rp.userAPI.QueryAccountData(req.ctx, &dataReq, &dataRes) - if err != nil { - continue - } - if roomID == "" { - if globalData, ok := dataRes.GlobalAccountData[dataType]; ok { - data.AccountData.Events = append( - data.AccountData.Events, - gomatrixserverlib.ClientEvent{ - Type: dataType, - Content: gomatrixserverlib.RawJSON(globalData), - }, - ) - } - } else { - if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok { - joinData := data.Rooms.Join[roomID] - joinData.AccountData.Events = append( - joinData.AccountData.Events, - gomatrixserverlib.ClientEvent{ - Type: dataType, - Content: gomatrixserverlib.RawJSON(roomData), - }, - ) - data.Rooms.Join[roomID] = joinData - } - } - } - } - - return data, nil -} -*/ - // shouldReturnImmediately returns whether the /sync request is an initial sync, // or timeout=0, or full_state=true, in any of the cases the request should // return immediately. diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 420b94ab6..b58f7e5f4 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -51,7 +51,7 @@ func AddPublicRoutes( } eduCache := cache.New() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, eduCache) + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache) requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams)