diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 933170782..4baf2f42f 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -165,10 +165,6 @@ func NewDeviceListUpdater( // Start the device list updater, which will try to refresh any stale device lists. func (u *DeviceListUpdater) Start() error { - if err := u.cleanUp(); err != nil { - return fmt.Errorf("failed to cleanup stale device lists: %w", err) - } - for i := 0; i < len(u.workerChans); i++ { // Allocate a small buffer per channel. // If the buffer limit is reached, backpressure will cause the processing of EDUs @@ -197,10 +193,7 @@ func (u *DeviceListUpdater) Start() error { } // cleanUp removes stale device entries for users we don't share a room with anymore -func (u *DeviceListUpdater) cleanUp() error { - if u.rsAPI == nil { - return nil - } +func (u *DeviceListUpdater) CleanUp() error { staleUsers, err := u.db.StaleDeviceLists(u.process.Context(), []gomatrixserverlib.ServerName{}) if err != nil { return err @@ -211,7 +204,7 @@ func (u *DeviceListUpdater) cleanUp() error { // In polylith mode, the roomserver api might not be up yet, so we try again res := rsapi.QueryLeftUsersResponse{} for i := 0; i <= maxRetries; i++ { - if err = u.rsAPI.QueryLeftUsers(u.process.Context(), &rsapi.QueryLeftUsersRequest{UserIDs: staleUsers}, &res); err != nil { + if err = u.rsAPI.QueryLeftUsers(u.process.Context(), &rsapi.QueryLeftUsersRequest{StaleDeviceListUsers: staleUsers}, &res); err != nil { if i == maxRetries { return err } @@ -219,11 +212,11 @@ func (u *DeviceListUpdater) cleanUp() error { time.Sleep(time.Second * 3) } } - if len(res.UserIDs) == 0 { + if len(res.LeftUsers) == 0 { return nil } - logrus.Debugf("Deleting %d stale device list entries", len(res.UserIDs)) - return u.db.DeleteStaleDeviceLists(u.process.Context(), res.UserIDs) + logrus.Debugf("Deleting %d stale device list entries", len(res.LeftUsers)) + return u.db.DeleteStaleDeviceLists(u.process.Context(), res.LeftUsers) } func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex { diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 47329511c..275576773 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -16,9 +16,10 @@ package keyserver import ( "github.com/gorilla/mux" - rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/sirupsen/logrus" + rsapi "github.com/matrix-org/dendrite/roomserver/api" + fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/consumers" @@ -63,6 +64,12 @@ func NewInternalAPI( } updater := internal.NewDeviceListUpdater(base.ProcessContext, db, ap, keyChangeProducer, fedClient, 8, rsAPI, cfg.Matrix.ServerName) // 8 workers TODO: configurable ap.Updater = updater + + // Remove users which we don't share a room with anymore + if err := updater.CleanUp(); err != nil { + logrus.WithError(err).Error("failed to cleanup stale device lists") + } + go func() { if err := updater.Start(); err != nil { logrus.WithError(err).Panicf("failed to start device list updater") diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 51008825b..76f8298ca 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -448,10 +448,14 @@ type QueryMembershipAtEventResponse struct { Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"` } +// QueryLeftUsersRequest is a request to calculate users that we (the server) don't share a +// a room with anymore. This is used to cleanup stale device list entries, where we would +// otherwise keep on trying to get device lists. type QueryLeftUsersRequest struct { - UserIDs []string `json:"user_ids"` + StaleDeviceListUsers []string `json:"user_ids"` } +// QueryLeftUsersResponse is the response to QueryLeftUsersRequest. type QueryLeftUsersResponse struct { - UserIDs []string `json:"user_ids"` + LeftUsers []string `json:"user_ids"` } diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index d4479fe29..69d841dda 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -807,7 +807,7 @@ func (r *Queryer) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkS func (r *Queryer) QueryLeftUsers(ctx context.Context, req *api.QueryLeftUsersRequest, res *api.QueryLeftUsersResponse) error { var err error - res.UserIDs, err = r.DB.GetLeftUsers(ctx, req.UserIDs) + res.LeftUsers, err = r.DB.GetLeftUsers(ctx, req.StaleDeviceListUsers) return err } diff --git a/roomserver/storage/postgres/membership_table.go b/roomserver/storage/postgres/membership_table.go index ce8888b2f..d774b7892 100644 --- a/roomserver/storage/postgres/membership_table.go +++ b/roomserver/storage/postgres/membership_table.go @@ -21,12 +21,13 @@ import ( "fmt" "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/gomatrixserverlib" ) const membershipSchema = ` @@ -231,7 +232,7 @@ func (s *membershipStatements) SelectJoinedUsers( if err != nil { return nil, err } - + defer internal.CloseAndLogIfError(ctx, rows, "SelectJoinedUsers: rows.close() failed") var targetNID types.EventStateKeyNID for rows.Next() { if err = rows.Scan(&targetNID); err != nil { diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index f5d7c4dd0..725cc5bc7 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -1365,7 +1365,9 @@ func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs [ return result, nil } +// GetLeftUsers calculates users we (the server) don't share a room with anymore. func (d *Database) GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error) { + // Get the userNID for all users with a stale device list stateKeyNIDMap, err := d.EventStateKeyNIDs(ctx, userIDs) if err != nil { return nil, err @@ -1373,20 +1375,25 @@ func (d *Database) GetLeftUsers(ctx context.Context, userIDs []string) ([]string userNIDs := make([]types.EventStateKeyNID, 0, len(stateKeyNIDMap)) userNIDtoUserID := make(map[types.EventStateKeyNID]string, len(stateKeyNIDMap)) + // Create a map from userNID -> userID for userID, nid := range stateKeyNIDMap { userNIDs = append(userNIDs, nid) userNIDtoUserID[nid] = userID } + // Get all users whose membership is still join, knock or invite. stillJoinedUsersNIDs, err := d.MembershipTable.SelectJoinedUsers(ctx, nil, userNIDs) if err != nil { return nil, err } + // Remove joined users from the "user with stale devices" list, which contains left AND joined users for _, joinedUser := range stillJoinedUsersNIDs { delete(userNIDtoUserID, joinedUser) } + // The users still in our userNIDtoUserID map are the users we don't share a room with anymore, + // and the return value we are looking for. leftUsers := make([]string, 0, len(userNIDtoUserID)) for _, userID := range userNIDtoUserID { leftUsers = append(leftUsers, userID) diff --git a/roomserver/storage/sqlite3/membership_table.go b/roomserver/storage/sqlite3/membership_table.go index 66ed3f45f..8a60b359f 100644 --- a/roomserver/storage/sqlite3/membership_table.go +++ b/roomserver/storage/sqlite3/membership_table.go @@ -21,12 +21,13 @@ import ( "fmt" "strings" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/gomatrixserverlib" ) const membershipSchema = ` @@ -445,7 +446,7 @@ func (s *membershipStatements) SelectJoinedUsers( if err != nil { return nil, err } - + defer internal.CloseAndLogIfError(ctx, rows, "SelectJoinedUsers: rows.close() failed") var targetNID types.EventStateKeyNID for rows.Next() { if err = rows.Scan(&targetNID); err != nil {