mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 18:43:10 -06:00
Merge branch 'main' into neilalexander/syncsnapshot
This commit is contained in:
commit
1a884c9d68
|
|
@ -2,20 +2,23 @@ package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
func AdminEvacuateRoom(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
|
func AdminEvacuateRoom(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
|
||||||
|
|
@ -144,12 +147,6 @@ func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *userap
|
||||||
}
|
}
|
||||||
|
|
||||||
func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, natsClient *nats.Conn) util.JSONResponse {
|
func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, natsClient *nats.Conn) util.JSONResponse {
|
||||||
if device.AccountType != userapi.AccountTypeAdmin {
|
|
||||||
return util.JSONResponse{
|
|
||||||
Code: http.StatusForbidden,
|
|
||||||
JSON: jsonerror.Forbidden("This API can only be used by admin users."),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := natsClient.RequestMsg(nats.NewMsg(cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex)), time.Second*10)
|
_, err := natsClient.RequestMsg(nats.NewMsg(cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex)), time.Second*10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("failed to publish nats message")
|
logrus.WithError(err).Error("failed to publish nats message")
|
||||||
|
|
@ -160,3 +157,37 @@ func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *userapi.Devi
|
||||||
JSON: struct{}{},
|
JSON: struct{}{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func AdminMarkAsStale(req *http.Request, cfg *config.ClientAPI, keyAPI api.ClientKeyAPI) util.JSONResponse {
|
||||||
|
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||||
|
if err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
userID := vars["userID"]
|
||||||
|
|
||||||
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
|
if err != nil {
|
||||||
|
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||||
|
}
|
||||||
|
if domain == cfg.Matrix.ServerName {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.InvalidParam("Can not mark local device list as stale"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = keyAPI.PerformMarkAsStaleIfNeeded(req.Context(), &api.PerformMarkAsStaleRequest{
|
||||||
|
UserID: userID,
|
||||||
|
Domain: domain,
|
||||||
|
}, &struct{}{})
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusInternalServerError,
|
||||||
|
JSON: jsonerror.Unknown(fmt.Sprintf("Failed to mark device list as stale: %s", err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
JSON: struct{}{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -163,11 +163,17 @@ func Setup(
|
||||||
).Methods(http.MethodPost, http.MethodOptions)
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
|
|
||||||
dendriteAdminRouter.Handle("/admin/fulltext/reindex",
|
dendriteAdminRouter.Handle("/admin/fulltext/reindex",
|
||||||
httputil.MakeAuthAPI("admin_fultext_reindex", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
httputil.MakeAdminAPI("admin_fultext_reindex", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||||
return AdminReindex(req, cfg, device, natsClient)
|
return AdminReindex(req, cfg, device, natsClient)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodGet, http.MethodOptions)
|
).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
||||||
|
dendriteAdminRouter.Handle("/admin/refreshDevices/{userID}",
|
||||||
|
httputil.MakeAdminAPI("admin_refresh_devices", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||||
|
return AdminMarkAsStale(req, cfg, keyAPI)
|
||||||
|
}),
|
||||||
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
|
|
||||||
// server notifications
|
// server notifications
|
||||||
if cfg.Matrix.ServerNotices.Enabled {
|
if cfg.Matrix.ServerNotices.Enabled {
|
||||||
logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")
|
logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")
|
||||||
|
|
|
||||||
|
|
@ -62,6 +62,11 @@ the full user ID is `@alice:domain.com` then the local part is `alice`.
|
||||||
This endpoint instructs Dendrite to reindex all searchable events (`m.room.message`, `m.room.topic` and `m.room.name`). An empty JSON body will be returned immediately.
|
This endpoint instructs Dendrite to reindex all searchable events (`m.room.message`, `m.room.topic` and `m.room.name`). An empty JSON body will be returned immediately.
|
||||||
Indexing is done in the background, the server logs every 1000 events (or below) when they are being indexed. Once reindexing is done, you'll see something along the lines `Indexed 69586 events in 53.68223182s` in your debug logs.
|
Indexing is done in the background, the server logs every 1000 events (or below) when they are being indexed. Once reindexing is done, you'll see something along the lines `Indexed 69586 events in 53.68223182s` in your debug logs.
|
||||||
|
|
||||||
|
## POST `/_dendrite/admin/refreshDevices/{userID}`
|
||||||
|
|
||||||
|
This endpoint instructs Dendrite to immediately query `/devices/{userID}` on a federated server. An empty JSON body will be returned on success, updating all locally stored user devices/keys. This can be used to possibly resolve E2EE issues, where the remote user can't decrypt messages.
|
||||||
|
|
||||||
|
|
||||||
## POST `/_synapse/admin/v1/send_server_notice`
|
## POST `/_synapse/admin/v1/send_server_notice`
|
||||||
|
|
||||||
Request body format:
|
Request body format:
|
||||||
|
|
|
||||||
2
go.mod
2
go.mod
|
|
@ -22,7 +22,7 @@ require (
|
||||||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220929155203-377b320ad5b1
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220929190355-91d455cd3621
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c
|
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.15
|
github.com/mattn/go-sqlite3 v1.14.15
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -384,8 +384,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220929155203-377b320ad5b1 h1:kS4jgi/DshFGJArjF14b1u42fVWC+8LMVdSVLdQYq3E=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220929190355-91d455cd3621 h1:a8IaoSPDxevkgXnOUrtIW9AqVNvXBJAG0gtnX687S7g=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220929155203-377b320ad5b1/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220929190355-91d455cd3621/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c h1:iCHLYwwlPsf4TYFrvhKdhQoAM2lXzcmDZYqwBNWcnVk=
|
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c h1:iCHLYwwlPsf4TYFrvhKdhQoAM2lXzcmDZYqwBNWcnVk=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,7 @@ type ClientKeyAPI interface {
|
||||||
PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse) error
|
PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse) error
|
||||||
// PerformClaimKeys claims one-time keys for use in pre-key messages
|
// PerformClaimKeys claims one-time keys for use in pre-key messages
|
||||||
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) error
|
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) error
|
||||||
|
PerformMarkAsStaleIfNeeded(ctx context.Context, req *PerformMarkAsStaleRequest, res *struct{}) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// API functions required by the userapi
|
// API functions required by the userapi
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ package internal
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"net"
|
"net"
|
||||||
|
|
@ -31,6 +32,7 @@ import (
|
||||||
|
|
||||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -45,6 +47,9 @@ var (
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const defaultWaitTime = time.Minute
|
||||||
|
const requestTimeout = time.Second * 30
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(
|
prometheus.MustRegister(
|
||||||
deviceListUpdateCount,
|
deviceListUpdateCount,
|
||||||
|
|
@ -80,6 +85,7 @@ func init() {
|
||||||
// In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is
|
// In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is
|
||||||
// set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried.
|
// set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried.
|
||||||
type DeviceListUpdater struct {
|
type DeviceListUpdater struct {
|
||||||
|
process *process.ProcessContext
|
||||||
// A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1
|
// A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1
|
||||||
// request to the remote server and race.
|
// request to the remote server and race.
|
||||||
// TODO: Put in an LRU cache to bound growth
|
// TODO: Put in an LRU cache to bound growth
|
||||||
|
|
@ -131,10 +137,12 @@ type KeyChangeProducer interface {
|
||||||
|
|
||||||
// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale.
|
// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale.
|
||||||
func NewDeviceListUpdater(
|
func NewDeviceListUpdater(
|
||||||
db DeviceListUpdaterDatabase, api DeviceListUpdaterAPI, producer KeyChangeProducer,
|
process *process.ProcessContext, db DeviceListUpdaterDatabase,
|
||||||
|
api DeviceListUpdaterAPI, producer KeyChangeProducer,
|
||||||
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
|
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
|
||||||
) *DeviceListUpdater {
|
) *DeviceListUpdater {
|
||||||
return &DeviceListUpdater{
|
return &DeviceListUpdater{
|
||||||
|
process: process,
|
||||||
userIDToMutex: make(map[string]*sync.Mutex),
|
userIDToMutex: make(map[string]*sync.Mutex),
|
||||||
mu: &sync.Mutex{},
|
mu: &sync.Mutex{},
|
||||||
db: db,
|
db: db,
|
||||||
|
|
@ -234,7 +242,7 @@ func (u *DeviceListUpdater) update(ctx context.Context, event gomatrixserverlib.
|
||||||
"prev_ids": event.PrevID,
|
"prev_ids": event.PrevID,
|
||||||
"display_name": event.DeviceDisplayName,
|
"display_name": event.DeviceDisplayName,
|
||||||
"deleted": event.Deleted,
|
"deleted": event.Deleted,
|
||||||
}).Info("DeviceListUpdater.Update")
|
}).Trace("DeviceListUpdater.Update")
|
||||||
|
|
||||||
// if we haven't missed anything update the database and notify users
|
// if we haven't missed anything update the database and notify users
|
||||||
if exists || event.Deleted {
|
if exists || event.Deleted {
|
||||||
|
|
@ -378,74 +386,99 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) {
|
func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) {
|
||||||
deviceListUpdateCount.WithLabelValues(string(serverName)).Inc()
|
ctx := u.process.Context()
|
||||||
requestTimeout := time.Second * 30 // max amount of time we want to spend on each request
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
||||||
defer cancel()
|
|
||||||
logger := util.GetLogger(ctx).WithField("server_name", serverName)
|
logger := util.GetLogger(ctx).WithField("server_name", serverName)
|
||||||
waitTime := 2 * time.Second
|
deviceListUpdateCount.WithLabelValues(string(serverName)).Inc()
|
||||||
// fetch stale device lists
|
|
||||||
|
waitTime := defaultWaitTime // How long should we wait to try again?
|
||||||
|
successCount := 0 // How many user requests failed?
|
||||||
|
|
||||||
userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName})
|
userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("Failed to load stale device lists")
|
logger.WithError(err).Error("Failed to load stale device lists")
|
||||||
return waitTime, true
|
return waitTime, true
|
||||||
}
|
}
|
||||||
failCount := 0
|
|
||||||
|
|
||||||
userLoop:
|
defer func() {
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
if ctx.Err() != nil {
|
// always clear the channel to unblock Update calls regardless of success/failure
|
||||||
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
u.clearChannel(userID)
|
||||||
failCount += 1
|
}
|
||||||
waitTime = time.Minute * 10
|
}()
|
||||||
|
|
||||||
|
for _, userID := range userIDs {
|
||||||
|
userWait, err := u.processServerUser(ctx, serverName, userID)
|
||||||
|
if err != nil {
|
||||||
|
if userWait > waitTime {
|
||||||
|
waitTime = userWait
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
successCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
allUsersSucceeded := successCount == len(userIDs)
|
||||||
|
if !allUsersSucceeded {
|
||||||
|
logger.WithFields(logrus.Fields{
|
||||||
|
"total": len(userIDs),
|
||||||
|
"succeeded": successCount,
|
||||||
|
"failed": len(userIDs) - successCount,
|
||||||
|
"wait_time": waitTime,
|
||||||
|
}).Warn("Failed to query device keys for some users")
|
||||||
|
}
|
||||||
|
return waitTime, !allUsersSucceeded
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *DeviceListUpdater) processServerUser(ctx context.Context, serverName gomatrixserverlib.ServerName, userID string) (time.Duration, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
logger := util.GetLogger(ctx).WithFields(logrus.Fields{
|
||||||
|
"server_name": serverName,
|
||||||
|
"user_id": userID,
|
||||||
|
})
|
||||||
|
|
||||||
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
|
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failCount += 1
|
if errors.Is(err, context.DeadlineExceeded) {
|
||||||
select {
|
return time.Minute * 10, err
|
||||||
case <-ctx.Done():
|
|
||||||
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
|
||||||
waitTime = time.Minute * 10
|
|
||||||
break userLoop
|
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
switch e := err.(type) {
|
switch e := err.(type) {
|
||||||
|
case *json.UnmarshalTypeError, *json.SyntaxError:
|
||||||
|
logger.WithError(err).Debugf("Device list update for %q contained invalid JSON", userID)
|
||||||
|
return defaultWaitTime, nil
|
||||||
case *fedsenderapi.FederationClientError:
|
case *fedsenderapi.FederationClientError:
|
||||||
if e.RetryAfter > 0 {
|
if e.RetryAfter > 0 {
|
||||||
waitTime = e.RetryAfter
|
return e.RetryAfter, err
|
||||||
} else if e.Blacklisted {
|
} else if e.Blacklisted {
|
||||||
waitTime = time.Hour * 8
|
return time.Hour * 8, err
|
||||||
break userLoop
|
|
||||||
} else if e.Code >= 300 {
|
} else if e.Code >= 300 {
|
||||||
// We didn't get a real FederationClientError (e.g. in polylith mode, where gomatrix.HTTPError
|
// We didn't get a real FederationClientError (e.g. in polylith mode, where gomatrix.HTTPError
|
||||||
// are "converted" to FederationClientError), but we probably shouldn't hit them every $waitTime seconds.
|
// are "converted" to FederationClientError), but we probably shouldn't hit them every $waitTime seconds.
|
||||||
waitTime = time.Hour
|
return time.Hour, err
|
||||||
break userLoop
|
|
||||||
}
|
}
|
||||||
case net.Error:
|
case net.Error:
|
||||||
// Use the default waitTime, if it's a timeout.
|
// Use the default waitTime, if it's a timeout.
|
||||||
// It probably doesn't make sense to try further users.
|
// It probably doesn't make sense to try further users.
|
||||||
if !e.Timeout() {
|
if !e.Timeout() {
|
||||||
waitTime = time.Minute * 10
|
logger.WithError(e).Debug("GetUserDevices returned net.Error")
|
||||||
logger.WithError(e).Error("GetUserDevices returned net.Error")
|
return time.Minute * 10, err
|
||||||
break userLoop
|
|
||||||
}
|
}
|
||||||
case gomatrix.HTTPError:
|
case gomatrix.HTTPError:
|
||||||
// The remote server returned an error, give it some time to recover.
|
// The remote server returned an error, give it some time to recover.
|
||||||
// This is to avoid spamming remote servers, which may not be Matrix servers anymore.
|
// This is to avoid spamming remote servers, which may not be Matrix servers anymore.
|
||||||
if e.Code >= 300 {
|
if e.Code >= 300 {
|
||||||
waitTime = time.Hour
|
logger.WithError(e).Debug("GetUserDevices returned gomatrix.HTTPError")
|
||||||
logger.WithError(e).Error("GetUserDevices returned gomatrix.HTTPError")
|
return time.Hour, err
|
||||||
break userLoop
|
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
// Something else failed
|
// Something else failed
|
||||||
waitTime = time.Minute * 10
|
logger.WithError(err).Debugf("GetUserDevices returned unknown error type: %T", err)
|
||||||
logger.WithError(err).WithField("user_id", userID).Debugf("GetUserDevices returned unknown error type: %T", err)
|
return time.Minute * 10, err
|
||||||
break userLoop
|
|
||||||
}
|
}
|
||||||
continue
|
}
|
||||||
|
if res.UserID != userID {
|
||||||
|
logger.WithError(err).Debugf("User ID %q in device list update response doesn't match expected %q", res.UserID, userID)
|
||||||
|
return defaultWaitTime, nil
|
||||||
}
|
}
|
||||||
if res.MasterKey != nil || res.SelfSigningKey != nil {
|
if res.MasterKey != nil || res.SelfSigningKey != nil {
|
||||||
uploadReq := &api.PerformUploadDeviceKeysRequest{
|
uploadReq := &api.PerformUploadDeviceKeysRequest{
|
||||||
|
|
@ -466,23 +499,10 @@ userLoop:
|
||||||
}
|
}
|
||||||
err = u.updateDeviceList(&res)
|
err = u.updateDeviceList(&res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).WithField("user_id", userID).Error("Fetched device list but failed to store/emit it")
|
logger.WithError(err).Error("Fetched device list but failed to store/emit it")
|
||||||
failCount += 1
|
return defaultWaitTime, err
|
||||||
}
|
}
|
||||||
}
|
return defaultWaitTime, nil
|
||||||
if failCount > 0 {
|
|
||||||
logger.WithFields(logrus.Fields{
|
|
||||||
"total": len(userIDs),
|
|
||||||
"failed": failCount,
|
|
||||||
"skipped": len(userIDs) - failCount,
|
|
||||||
"waittime": waitTime,
|
|
||||||
}).Warn("Failed to query device keys for some users")
|
|
||||||
}
|
|
||||||
for _, userID := range userIDs {
|
|
||||||
// always clear the channel to unblock Update calls regardless of success/failure
|
|
||||||
u.clearChannel(userID)
|
|
||||||
}
|
|
||||||
return waitTime, failCount > 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {
|
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -146,7 +147,7 @@ func TestUpdateHavePrevID(t *testing.T) {
|
||||||
}
|
}
|
||||||
ap := &mockDeviceListUpdaterAPI{}
|
ap := &mockDeviceListUpdaterAPI{}
|
||||||
producer := &mockKeyChangeProducer{}
|
producer := &mockKeyChangeProducer{}
|
||||||
updater := NewDeviceListUpdater(db, ap, producer, nil, 1)
|
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1)
|
||||||
event := gomatrixserverlib.DeviceListUpdateEvent{
|
event := gomatrixserverlib.DeviceListUpdateEvent{
|
||||||
DeviceDisplayName: "Foo Bar",
|
DeviceDisplayName: "Foo Bar",
|
||||||
Deleted: false,
|
Deleted: false,
|
||||||
|
|
@ -218,7 +219,7 @@ func TestUpdateNoPrevID(t *testing.T) {
|
||||||
`)),
|
`)),
|
||||||
}, nil
|
}, nil
|
||||||
})
|
})
|
||||||
updater := NewDeviceListUpdater(db, ap, producer, fedClient, 2)
|
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2)
|
||||||
if err := updater.Start(); err != nil {
|
if err := updater.Start(); err != nil {
|
||||||
t.Fatalf("failed to start updater: %s", err)
|
t.Fatalf("failed to start updater: %s", err)
|
||||||
}
|
}
|
||||||
|
|
@ -287,7 +288,7 @@ func TestDebounce(t *testing.T) {
|
||||||
close(incomingFedReq)
|
close(incomingFedReq)
|
||||||
return <-fedCh, nil
|
return <-fedCh, nil
|
||||||
})
|
})
|
||||||
updater := NewDeviceListUpdater(db, ap, producer, fedClient, 1)
|
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1)
|
||||||
if err := updater.Start(); err != nil {
|
if err := updater.Start(); err != nil {
|
||||||
t.Fatalf("failed to start updater: %s", err)
|
t.Fatalf("failed to start updater: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -228,14 +228,21 @@ func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.Query
|
||||||
// PerformMarkAsStaleIfNeeded marks the users device list as stale, if the given deviceID is not present
|
// PerformMarkAsStaleIfNeeded marks the users device list as stale, if the given deviceID is not present
|
||||||
// in our database.
|
// in our database.
|
||||||
func (a *KeyInternalAPI) PerformMarkAsStaleIfNeeded(ctx context.Context, req *api.PerformMarkAsStaleRequest, res *struct{}) error {
|
func (a *KeyInternalAPI) PerformMarkAsStaleIfNeeded(ctx context.Context, req *api.PerformMarkAsStaleRequest, res *struct{}) error {
|
||||||
knownDevices, err := a.DB.DeviceKeysForUser(ctx, req.UserID, []string{req.DeviceID}, true)
|
knownDevices, err := a.DB.DeviceKeysForUser(ctx, req.UserID, []string{}, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(knownDevices) == 0 {
|
if len(knownDevices) == 0 {
|
||||||
return a.Updater.ManualUpdate(ctx, req.Domain, req.UserID)
|
return fmt.Errorf("unknown user %s", req.UserID)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
for i := range knownDevices {
|
||||||
|
if knownDevices[i].DeviceID == req.DeviceID {
|
||||||
|
return nil // we already know about this device
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return a.Updater.ManualUpdate(ctx, req.Domain, req.UserID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ func NewInternalAPI(
|
||||||
FedClient: fedClient,
|
FedClient: fedClient,
|
||||||
Producer: keyChangeProducer,
|
Producer: keyChangeProducer,
|
||||||
}
|
}
|
||||||
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
updater := internal.NewDeviceListUpdater(base.ProcessContext, db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
||||||
ap.Updater = updater
|
ap.Updater = updater
|
||||||
go func() {
|
go func() {
|
||||||
if err := updater.Start(); err != nil {
|
if err := updater.Start(); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue