From 2d89aa2b0ed4a03a5ba67ce5a82bcd55d18c0481 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 24 Jul 2020 12:44:04 +0100 Subject: [PATCH] First cut untested algorithm --- syncapi/consumers/keychange.go | 80 +++++++++++++++++++++++++++++++--- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 8d7e1e1f5..4a5952a80 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -120,7 +120,7 @@ func (s *OutputKeyChangeEventConsumer) Catchup( newlyJoinedRooms := joinedRooms(res, userID) newlyLeftRooms := leftRooms(res) if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { - changed, left, err := s.trackChangedUsers(userID, newlyJoinedRooms, newlyLeftRooms) + changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms) if err != nil { return false, err } @@ -135,7 +135,7 @@ func (s *OutputKeyChangeEventConsumer) Catchup( func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are now sharing rooms with which we previously were not and notify them about the joining // users keys: - changed, _, err := s.trackChangedUsers(*ev.StateKey(), []string{ev.RoomID()}, nil) + changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil) if err != nil { log.WithError(err).Error("OnJoinEvent: failed to work out changed users") return @@ -148,7 +148,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are no longer sharing any rooms with and notify them about the leaving user - _, left, err := s.trackChangedUsers(*ev.StateKey(), nil, []string{ev.RoomID()}) + _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()}) if err != nil { log.WithError(err).Error("OnLeaveEvent: failed to work out left users") return @@ -160,14 +160,51 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header } -func (s *OutputKeyChangeEventConsumer) trackChangedUsers(userID string, newlyJoinedRooms, newlyLeftRooms []string) (changed, left []string, err error) { +func (s *OutputKeyChangeEventConsumer) trackChangedUsers( + ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string, +) (changed, left []string, err error) { // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. // Leave algorithm: - // - Get set of users and number of times they appear in room prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. + // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. // - Get users in newly left room. - QueryCurrentState // - Loop set of users and decrement by 1 for each user in newly left room. // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. + var queryRes currentstateAPI.QuerySharedUsersResponse + err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + IncludeRoomIDs: newlyLeftRooms, + }, &queryRes) + if err != nil { + return nil, nil, err + } + var stateRes currentstateAPI.QueryBulkStateContentResponse + err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyLeftRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, nil, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != "join" { + continue + } + queryRes.UserIDsToCount[tuple.StateKey]-- + } + } + for userID, count := range queryRes.UserIDsToCount { + if count <= 0 { + left = append(left, userID) // left is returned + } + } // Join algorithm: // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'. @@ -175,7 +212,38 @@ func (s *OutputKeyChangeEventConsumer) trackChangedUsers(userID string, newlyJoi // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? // - If yes: then they already shared a room in common, do nothing. // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' - return + err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + ExcludeRoomIDs: newlyJoinedRooms, + }, &queryRes) + if err != nil { + return nil, left, err + } + err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyJoinedRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, left, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != "join" { + continue + } + // new user who we weren't previously sharing rooms with + if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok { + changed = append(changed, tuple.StateKey) // changed is returned + } + } + } + return changed, left, nil } func joinedRooms(res *types.Response, userID string) []string {