diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index d67395fb3..0761a1d10 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -29,7 +29,9 @@ func main() { rsAPI := base.RoomserverHTTPClient() - syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, federation, cfg) + syncapi.AddPublicRoutes( + base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, base.KeyServerHTTPClient(), base.CurrentStateAPIClient(), + federation, cfg) base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI)) diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 8e8c19d2d..35978be71 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -23,16 +23,14 @@ import ( currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" + syncinternal "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" syncapi "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) -const deviceListLogName = "dl" - // OutputKeyChangeEventConsumer consumes events that originated in the key server. type OutputKeyChangeEventConsumer struct { keyChangeConsumer *internal.ContinualConsumer @@ -117,7 +115,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{ - deviceListLogName: &types.LogPosition{ + syncinternal.DeviceListLogName: &types.LogPosition{ Offset: msg.Offset, Partition: msg.Partition, }, @@ -128,72 +126,10 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er return nil } -// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response -// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST -// be already filled in with join/leave information. -func (s *OutputKeyChangeEventConsumer) Catchup( - ctx context.Context, userID string, res *types.Response, tok types.StreamingToken, -) (newTok *types.StreamingToken, hasNew bool, err error) { - // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. - newlyJoinedRooms := joinedRooms(res, userID) - newlyLeftRooms := leftRooms(res) - if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { - changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms) - if err != nil { - return nil, false, err - } - res.DeviceLists.Changed = changed - res.DeviceLists.Left = left - hasNew = len(changed) > 0 || len(left) > 0 - } - - // now also track users who we already share rooms with but who have updated their devices between the two tokens - - var partition int32 - var offset int64 - // Extract partition/offset from sync token - // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. - logOffset := tok.Log(deviceListLogName) - if logOffset != nil { - partition = logOffset.Partition - offset = logOffset.Offset - } else { - partition = -1 - offset = sarama.OffsetOldest - } - var queryRes api.QueryKeyChangesResponse - s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ - Partition: partition, - Offset: offset, - }, &queryRes) - if queryRes.Error != nil { - // don't fail the catchup because we may have got useful information by tracking membership - util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") - return - } - userSet := make(map[string]bool) - for _, userID := range res.DeviceLists.Changed { - userSet[userID] = true - } - for _, userID := range queryRes.UserIDs { - if !userSet[userID] { - res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) - hasNew = true - } - } - // Make a new streaming token using the new offset - tok.SetLog(deviceListLogName, &types.LogPosition{ - Offset: queryRes.Offset, - Partition: queryRes.Partition, - }) - newTok = &tok - return -} - func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are now sharing rooms with which we previously were not and notify them about the joining // users keys: - changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil) + changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) if err != nil { log.WithError(err).Error("OnJoinEvent: failed to work out changed users") return @@ -206,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are no longer sharing any rooms with and notify them about the leaving user - _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()}) + _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) if err != nil { log.WithError(err).Error("OnLeaveEvent: failed to work out left users") return @@ -217,129 +153,3 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header } } - -// nolint:gocyclo -func (s *OutputKeyChangeEventConsumer) trackChangedUsers( - ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string, -) (changed, left []string, err error) { - // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. - - // Leave algorithm: - // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. - // - Get users in newly left room. - QueryCurrentState - // - Loop set of users and decrement by 1 for each user in newly left room. - // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. - var queryRes currentstateAPI.QuerySharedUsersResponse - err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ - UserID: userID, - IncludeRoomIDs: newlyLeftRooms, - }, &queryRes) - if err != nil { - return nil, nil, err - } - var stateRes currentstateAPI.QueryBulkStateContentResponse - err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: newlyLeftRooms, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - { - EventType: gomatrixserverlib.MRoomMember, - StateKey: "*", - }, - }, - AllowWildcards: true, - }, &stateRes) - if err != nil { - return nil, nil, err - } - for _, state := range stateRes.Rooms { - for tuple, membership := range state { - if membership != gomatrixserverlib.Join { - continue - } - queryRes.UserIDsToCount[tuple.StateKey]-- - } - } - for userID, count := range queryRes.UserIDsToCount { - if count <= 0 { - left = append(left, userID) // left is returned - } - } - - // Join algorithm: - // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'. - // - Get users in newly joined room - QueryCurrentState - // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? - // - If yes: then they already shared a room in common, do nothing. - // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' - err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ - UserID: userID, - ExcludeRoomIDs: newlyJoinedRooms, - }, &queryRes) - if err != nil { - return nil, left, err - } - err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: newlyJoinedRooms, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - { - EventType: gomatrixserverlib.MRoomMember, - StateKey: "*", - }, - }, - AllowWildcards: true, - }, &stateRes) - if err != nil { - return nil, left, err - } - for _, state := range stateRes.Rooms { - for tuple, membership := range state { - if membership != gomatrixserverlib.Join { - continue - } - // new user who we weren't previously sharing rooms with - if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok { - changed = append(changed, tuple.StateKey) // changed is returned - } - } - } - return changed, left, nil -} - -func joinedRooms(res *types.Response, userID string) []string { - var roomIDs []string - for roomID, join := range res.Rooms.Join { - // we would expect to see our join event somewhere if we newly joined the room. - // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. - newlyJoined := membershipEventPresent(join.State.Events, userID) - if newlyJoined { - roomIDs = append(roomIDs, roomID) - continue - } - newlyJoined = membershipEventPresent(join.Timeline.Events, userID) - if newlyJoined { - roomIDs = append(roomIDs, roomID) - } - } - return roomIDs -} - -func leftRooms(res *types.Response) []string { - roomIDs := make([]string, len(res.Rooms.Leave)) - i := 0 - for roomID := range res.Rooms.Leave { - roomIDs[i] = roomID - i++ - } - return roomIDs -} - -func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { - for _, ev := range events { - // it's enough to know that we have our member event here, don't need to check membership content - // as it's implied by being in the respective section of the sync response. - if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { - return true - } - } - return false -} diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go new file mode 100644 index 000000000..b594cc623 --- /dev/null +++ b/syncapi/internal/keychange.go @@ -0,0 +1,219 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + + "github.com/Shopify/sarama" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/keyserver/api" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const DeviceListLogName = "dl" + +// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response +// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST +// be already filled in with join/leave information. +func DeviceListCatchup( + ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, + userID string, res *types.Response, tok types.StreamingToken, +) (newTok *types.StreamingToken, hasNew bool, err error) { + // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. + newlyJoinedRooms := joinedRooms(res, userID) + newlyLeftRooms := leftRooms(res) + if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { + changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) + if err != nil { + return nil, false, err + } + res.DeviceLists.Changed = changed + res.DeviceLists.Left = left + hasNew = len(changed) > 0 || len(left) > 0 + } + + // now also track users who we already share rooms with but who have updated their devices between the two tokens + + var partition int32 + var offset int64 + // Extract partition/offset from sync token + // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. + logOffset := tok.Log(DeviceListLogName) + if logOffset != nil { + partition = logOffset.Partition + offset = logOffset.Offset + } else { + partition = -1 + offset = sarama.OffsetOldest + } + var queryRes api.QueryKeyChangesResponse + keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ + Partition: partition, + Offset: offset, + }, &queryRes) + if queryRes.Error != nil { + // don't fail the catchup because we may have got useful information by tracking membership + util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") + return + } + userSet := make(map[string]bool) + for _, userID := range res.DeviceLists.Changed { + userSet[userID] = true + } + for _, userID := range queryRes.UserIDs { + if !userSet[userID] { + res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) + hasNew = true + } + } + // Make a new streaming token using the new offset + tok.SetLog(DeviceListLogName, &types.LogPosition{ + Offset: queryRes.Offset, + Partition: queryRes.Partition, + }) + newTok = &tok + return +} + +// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. +// nolint:gocyclo +func TrackChangedUsers( + ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, +) (changed, left []string, err error) { + // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. + + // Leave algorithm: + // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. + // - Get users in newly left room. - QueryCurrentState + // - Loop set of users and decrement by 1 for each user in newly left room. + // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. + var queryRes currentstateAPI.QuerySharedUsersResponse + err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + IncludeRoomIDs: newlyLeftRooms, + }, &queryRes) + if err != nil { + return nil, nil, err + } + var stateRes currentstateAPI.QueryBulkStateContentResponse + err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyLeftRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, nil, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != gomatrixserverlib.Join { + continue + } + queryRes.UserIDsToCount[tuple.StateKey]-- + } + } + for userID, count := range queryRes.UserIDsToCount { + if count <= 0 { + left = append(left, userID) // left is returned + } + } + + // Join algorithm: + // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'. + // - Get users in newly joined room - QueryCurrentState + // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? + // - If yes: then they already shared a room in common, do nothing. + // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' + err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + ExcludeRoomIDs: newlyJoinedRooms, + }, &queryRes) + if err != nil { + return nil, left, err + } + err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyJoinedRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, left, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != gomatrixserverlib.Join { + continue + } + // new user who we weren't previously sharing rooms with + if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok { + changed = append(changed, tuple.StateKey) // changed is returned + } + } + } + return changed, left, nil +} + +func joinedRooms(res *types.Response, userID string) []string { + var roomIDs []string + for roomID, join := range res.Rooms.Join { + // we would expect to see our join event somewhere if we newly joined the room. + // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. + newlyJoined := membershipEventPresent(join.State.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + continue + } + newlyJoined = membershipEventPresent(join.Timeline.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + } + } + return roomIDs +} + +func leftRooms(res *types.Response) []string { + roomIDs := make([]string, len(res.Rooms.Leave)) + i := 0 + for roomID := range res.Rooms.Leave { + roomIDs[i] = roomID + i++ + } + return roomIDs +} + +func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { + for _, ev := range events { + // it's enough to know that we have our member event here, don't need to check membership content + // as it's implied by being in the respective section of the sync response. + if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { + return true + } + } + return false +} diff --git a/syncapi/consumers/keychange_test.go b/syncapi/internal/keychange_test.go similarity index 86% rename from syncapi/consumers/keychange_test.go rename to syncapi/internal/keychange_test.go index 3ecb3f583..d0d27e448 100644 --- a/syncapi/consumers/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -1,4 +1,4 @@ -package consumers +package internal import ( "context" @@ -159,18 +159,17 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { newShareUser := "@bill:localhost" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser}, "!another:room": {syncingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: true, @@ -182,18 +181,17 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { removeUser := "@bill:localhost" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {removeUser}, "!another:room": {syncingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: true, @@ -205,16 +203,15 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { existingUser := "@bob:localhost" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -227,18 +224,17 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { existingUser := "@bob:localhost" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: false, @@ -249,11 +245,6 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { existingUser := "@bob1:localhost" roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ - roomIDToJoinedMembers: map[string][]string{ - roomID: {syncingUser, existingUser}, - }, - }, nil) syncResponse := types.NewResponse() empty := "" roomStateEvents := []gomatrixserverlib.ClientEvent{ @@ -295,9 +286,13 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { jr.Timeline.Events = roomTimelineEvents syncResponse.Rooms.Join[roomID] = jr - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + roomID: {syncingUser, existingUser}, + }, + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: false, @@ -312,18 +307,17 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { newlyLeftUser2 := "@debra:localhost" newlyJoinedRoom := "!join:bar" newlyLeftRoom := "!left:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, "!another:room": {syncingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -348,12 +342,6 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { newShareUser := "@berta:localhost" newShareUser2 := "@bobby:localhost" roomID := "!join:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ - roomIDToJoinedMembers: map[string][]string{ - roomID: {newShareUser, newShareUser2}, - "!another:room": {syncingUser}, - }, - }, nil) syncResponse := types.NewResponse() roomEvents := []gomatrixserverlib.ClientEvent{ { @@ -408,9 +396,14 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { lr.Timeline.Events = roomEvents syncResponse.Rooms.Leave[roomID] = lr - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + roomID: {newShareUser, newShareUser2}, + "!another:room": {syncingUser}, + }, + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: true, diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 65ad2f3a3..00115094d 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -22,7 +22,9 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/syncapi/consumers" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -33,15 +35,19 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db storage.Database - userAPI userapi.UserInternalAPI - notifier *Notifier - keyChanges *consumers.OutputKeyChangeEventConsumer + db storage.Database + userAPI userapi.UserInternalAPI + notifier *Notifier + keyAPI keyapi.KeyInternalAPI + stateAPI currentstateAPI.CurrentStateInternalAPI } // NewRequestPool makes a new RequestPool -func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool { - return &RequestPool{db, userAPI, n} +func NewRequestPool( + db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, +) *RequestPool { + return &RequestPool{db, userAPI, n, keyAPI, stateAPI} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -206,7 +212,7 @@ func (rp *RequestPool) appendDeviceLists( // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then // returns the latest position with which the response has authority on). We'd need to tweak // the API to expose a "to" value to fix this. - _, _, err := rp.keyChanges.Catchup(context.Background(), userID, data, since) + _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since) if err != nil { return nil, err } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 570f53344..754cd5026 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -62,7 +62,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start notifier") } - requestPool := sync.NewRequestPool(syncDB, notifier, userAPI) + requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) roomConsumer := consumers.NewOutputRoomEventConsumer( cfg, consumer, notifier, syncDB, rsAPI,