mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-22 05:23:09 -06:00
Review comments
This commit is contained in:
parent
fe9ac2511b
commit
4291400cd3
|
|
@ -165,10 +165,6 @@ func NewDeviceListUpdater(
|
||||||
|
|
||||||
// Start the device list updater, which will try to refresh any stale device lists.
|
// Start the device list updater, which will try to refresh any stale device lists.
|
||||||
func (u *DeviceListUpdater) Start() error {
|
func (u *DeviceListUpdater) Start() error {
|
||||||
if err := u.cleanUp(); err != nil {
|
|
||||||
return fmt.Errorf("failed to cleanup stale device lists: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < len(u.workerChans); i++ {
|
for i := 0; i < len(u.workerChans); i++ {
|
||||||
// Allocate a small buffer per channel.
|
// Allocate a small buffer per channel.
|
||||||
// If the buffer limit is reached, backpressure will cause the processing of EDUs
|
// If the buffer limit is reached, backpressure will cause the processing of EDUs
|
||||||
|
|
@ -197,10 +193,7 @@ func (u *DeviceListUpdater) Start() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// cleanUp removes stale device entries for users we don't share a room with anymore
|
// cleanUp removes stale device entries for users we don't share a room with anymore
|
||||||
func (u *DeviceListUpdater) cleanUp() error {
|
func (u *DeviceListUpdater) CleanUp() error {
|
||||||
if u.rsAPI == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
staleUsers, err := u.db.StaleDeviceLists(u.process.Context(), []gomatrixserverlib.ServerName{})
|
staleUsers, err := u.db.StaleDeviceLists(u.process.Context(), []gomatrixserverlib.ServerName{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -211,7 +204,7 @@ func (u *DeviceListUpdater) cleanUp() error {
|
||||||
// In polylith mode, the roomserver api might not be up yet, so we try again
|
// In polylith mode, the roomserver api might not be up yet, so we try again
|
||||||
res := rsapi.QueryLeftUsersResponse{}
|
res := rsapi.QueryLeftUsersResponse{}
|
||||||
for i := 0; i <= maxRetries; i++ {
|
for i := 0; i <= maxRetries; i++ {
|
||||||
if err = u.rsAPI.QueryLeftUsers(u.process.Context(), &rsapi.QueryLeftUsersRequest{UserIDs: staleUsers}, &res); err != nil {
|
if err = u.rsAPI.QueryLeftUsers(u.process.Context(), &rsapi.QueryLeftUsersRequest{StaleDeviceListUsers: staleUsers}, &res); err != nil {
|
||||||
if i == maxRetries {
|
if i == maxRetries {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -219,11 +212,11 @@ func (u *DeviceListUpdater) cleanUp() error {
|
||||||
time.Sleep(time.Second * 3)
|
time.Sleep(time.Second * 3)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(res.UserIDs) == 0 {
|
if len(res.LeftUsers) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
logrus.Debugf("Deleting %d stale device list entries", len(res.UserIDs))
|
logrus.Debugf("Deleting %d stale device list entries", len(res.LeftUsers))
|
||||||
return u.db.DeleteStaleDeviceLists(u.process.Context(), res.UserIDs)
|
return u.db.DeleteStaleDeviceLists(u.process.Context(), res.LeftUsers)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex {
|
func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex {
|
||||||
|
|
|
||||||
|
|
@ -16,9 +16,10 @@ package keyserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
||||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/consumers"
|
"github.com/matrix-org/dendrite/keyserver/consumers"
|
||||||
|
|
@ -63,6 +64,12 @@ func NewInternalAPI(
|
||||||
}
|
}
|
||||||
updater := internal.NewDeviceListUpdater(base.ProcessContext, db, ap, keyChangeProducer, fedClient, 8, rsAPI, cfg.Matrix.ServerName) // 8 workers TODO: configurable
|
updater := internal.NewDeviceListUpdater(base.ProcessContext, db, ap, keyChangeProducer, fedClient, 8, rsAPI, cfg.Matrix.ServerName) // 8 workers TODO: configurable
|
||||||
ap.Updater = updater
|
ap.Updater = updater
|
||||||
|
|
||||||
|
// Remove users which we don't share a room with anymore
|
||||||
|
if err := updater.CleanUp(); err != nil {
|
||||||
|
logrus.WithError(err).Error("failed to cleanup stale device lists")
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := updater.Start(); err != nil {
|
if err := updater.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start device list updater")
|
logrus.WithError(err).Panicf("failed to start device list updater")
|
||||||
|
|
|
||||||
|
|
@ -448,10 +448,14 @@ type QueryMembershipAtEventResponse struct {
|
||||||
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
|
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryLeftUsersRequest is a request to calculate users that we (the server) don't share a
|
||||||
|
// a room with anymore. This is used to cleanup stale device list entries, where we would
|
||||||
|
// otherwise keep on trying to get device lists.
|
||||||
type QueryLeftUsersRequest struct {
|
type QueryLeftUsersRequest struct {
|
||||||
UserIDs []string `json:"user_ids"`
|
StaleDeviceListUsers []string `json:"user_ids"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryLeftUsersResponse is the response to QueryLeftUsersRequest.
|
||||||
type QueryLeftUsersResponse struct {
|
type QueryLeftUsersResponse struct {
|
||||||
UserIDs []string `json:"user_ids"`
|
LeftUsers []string `json:"user_ids"`
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -807,7 +807,7 @@ func (r *Queryer) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkS
|
||||||
|
|
||||||
func (r *Queryer) QueryLeftUsers(ctx context.Context, req *api.QueryLeftUsersRequest, res *api.QueryLeftUsersResponse) error {
|
func (r *Queryer) QueryLeftUsers(ctx context.Context, req *api.QueryLeftUsersRequest, res *api.QueryLeftUsersResponse) error {
|
||||||
var err error
|
var err error
|
||||||
res.UserIDs, err = r.DB.GetLeftUsers(ctx, req.UserIDs)
|
res.LeftUsers, err = r.DB.GetLeftUsers(ctx, req.StaleDeviceListUsers)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,12 +21,13 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas"
|
"github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const membershipSchema = `
|
const membershipSchema = `
|
||||||
|
|
@ -231,7 +232,7 @@ func (s *membershipStatements) SelectJoinedUsers(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "SelectJoinedUsers: rows.close() failed")
|
||||||
var targetNID types.EventStateKeyNID
|
var targetNID types.EventStateKeyNID
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
if err = rows.Scan(&targetNID); err != nil {
|
if err = rows.Scan(&targetNID); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -1365,7 +1365,9 @@ func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs [
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLeftUsers calculates users we (the server) don't share a room with anymore.
|
||||||
func (d *Database) GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error) {
|
func (d *Database) GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error) {
|
||||||
|
// Get the userNID for all users with a stale device list
|
||||||
stateKeyNIDMap, err := d.EventStateKeyNIDs(ctx, userIDs)
|
stateKeyNIDMap, err := d.EventStateKeyNIDs(ctx, userIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -1373,20 +1375,25 @@ func (d *Database) GetLeftUsers(ctx context.Context, userIDs []string) ([]string
|
||||||
|
|
||||||
userNIDs := make([]types.EventStateKeyNID, 0, len(stateKeyNIDMap))
|
userNIDs := make([]types.EventStateKeyNID, 0, len(stateKeyNIDMap))
|
||||||
userNIDtoUserID := make(map[types.EventStateKeyNID]string, len(stateKeyNIDMap))
|
userNIDtoUserID := make(map[types.EventStateKeyNID]string, len(stateKeyNIDMap))
|
||||||
|
// Create a map from userNID -> userID
|
||||||
for userID, nid := range stateKeyNIDMap {
|
for userID, nid := range stateKeyNIDMap {
|
||||||
userNIDs = append(userNIDs, nid)
|
userNIDs = append(userNIDs, nid)
|
||||||
userNIDtoUserID[nid] = userID
|
userNIDtoUserID[nid] = userID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get all users whose membership is still join, knock or invite.
|
||||||
stillJoinedUsersNIDs, err := d.MembershipTable.SelectJoinedUsers(ctx, nil, userNIDs)
|
stillJoinedUsersNIDs, err := d.MembershipTable.SelectJoinedUsers(ctx, nil, userNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove joined users from the "user with stale devices" list, which contains left AND joined users
|
||||||
for _, joinedUser := range stillJoinedUsersNIDs {
|
for _, joinedUser := range stillJoinedUsersNIDs {
|
||||||
delete(userNIDtoUserID, joinedUser)
|
delete(userNIDtoUserID, joinedUser)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The users still in our userNIDtoUserID map are the users we don't share a room with anymore,
|
||||||
|
// and the return value we are looking for.
|
||||||
leftUsers := make([]string, 0, len(userNIDtoUserID))
|
leftUsers := make([]string, 0, len(userNIDtoUserID))
|
||||||
for _, userID := range userNIDtoUserID {
|
for _, userID := range userNIDtoUserID {
|
||||||
leftUsers = append(leftUsers, userID)
|
leftUsers = append(leftUsers, userID)
|
||||||
|
|
|
||||||
|
|
@ -21,12 +21,13 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas"
|
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const membershipSchema = `
|
const membershipSchema = `
|
||||||
|
|
@ -445,7 +446,7 @@ func (s *membershipStatements) SelectJoinedUsers(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "SelectJoinedUsers: rows.close() failed")
|
||||||
var targetNID types.EventStateKeyNID
|
var targetNID types.EventStateKeyNID
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
if err = rows.Scan(&targetNID); err != nil {
|
if err = rows.Scan(&targetNID); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue