Bugfixes for 'If remote user leaves room we no longer receive device updates'

This commit is contained in:
Kegan Dougal 2020-08-11 18:01:18 +01:00
parent cce3678abe
commit 7f8025a647
4 changed files with 31 additions and 13 deletions

View file

@ -831,6 +831,7 @@ psh Trying to get push rules with unknown scope fails with 400
psh Trying to get push rules with unknown template fails with 400 psh Trying to get push rules with unknown template fails with 400
psh Trying to get push rules with unknown attribute fails with 400 psh Trying to get push rules with unknown attribute fails with 400
psh Trying to get push rules with unknown rule_id fails with 404 psh Trying to get push rules with unknown rule_id fails with 404
psh Rooms with names are correctly named in pushes
v1s GET /initialSync with non-numeric 'limit' v1s GET /initialSync with non-numeric 'limit'
v1s GET /events with non-numeric 'limit' v1s GET /events with non-numeric 'limit'
v1s GET /events with negative 'limit' v1s GET /events with negative 'limit'
@ -839,7 +840,7 @@ ath Event size limits
syn Check creating invalid filters returns 4xx syn Check creating invalid filters returns 4xx
f,pre New federated private chats get full presence information (SYN-115) f,pre New federated private chats get full presence information (SYN-115)
pre Left room members do not cause problems for presence pre Left room members do not cause problems for presence
crm Rooms can be created with an initial invite list (SYN-205) crm Rooms can be created with an initial invite list (SYN-205) (1 subtests)
typ Typing notifications don't leak typ Typing notifications don't leak
ban Non-present room members cannot ban others ban Non-present room members cannot ban others
psh Getting push rules doesn't corrupt the cache SYN-390 psh Getting push rules doesn't corrupt the cache SYN-390

View file

@ -230,6 +230,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
} }
// on failure, spin up a short-lived goroutine to inject the server name again. // on failure, spin up a short-lived goroutine to inject the server name again.
scheduledRetries := make(map[gomatrixserverlib.ServerName]time.Time)
inject := func(srv gomatrixserverlib.ServerName, duration time.Duration) { inject := func(srv gomatrixserverlib.ServerName, duration time.Duration) {
time.Sleep(duration) time.Sleep(duration)
ch <- srv ch <- srv
@ -237,13 +238,20 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
for serverName := range ch { for serverName := range ch {
if !shouldProcess(serverName) { if !shouldProcess(serverName) {
// do not inject into the channel as we know there will be a sleeping goroutine if time.Now().Before(scheduledRetries[serverName]) {
// which will do it after the cooloff period expires // do not inject into the channel as we know there will be a sleeping goroutine
continue // which will do it after the cooloff period expires
continue
} else {
scheduledRetries[serverName] = time.Now().Add(cooloffPeriod)
go inject(serverName, cooloffPeriod) // TODO: Backoff?
continue
}
} }
lastProcessed[serverName] = time.Now() lastProcessed[serverName] = time.Now()
shouldRetry := u.processServer(serverName) shouldRetry := u.processServer(serverName)
if shouldRetry { if shouldRetry {
scheduledRetries[serverName] = time.Now().Add(cooloffPeriod)
go inject(serverName, cooloffPeriod) // TODO: Backoff? go inject(serverName, cooloffPeriod) // TODO: Backoff?
} }
} }

View file

@ -96,7 +96,8 @@ func DeviceListCatchup(
return hasNew, nil return hasNew, nil
} }
// QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user. // QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user.
queryRes.UserIDs = filterSharedUsers(ctx, stateAPI, userID, queryRes.UserIDs) var sharedUsersMap map[string]int
sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, stateAPI, userID, queryRes.UserIDs)
util.GetLogger(ctx).Debugf( util.GetLogger(ctx).Debugf(
"QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v", "QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v",
partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs, partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs,
@ -114,13 +115,20 @@ func DeviceListCatchup(
} }
// if the response has any join/leave events, add them now. // if the response has any join/leave events, add them now.
// TODO: This is sub-optimal because we will add users to `changed` even if we already shared a room with them. // TODO: This is sub-optimal because we will add users to `changed` even if we already shared a room with them.
for _, userID := range membershipEvents(res) { joinUserIDs, leaveUserIDs := membershipEvents(res)
for _, userID := range joinUserIDs {
if !userSet[userID] { if !userSet[userID] {
res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
hasNew = true hasNew = true
userSet[userID] = true userSet[userID] = true
} }
} }
for _, userID := range leaveUserIDs {
if sharedUsersMap[userID] == 0 {
// we no longer share a room with this user when they left, so add to left list.
res.DeviceLists.Left = append(res.DeviceLists.Left, userID)
}
}
// set the new token // set the new token
to.SetLog(DeviceListLogName, &types.LogPosition{ to.SetLog(DeviceListLogName, &types.LogPosition{
Partition: queryRes.Partition, Partition: queryRes.Partition,
@ -221,7 +229,7 @@ func TrackChangedUsers(
func filterSharedUsers( func filterSharedUsers(
ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, usersWithChangedKeys []string, ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, usersWithChangedKeys []string,
) []string { ) (map[string]int, []string) {
var result []string var result []string
var sharedUsersRes currentstateAPI.QuerySharedUsersResponse var sharedUsersRes currentstateAPI.QuerySharedUsersResponse
err := stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{ err := stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
@ -229,7 +237,7 @@ func filterSharedUsers(
}, &sharedUsersRes) }, &sharedUsersRes)
if err != nil { if err != nil {
// default to all users so we do needless queries rather than miss some important device update // default to all users so we do needless queries rather than miss some important device update
return usersWithChangedKeys return nil, usersWithChangedKeys
} }
// We forcibly put ourselves in this list because we should be notified about our own device updates // We forcibly put ourselves in this list because we should be notified about our own device updates
// and if we are in 0 rooms then we don't technically share any room with ourselves so we wouldn't // and if we are in 0 rooms then we don't technically share any room with ourselves so we wouldn't
@ -241,7 +249,7 @@ func filterSharedUsers(
result = append(result, uid) result = append(result, uid)
} }
} }
return result return sharedUsersRes.UserIDsToCount, result
} }
func joinedRooms(res *types.Response, userID string) []string { func joinedRooms(res *types.Response, userID string) []string {
@ -288,16 +296,16 @@ func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID strin
// "For optimal performance, Alice should be added to changed in Bob's sync only when she adds a new device, // "For optimal performance, Alice should be added to changed in Bob's sync only when she adds a new device,
// or when Alice and Bob now share a room but didn't share any room previously. However, for the sake of simpler // or when Alice and Bob now share a room but didn't share any room previously. However, for the sake of simpler
// logic, a server may add Alice to changed when Alice and Bob share a new room, even if they previously already shared a room." // logic, a server may add Alice to changed when Alice and Bob share a new room, even if they previously already shared a room."
func membershipEvents(res *types.Response) (userIDs []string) { func membershipEvents(res *types.Response) (joinUserIDs, leaveUserIDs []string) {
for _, room := range res.Rooms.Join { for _, room := range res.Rooms.Join {
for _, ev := range room.Timeline.Events { for _, ev := range room.Timeline.Events {
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil { if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil {
if strings.Contains(string(ev.Content), `"join"`) { if strings.Contains(string(ev.Content), `"join"`) {
userIDs = append(userIDs, *ev.StateKey) joinUserIDs = append(joinUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"leave"`) { } else if strings.Contains(string(ev.Content), `"leave"`) {
userIDs = append(userIDs, *ev.StateKey) leaveUserIDs = append(leaveUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"ban"`) { } else if strings.Contains(string(ev.Content), `"ban"`) {
userIDs = append(userIDs, *ev.StateKey) leaveUserIDs = append(leaveUserIDs, *ev.StateKey)
} }
} }
} }

View file

@ -142,6 +142,7 @@ Server correctly handles incoming m.device_list_update
Device deletion propagates over federation Device deletion propagates over federation
If remote user leaves room, changes device and rejoins we see update in sync If remote user leaves room, changes device and rejoins we see update in sync
If remote user leaves room, changes device and rejoins we see update in /keys/changes If remote user leaves room, changes device and rejoins we see update in /keys/changes
If remote user leaves room we no longer receive device updates
Can query remote device keys using POST after notification Can query remote device keys using POST after notification
Can add account data Can add account data
Can add account data to room Can add account data to room