mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 05:43:09 -06:00
First cut untested algorithm
This commit is contained in:
parent
df9527fcc0
commit
2d89aa2b0e
|
|
@ -120,7 +120,7 @@ func (s *OutputKeyChangeEventConsumer) Catchup(
|
||||||
newlyJoinedRooms := joinedRooms(res, userID)
|
newlyJoinedRooms := joinedRooms(res, userID)
|
||||||
newlyLeftRooms := leftRooms(res)
|
newlyLeftRooms := leftRooms(res)
|
||||||
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
|
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
|
||||||
changed, left, err := s.trackChangedUsers(userID, newlyJoinedRooms, newlyLeftRooms)
|
changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
@ -135,7 +135,7 @@ func (s *OutputKeyChangeEventConsumer) Catchup(
|
||||||
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
||||||
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
|
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
|
||||||
// users keys:
|
// users keys:
|
||||||
changed, _, err := s.trackChangedUsers(*ev.StateKey(), []string{ev.RoomID()}, nil)
|
changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
|
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
|
||||||
return
|
return
|
||||||
|
|
@ -148,7 +148,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
|
||||||
|
|
||||||
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
||||||
// work out who we are no longer sharing any rooms with and notify them about the leaving user
|
// work out who we are no longer sharing any rooms with and notify them about the leaving user
|
||||||
_, left, err := s.trackChangedUsers(*ev.StateKey(), nil, []string{ev.RoomID()})
|
_, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
|
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
|
||||||
return
|
return
|
||||||
|
|
@ -160,14 +160,51 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputKeyChangeEventConsumer) trackChangedUsers(userID string, newlyJoinedRooms, newlyLeftRooms []string) (changed, left []string, err error) {
|
func (s *OutputKeyChangeEventConsumer) trackChangedUsers(
|
||||||
|
ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string,
|
||||||
|
) (changed, left []string, err error) {
|
||||||
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
|
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
|
||||||
|
|
||||||
// Leave algorithm:
|
// Leave algorithm:
|
||||||
// - Get set of users and number of times they appear in room prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
|
// - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
|
||||||
// - Get users in newly left room. - QueryCurrentState
|
// - Get users in newly left room. - QueryCurrentState
|
||||||
// - Loop set of users and decrement by 1 for each user in newly left room.
|
// - Loop set of users and decrement by 1 for each user in newly left room.
|
||||||
// - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
|
// - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
|
||||||
|
var queryRes currentstateAPI.QuerySharedUsersResponse
|
||||||
|
err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{
|
||||||
|
UserID: userID,
|
||||||
|
IncludeRoomIDs: newlyLeftRooms,
|
||||||
|
}, &queryRes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
var stateRes currentstateAPI.QueryBulkStateContentResponse
|
||||||
|
err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{
|
||||||
|
RoomIDs: newlyLeftRooms,
|
||||||
|
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
||||||
|
{
|
||||||
|
EventType: gomatrixserverlib.MRoomMember,
|
||||||
|
StateKey: "*",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
AllowWildcards: true,
|
||||||
|
}, &stateRes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
for _, state := range stateRes.Rooms {
|
||||||
|
for tuple, membership := range state {
|
||||||
|
if membership != "join" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
queryRes.UserIDsToCount[tuple.StateKey]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for userID, count := range queryRes.UserIDsToCount {
|
||||||
|
if count <= 0 {
|
||||||
|
left = append(left, userID) // left is returned
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Join algorithm:
|
// Join algorithm:
|
||||||
// - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
|
// - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
|
||||||
|
|
@ -175,7 +212,38 @@ func (s *OutputKeyChangeEventConsumer) trackChangedUsers(userID string, newlyJoi
|
||||||
// - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
|
// - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
|
||||||
// - If yes: then they already shared a room in common, do nothing.
|
// - If yes: then they already shared a room in common, do nothing.
|
||||||
// - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
|
// - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
|
||||||
return
|
err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{
|
||||||
|
UserID: userID,
|
||||||
|
ExcludeRoomIDs: newlyJoinedRooms,
|
||||||
|
}, &queryRes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, left, err
|
||||||
|
}
|
||||||
|
err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{
|
||||||
|
RoomIDs: newlyJoinedRooms,
|
||||||
|
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
||||||
|
{
|
||||||
|
EventType: gomatrixserverlib.MRoomMember,
|
||||||
|
StateKey: "*",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
AllowWildcards: true,
|
||||||
|
}, &stateRes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, left, err
|
||||||
|
}
|
||||||
|
for _, state := range stateRes.Rooms {
|
||||||
|
for tuple, membership := range state {
|
||||||
|
if membership != "join" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// new user who we weren't previously sharing rooms with
|
||||||
|
if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
|
||||||
|
changed = append(changed, tuple.StateKey) // changed is returned
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return changed, left, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func joinedRooms(res *types.Response, userID string) []string {
|
func joinedRooms(res *types.Response, userID string) []string {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue