diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index bf7527085..e2d2de48c 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -80,9 +80,6 @@ func main() { serverKeyAPI = base.ServerKeyAPIClient() } keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, cfg.Derived.ApplicationServices, keyAPI) - keyAPI.SetUserAPI(userAPI) rsImpl := roomserver.NewInternalAPI( base, keyRing, federation, @@ -99,6 +96,23 @@ func main() { } } + stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer) + + fsAPI := federationsender.NewInternalAPI( + base, federation, rsAPI, stateAPI, keyRing, + ) + if base.UseHTTPAPIs { + federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) + fsAPI = base.FederationSenderHTTPClient() + } + // The underlying roomserver implementation needs to be able to call the fedsender. + // This is different to rsAPI which can be the http client which doesn't need this dependency + rsImpl.SetFederationSenderAPI(fsAPI) + + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, cfg.Derived.ApplicationServices, keyAPI) + keyAPI.SetUserAPI(userAPI) + eduInputAPI := eduserver.NewInternalAPI( base, cache.New(), userAPI, ) @@ -113,19 +127,6 @@ func main() { asAPI = base.AppserviceHTTPClient() } - stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer) - - fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, stateAPI, keyRing, - ) - if base.UseHTTPAPIs { - federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) - fsAPI = base.FederationSenderHTTPClient() - } - // The underlying roomserver implementation needs to be able to call the fedsender. - // This is different to rsAPI which can be the http client which doesn't need this dependency - rsImpl.SetFederationSenderAPI(fsAPI) - monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, diff --git a/federationsender/api/api.go b/federationsender/api/api.go index 9f9c2645c..281165e84 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -2,14 +2,38 @@ package api import ( "context" + "fmt" + "time" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" ) +// FederationClient is a subset of gomatrixserverlib.FederationClient functions which the fedsender +// implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in +// this interface are of type FederationClientError +type FederationClient interface { + GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error) + ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error) + QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error) +} + +// FederationClientError is returned from FederationClient methods in the event of a problem. +type FederationClientError struct { + Err string + RetryAfter time.Duration + Blacklisted bool +} + +func (e *FederationClientError) Error() string { + return fmt.Sprintf("%s - (retry_after=%d, blacklisted=%v", e.Err, e.RetryAfter, e.Blacklisted) +} + // FederationSenderInternalAPI is used to query information from the federation sender. type FederationSenderInternalAPI interface { + FederationClient + // PerformDirectoryLookup looks up a remote room ID from a room alias. PerformDirectoryLookup( ctx context.Context, diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index 647e3fcb8..956981377 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -1,12 +1,17 @@ package internal import ( + "context" + "time" + + "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/roomserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" ) // FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI @@ -14,7 +19,7 @@ type FederationSenderInternalAPI struct { db storage.Database cfg *config.FederationSender statistics *statistics.Statistics - rsAPI api.RoomserverInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI federation *gomatrixserverlib.FederationClient keyRing *gomatrixserverlib.KeyRing queues *queue.OutgoingQueues @@ -22,7 +27,7 @@ type FederationSenderInternalAPI struct { func NewFederationSenderInternalAPI( db storage.Database, cfg *config.FederationSender, - rsAPI api.RoomserverInternalAPI, + rsAPI roomserverAPI.RoomserverInternalAPI, federation *gomatrixserverlib.FederationClient, keyRing *gomatrixserverlib.KeyRing, statistics *statistics.Statistics, @@ -38,3 +43,83 @@ func NewFederationSenderInternalAPI( queues: queues, } } + +func (a *FederationSenderInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) { + stats := a.statistics.ForServer(s) + if stats.Blacklisted() { + return stats, &api.FederationClientError{ + Blacklisted: true, + } + } + // Call BackoffIfRequired with a closed channel to make it return immediately. + // It will return the duration to backoff for. + var duration time.Duration + interrupt := make(chan bool) + close(interrupt) + var bo atomic.Bool + duration, _ = stats.BackoffIfRequired(bo, interrupt) + if duration > 0 { + return stats, &api.FederationClientError{ + RetryAfter: duration, + } + } + + return stats, nil +} + +func (a *FederationSenderInternalAPI) GetUserDevices( + ctx context.Context, s gomatrixserverlib.ServerName, userID string, +) (gomatrixserverlib.RespUserDevices, error) { + var res gomatrixserverlib.RespUserDevices + stats, err := a.isBlacklistedOrBackingOff(s) + if err != nil { + return res, err + } + res, err = a.federation.GetUserDevices(ctx, s, userID) + if err != nil { + stats.Failure() + return res, &api.FederationClientError{ + Err: err.Error(), + } + } + stats.Success() + return res, nil +} + +func (a *FederationSenderInternalAPI) ClaimKeys( + ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string, +) (gomatrixserverlib.RespClaimKeys, error) { + var res gomatrixserverlib.RespClaimKeys + stats, err := a.isBlacklistedOrBackingOff(s) + if err != nil { + return res, err + } + res, err = a.federation.ClaimKeys(ctx, s, oneTimeKeys) + if err != nil { + stats.Failure() + return res, &api.FederationClientError{ + Err: err.Error(), + } + } + stats.Success() + return res, nil +} + +func (a *FederationSenderInternalAPI) QueryKeys( + ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, +) (gomatrixserverlib.RespQueryKeys, error) { + var res gomatrixserverlib.RespQueryKeys + stats, err := a.isBlacklistedOrBackingOff(s) + if err != nil { + return res, err + } + res, err = a.federation.QueryKeys(ctx, s, keys) + if err != nil { + stats.Failure() + return res, &api.FederationClientError{ + Err: err.Error(), + } + } + stats.Success() + return res, nil +} diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go index 13c2c45aa..846c6d746 100644 --- a/federationsender/inthttp/client.go +++ b/federationsender/inthttp/client.go @@ -8,6 +8,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/gomatrix" + "github.com/matrix-org/gomatrixserverlib" "github.com/opentracing/opentracing-go" ) @@ -21,6 +22,10 @@ const ( FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" + + FederationSenderGetUserDevicesPath = "/federationsender/client/getUserDevices" + FederationSenderClaimKeysPath = "/federationsender/client/claimKeys" + FederationSenderQueryKeysPath = "/federationsender/client/queryKeys" ) // NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API. @@ -133,3 +138,93 @@ func (h *httpFederationSenderInternalAPI) PerformBroadcastEDU( apiURL := h.federationSenderURL + FederationSenderPerformBroadcastEDUPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +type getUserDevices struct { + S gomatrixserverlib.ServerName + UserID string + Res *gomatrixserverlib.RespUserDevices + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) GetUserDevices( + ctx context.Context, s gomatrixserverlib.ServerName, userID string, +) (gomatrixserverlib.RespUserDevices, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "GetUserDevices") + defer span.Finish() + + var result gomatrixserverlib.RespUserDevices + request := getUserDevices{ + S: s, + UserID: userID, + } + var response getUserDevices + apiURL := h.federationSenderURL + FederationSenderGetUserDevicesPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) + if err != nil { + return result, err + } + if response.Err != nil { + return result, response.Err + } + return *response.Res, nil +} + +type claimKeys struct { + S gomatrixserverlib.ServerName + OneTimeKeys map[string]map[string]string + Res *gomatrixserverlib.RespClaimKeys + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) ClaimKeys( + ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string, +) (gomatrixserverlib.RespClaimKeys, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "ClaimKeys") + defer span.Finish() + + var result gomatrixserverlib.RespClaimKeys + request := claimKeys{ + S: s, + OneTimeKeys: oneTimeKeys, + } + var response claimKeys + apiURL := h.federationSenderURL + FederationSenderClaimKeysPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) + if err != nil { + return result, err + } + if response.Err != nil { + return result, response.Err + } + return *response.Res, nil +} + +type queryKeys struct { + S gomatrixserverlib.ServerName + Keys map[string][]string + Res *gomatrixserverlib.RespQueryKeys + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) QueryKeys( + ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, +) (gomatrixserverlib.RespQueryKeys, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryKeys") + defer span.Finish() + + var result gomatrixserverlib.RespQueryKeys + request := queryKeys{ + S: s, + Keys: keys, + } + var response queryKeys + apiURL := h.federationSenderURL + FederationSenderQueryKeysPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) + if err != nil { + return result, err + } + if response.Err != nil { + return result, response.Err + } + return *response.Res, nil +} diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go index f02cbd12d..b18255760 100644 --- a/federationsender/inthttp/server.go +++ b/federationsender/inthttp/server.go @@ -109,4 +109,70 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle( + FederationSenderGetUserDevicesPath, + httputil.MakeInternalAPI("GetUserDevices", func(req *http.Request) util.JSONResponse { + var request getUserDevices + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.GetUserDevices(req.Context(), request.S, request.UserID) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) + internalAPIMux.Handle( + FederationSenderClaimKeysPath, + httputil.MakeInternalAPI("ClaimKeys", func(req *http.Request) util.JSONResponse { + var request claimKeys + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.ClaimKeys(req.Context(), request.S, request.OneTimeKeys) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) + internalAPIMux.Handle( + FederationSenderQueryKeysPath, + httputil.MakeInternalAPI("QueryKeys", func(req *http.Request) util.JSONResponse { + var request queryKeys + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.QueryKeys(req.Context(), request.S, request.Keys) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) } diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index dd8fb7008..7ef5c3ee6 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -22,6 +22,7 @@ import ( "sync" "time" + fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -65,7 +66,7 @@ type DeviceListUpdater struct { db DeviceListUpdaterDatabase producer KeyChangeProducer - fedClient *gomatrixserverlib.FederationClient + fedClient fedsenderapi.FederationClient workerChans []chan gomatrixserverlib.ServerName // When device lists are stale for a user, they get inserted into this map with a channel which `Update` will @@ -103,7 +104,7 @@ type KeyChangeProducer interface { // NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. func NewDeviceListUpdater( - db DeviceListUpdaterDatabase, producer KeyChangeProducer, fedClient *gomatrixserverlib.FederationClient, + db DeviceListUpdaterDatabase, producer KeyChangeProducer, fedClient fedsenderapi.FederationClient, numWorkers int, ) *DeviceListUpdater { return &DeviceListUpdater{ diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 31fb12367..53afe0a60 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -22,6 +22,7 @@ import ( "sync" "time" + fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" @@ -36,7 +37,7 @@ import ( type KeyInternalAPI struct { DB storage.Database ThisServer gomatrixserverlib.ServerName - FedClient *gomatrixserverlib.FederationClient + FedClient fedsenderapi.FederationClient UserAPI userapi.UserInternalAPI Producer *producers.KeyChange Updater *DeviceListUpdater diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 041369388..2e5613632 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -17,13 +17,13 @@ package keyserver import ( "github.com/Shopify/sarama" "github.com/gorilla/mux" + fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" - "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -36,7 +36,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - cfg *config.KeyServer, fedClient *gomatrixserverlib.FederationClient, producer sarama.SyncProducer, + cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer, ) api.KeyInternalAPI { db, err := storage.NewDatabase(&cfg.Database) if err != nil {