From 9355fb5ac8c911bdbde6dcc0f279f716d8a8f60b Mon Sep 17 00:00:00 2001 From: Kegsay Date: Thu, 30 Jul 2020 11:15:46 +0100 Subject: [PATCH] Hook up device list updates to the sync notifier (#1231) * WIP hooking up key changes * Fix import cycle, get tests passing and binary compiling * Linting and update whitelist --- cmd/dendrite-sync-api-server/main.go | 4 +- internal/setup/monolith.go | 3 +- keyserver/api/api.go | 2 + keyserver/internal/internal.go | 4 + keyserver/producers/keychange.go | 9 + syncapi/consumers/keychange.go | 191 ++------------- syncapi/internal/keychange.go | 219 ++++++++++++++++++ .../{consumers => internal}/keychange_test.go | 93 ++++---- syncapi/sync/notifier.go | 10 + syncapi/sync/requestpool.go | 32 ++- syncapi/syncapi.go | 14 +- syncapi/types/types.go | 4 + sytest-whitelist | 1 + 13 files changed, 356 insertions(+), 230 deletions(-) create mode 100644 syncapi/internal/keychange.go rename syncapi/{consumers => internal}/keychange_test.go (86%) diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index d67395fb3..0761a1d10 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -29,7 +29,9 @@ func main() { rsAPI := base.RoomserverHTTPClient() - syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, federation, cfg) + syncapi.AddPublicRoutes( + base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, base.KeyServerHTTPClient(), base.CurrentStateAPIClient(), + federation, cfg) base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI)) diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index 1f6d9a761..f33f97ee4 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -77,6 +77,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) { ) mediaapi.AddPublicRoutes(publicMux, m.Config, m.UserAPI, m.Client) syncapi.AddPublicRoutes( - publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config, + publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, + m.KeyAPI, m.StateAPI, m.FedClient, m.Config, ) } diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 406a252d5..c9afb09cc 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -143,6 +143,8 @@ type QueryKeyChangesRequest struct { type QueryKeyChangesResponse struct { // The set of users who have had their keys change. UserIDs []string + // The partition being served - useful if the partition is unknown at request time + Partition int32 // The latest offset represented in this response. Offset int64 // Set if there was a problem handling the request. diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 240a56403..9a41e44fc 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -41,6 +41,9 @@ type KeyInternalAPI struct { } func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { + if req.Partition < 0 { + req.Partition = a.Producer.DefaultPartition() + } userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset) if err != nil { res.Error = &api.KeyError{ @@ -48,6 +51,7 @@ func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyC } } res.Offset = latest + res.Partition = req.Partition res.UserIDs = userIDs } diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go index d59dd2002..c51d9f55d 100644 --- a/keyserver/producers/keychange.go +++ b/keyserver/producers/keychange.go @@ -31,6 +31,15 @@ type KeyChange struct { DB storage.Database } +// DefaultPartition returns the default partition this process is sending key changes to. +// NB: A keyserver MUST send key changes to only 1 partition or else query operations will +// become inconsistent. Partitions can be sharded (e.g by hash of user ID of key change) but +// then all keyservers must be queried to calculate the entire set of key changes between +// two sync tokens. +func (p *KeyChange) DefaultPartition() int32 { + return 0 +} + // ProduceKeyChanges creates new change events for each key func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error { for _, key := range keys { diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 78aff6011..35978be71 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -23,10 +23,11 @@ import ( currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" + syncinternal "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" + syncapi "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) @@ -39,6 +40,7 @@ type OutputKeyChangeEventConsumer struct { keyAPI api.KeyInternalAPI partitionToOffset map[int32]int64 partitionToOffsetMu sync.Mutex + notifier *syncapi.Notifier } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. @@ -47,6 +49,7 @@ func NewOutputKeyChangeEventConsumer( serverName gomatrixserverlib.ServerName, topic string, kafkaConsumer sarama.Consumer, + n *syncapi.Notifier, keyAPI api.KeyInternalAPI, currentStateAPI currentstateAPI.CurrentStateInternalAPI, store storage.Database, @@ -66,6 +69,7 @@ func NewOutputKeyChangeEventConsumer( currentStateAPI: currentStateAPI, partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, + notifier: n, } consumer.ProcessMessage = s.onMessage @@ -110,59 +114,22 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er return err } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams + posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{ + syncinternal.DeviceListLogName: &types.LogPosition{ + Offset: msg.Offset, + Partition: msg.Partition, + }, + }) + for userID := range queryRes.UserIDsToCount { + s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) + } return nil } -// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response -// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST -// be already filled in with join/leave information. -func (s *OutputKeyChangeEventConsumer) Catchup( - ctx context.Context, userID string, res *types.Response, tok types.StreamingToken, -) (newTok *types.StreamingToken, hasNew bool, err error) { - // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. - newlyJoinedRooms := joinedRooms(res, userID) - newlyLeftRooms := leftRooms(res) - if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { - changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms) - if err != nil { - return nil, false, err - } - res.DeviceLists.Changed = changed - res.DeviceLists.Left = left - hasNew = len(changed) > 0 || len(left) > 0 - } - - // now also track users who we already share rooms with but who have updated their devices between the two tokens - // TODO: Extract partition/offset from sync token - var partition int32 - var offset int64 - var queryRes api.QueryKeyChangesResponse - s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ - Partition: partition, - Offset: offset, - }, &queryRes) - if queryRes.Error != nil { - // don't fail the catchup because we may have got useful information by tracking membership - util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") - } else { - // TODO: Make a new streaming token using the new offset - userSet := make(map[string]bool) - for _, userID := range res.DeviceLists.Changed { - userSet[userID] = true - } - for _, userID := range queryRes.UserIDs { - if !userSet[userID] { - res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) - } - } - } - return -} - func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are now sharing rooms with which we previously were not and notify them about the joining // users keys: - changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil) + changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) if err != nil { log.WithError(err).Error("OnJoinEvent: failed to work out changed users") return @@ -175,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are no longer sharing any rooms with and notify them about the leaving user - _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()}) + _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) if err != nil { log.WithError(err).Error("OnLeaveEvent: failed to work out left users") return @@ -186,129 +153,3 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header } } - -// nolint:gocyclo -func (s *OutputKeyChangeEventConsumer) trackChangedUsers( - ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string, -) (changed, left []string, err error) { - // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. - - // Leave algorithm: - // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. - // - Get users in newly left room. - QueryCurrentState - // - Loop set of users and decrement by 1 for each user in newly left room. - // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. - var queryRes currentstateAPI.QuerySharedUsersResponse - err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ - UserID: userID, - IncludeRoomIDs: newlyLeftRooms, - }, &queryRes) - if err != nil { - return nil, nil, err - } - var stateRes currentstateAPI.QueryBulkStateContentResponse - err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: newlyLeftRooms, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - { - EventType: gomatrixserverlib.MRoomMember, - StateKey: "*", - }, - }, - AllowWildcards: true, - }, &stateRes) - if err != nil { - return nil, nil, err - } - for _, state := range stateRes.Rooms { - for tuple, membership := range state { - if membership != gomatrixserverlib.Join { - continue - } - queryRes.UserIDsToCount[tuple.StateKey]-- - } - } - for userID, count := range queryRes.UserIDsToCount { - if count <= 0 { - left = append(left, userID) // left is returned - } - } - - // Join algorithm: - // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'. - // - Get users in newly joined room - QueryCurrentState - // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? - // - If yes: then they already shared a room in common, do nothing. - // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' - err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ - UserID: userID, - ExcludeRoomIDs: newlyJoinedRooms, - }, &queryRes) - if err != nil { - return nil, left, err - } - err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: newlyJoinedRooms, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - { - EventType: gomatrixserverlib.MRoomMember, - StateKey: "*", - }, - }, - AllowWildcards: true, - }, &stateRes) - if err != nil { - return nil, left, err - } - for _, state := range stateRes.Rooms { - for tuple, membership := range state { - if membership != gomatrixserverlib.Join { - continue - } - // new user who we weren't previously sharing rooms with - if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok { - changed = append(changed, tuple.StateKey) // changed is returned - } - } - } - return changed, left, nil -} - -func joinedRooms(res *types.Response, userID string) []string { - var roomIDs []string - for roomID, join := range res.Rooms.Join { - // we would expect to see our join event somewhere if we newly joined the room. - // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. - newlyJoined := membershipEventPresent(join.State.Events, userID) - if newlyJoined { - roomIDs = append(roomIDs, roomID) - continue - } - newlyJoined = membershipEventPresent(join.Timeline.Events, userID) - if newlyJoined { - roomIDs = append(roomIDs, roomID) - } - } - return roomIDs -} - -func leftRooms(res *types.Response) []string { - roomIDs := make([]string, len(res.Rooms.Leave)) - i := 0 - for roomID := range res.Rooms.Leave { - roomIDs[i] = roomID - i++ - } - return roomIDs -} - -func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { - for _, ev := range events { - // it's enough to know that we have our member event here, don't need to check membership content - // as it's implied by being in the respective section of the sync response. - if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { - return true - } - } - return false -} diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go new file mode 100644 index 000000000..b594cc623 --- /dev/null +++ b/syncapi/internal/keychange.go @@ -0,0 +1,219 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + + "github.com/Shopify/sarama" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/keyserver/api" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const DeviceListLogName = "dl" + +// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response +// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST +// be already filled in with join/leave information. +func DeviceListCatchup( + ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, + userID string, res *types.Response, tok types.StreamingToken, +) (newTok *types.StreamingToken, hasNew bool, err error) { + // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. + newlyJoinedRooms := joinedRooms(res, userID) + newlyLeftRooms := leftRooms(res) + if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { + changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) + if err != nil { + return nil, false, err + } + res.DeviceLists.Changed = changed + res.DeviceLists.Left = left + hasNew = len(changed) > 0 || len(left) > 0 + } + + // now also track users who we already share rooms with but who have updated their devices between the two tokens + + var partition int32 + var offset int64 + // Extract partition/offset from sync token + // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. + logOffset := tok.Log(DeviceListLogName) + if logOffset != nil { + partition = logOffset.Partition + offset = logOffset.Offset + } else { + partition = -1 + offset = sarama.OffsetOldest + } + var queryRes api.QueryKeyChangesResponse + keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ + Partition: partition, + Offset: offset, + }, &queryRes) + if queryRes.Error != nil { + // don't fail the catchup because we may have got useful information by tracking membership + util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") + return + } + userSet := make(map[string]bool) + for _, userID := range res.DeviceLists.Changed { + userSet[userID] = true + } + for _, userID := range queryRes.UserIDs { + if !userSet[userID] { + res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) + hasNew = true + } + } + // Make a new streaming token using the new offset + tok.SetLog(DeviceListLogName, &types.LogPosition{ + Offset: queryRes.Offset, + Partition: queryRes.Partition, + }) + newTok = &tok + return +} + +// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. +// nolint:gocyclo +func TrackChangedUsers( + ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, +) (changed, left []string, err error) { + // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. + + // Leave algorithm: + // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. + // - Get users in newly left room. - QueryCurrentState + // - Loop set of users and decrement by 1 for each user in newly left room. + // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. + var queryRes currentstateAPI.QuerySharedUsersResponse + err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + IncludeRoomIDs: newlyLeftRooms, + }, &queryRes) + if err != nil { + return nil, nil, err + } + var stateRes currentstateAPI.QueryBulkStateContentResponse + err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyLeftRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, nil, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != gomatrixserverlib.Join { + continue + } + queryRes.UserIDsToCount[tuple.StateKey]-- + } + } + for userID, count := range queryRes.UserIDsToCount { + if count <= 0 { + left = append(left, userID) // left is returned + } + } + + // Join algorithm: + // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'. + // - Get users in newly joined room - QueryCurrentState + // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? + // - If yes: then they already shared a room in common, do nothing. + // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' + err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + ExcludeRoomIDs: newlyJoinedRooms, + }, &queryRes) + if err != nil { + return nil, left, err + } + err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyJoinedRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, left, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != gomatrixserverlib.Join { + continue + } + // new user who we weren't previously sharing rooms with + if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok { + changed = append(changed, tuple.StateKey) // changed is returned + } + } + } + return changed, left, nil +} + +func joinedRooms(res *types.Response, userID string) []string { + var roomIDs []string + for roomID, join := range res.Rooms.Join { + // we would expect to see our join event somewhere if we newly joined the room. + // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. + newlyJoined := membershipEventPresent(join.State.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + continue + } + newlyJoined = membershipEventPresent(join.Timeline.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + } + } + return roomIDs +} + +func leftRooms(res *types.Response) []string { + roomIDs := make([]string, len(res.Rooms.Leave)) + i := 0 + for roomID := range res.Rooms.Leave { + roomIDs[i] = roomID + i++ + } + return roomIDs +} + +func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { + for _, ev := range events { + // it's enough to know that we have our member event here, don't need to check membership content + // as it's implied by being in the respective section of the sync response. + if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { + return true + } + } + return false +} diff --git a/syncapi/consumers/keychange_test.go b/syncapi/internal/keychange_test.go similarity index 86% rename from syncapi/consumers/keychange_test.go rename to syncapi/internal/keychange_test.go index 3ecb3f583..d0d27e448 100644 --- a/syncapi/consumers/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -1,4 +1,4 @@ -package consumers +package internal import ( "context" @@ -159,18 +159,17 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { newShareUser := "@bill:localhost" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser}, "!another:room": {syncingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: true, @@ -182,18 +181,17 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { removeUser := "@bill:localhost" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {removeUser}, "!another:room": {syncingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: true, @@ -205,16 +203,15 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { existingUser := "@bob:localhost" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -227,18 +224,17 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { existingUser := "@bob:localhost" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: false, @@ -249,11 +245,6 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { existingUser := "@bob1:localhost" roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ - roomIDToJoinedMembers: map[string][]string{ - roomID: {syncingUser, existingUser}, - }, - }, nil) syncResponse := types.NewResponse() empty := "" roomStateEvents := []gomatrixserverlib.ClientEvent{ @@ -295,9 +286,13 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { jr.Timeline.Events = roomTimelineEvents syncResponse.Rooms.Join[roomID] = jr - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + roomID: {syncingUser, existingUser}, + }, + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: false, @@ -312,18 +307,17 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { newlyLeftUser2 := "@debra:localhost" newlyJoinedRoom := "!join:bar" newlyLeftRoom := "!left:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, "!another:room": {syncingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -348,12 +342,6 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { newShareUser := "@berta:localhost" newShareUser2 := "@bobby:localhost" roomID := "!join:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ - roomIDToJoinedMembers: map[string][]string{ - roomID: {newShareUser, newShareUser2}, - "!another:room": {syncingUser}, - }, - }, nil) syncResponse := types.NewResponse() roomEvents := []gomatrixserverlib.ClientEvent{ { @@ -408,9 +396,14 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { lr.Timeline.Events = roomEvents syncResponse.Rooms.Leave[roomID] = lr - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + roomID: {newShareUser, newShareUser2}, + "!another:room": {syncingUser}, + }, + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: true, diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 325e75351..df23a2f4a 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -132,6 +132,16 @@ func (n *Notifier) OnNewSendToDevice( n.wakeupUserDevice(userID, deviceIDs, latestPos) } +func (n *Notifier) OnNewKeyChange( + posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + latestPos := n.currPos.WithUpdates(posUpdate) + n.currPos = latestPos + n.wakeupUsers([]string{wakeUserID}, latestPos) +} + // GetListener returns a UserStreamListener that can be used to wait for // updates for a user. Must be closed. // notify for anything before sincePos diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index bf6a9e01f..754d69833 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -22,6 +22,9 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -35,11 +38,16 @@ type RequestPool struct { db storage.Database userAPI userapi.UserInternalAPI notifier *Notifier + keyAPI keyapi.KeyInternalAPI + stateAPI currentstateAPI.CurrentStateInternalAPI } // NewRequestPool makes a new RequestPool -func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool { - return &RequestPool{db, userAPI, n} +func NewRequestPool( + db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, +) *RequestPool { + return &RequestPool{db, userAPI, n, keyAPI, stateAPI} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -164,6 +172,10 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea if err != nil { return } + res, err = rp.appendDeviceLists(res, req.device.UserID, since) + if err != nil { + return + } // Before we return the sync response, make sure that we take action on // any send-to-device database updates or deletions that we need to do. @@ -192,6 +204,22 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea return } +func (rp *RequestPool) appendDeviceLists( + data *types.Response, userID string, since types.StreamingToken, +) (*types.Response, error) { + // TODO: Currently this code will race which may result in duplicates but not missing data. + // This happens because, whilst we are told the range to fetch here (since / latest) the + // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then + // returns the latest position with which the response has authority on). We'd need to tweak + // the API to expose a "to" value to fix this. + _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since) + if err != nil { + return nil, err + } + + return data, nil +} + // nolint:gocyclo func (rp *RequestPool) appendAccountData( data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index caf91e27e..754cd5026 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -21,7 +21,9 @@ import ( "github.com/gorilla/mux" "github.com/sirupsen/logrus" + currentstateapi "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/config" + keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -39,6 +41,8 @@ func AddPublicRoutes( consumer sarama.Consumer, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, + keyAPI keyapi.KeyInternalAPI, + currentStateAPI currentstateapi.CurrentStateInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.Dendrite, ) { @@ -58,7 +62,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start notifier") } - requestPool := sync.NewRequestPool(syncDB, notifier, userAPI) + requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) roomConsumer := consumers.NewOutputRoomEventConsumer( cfg, consumer, notifier, syncDB, rsAPI, @@ -88,5 +92,13 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start send-to-device consumer") } + keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( + cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), + consumer, notifier, keyAPI, currentStateAPI, syncDB, + ) + if err = keyChangeConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start key change consumer") + } + routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 7bba8e522..f20c73bff 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -110,6 +110,10 @@ type StreamingToken struct { logs map[string]*LogPosition } +func (t *StreamingToken) SetLog(name string, lp *LogPosition) { + t.logs[name] = lp +} + func (t *StreamingToken) Log(name string) *LogPosition { l, ok := t.logs[name] if !ok { diff --git a/sytest-whitelist b/sytest-whitelist index 388f95e08..26922df4c 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -127,6 +127,7 @@ Can query specific device keys using POST query for user with no keys returns empty key dict Can claim one time key using POST Can claim remote one time key using POST +Local device key changes appear in v2 /sync Can add account data Can add account data to room Can get account data without syncing