From 9cf65c51f062a91aec2c516dca878ddbe1737f13 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 4 Mar 2021 14:30:49 +0000 Subject: [PATCH] Use gomatrixserverlib.Client since that allows us to disable TLS validation using the config --- appservice/appservice.go | 12 +++++------- appservice/query/query.go | 17 ++++------------- appservice/workers/transaction_scheduler.go | 16 +++++----------- build/gobind/monolith.go | 2 +- cmd/dendrite-demo-libp2p/main.go | 2 +- cmd/dendrite-demo-yggdrasil/main.go | 2 +- cmd/dendrite-monolith-server/main.go | 6 ++++-- .../personalities/appservice.go | 3 ++- 8 files changed, 23 insertions(+), 37 deletions(-) diff --git a/appservice/appservice.go b/appservice/appservice.go index d783c7eb7..95b8d95c2 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -16,9 +16,7 @@ package appservice import ( "context" - "net/http" "sync" - "time" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" @@ -33,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/kafka" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -45,6 +44,7 @@ func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQuer // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( base *setup.BaseDendrite, + client *gomatrixserverlib.Client, userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { @@ -79,10 +79,8 @@ func NewInternalAPI( // Create appserivce query API with an HTTP client that will be used for all // outbound and inbound requests (inbound only for the internal API) appserviceQueryAPI := &query.AppServiceQueryAPI{ - HTTPClient: &http.Client{ - Timeout: time.Second * 30, - }, - Cfg: base.Cfg, + HTTPClient: client, + Cfg: base.Cfg, } // Only consume if we actually have ASes to track, else we'll just chew cycles needlessly. @@ -98,7 +96,7 @@ func NewInternalAPI( } // Create application service transaction workers - if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil { + if err := workers.SetupTransactionWorkers(client, appserviceDB, workerStates); err != nil { logrus.WithError(err).Panicf("failed to start app service transaction workers") } return appserviceQueryAPI diff --git a/appservice/query/query.go b/appservice/query/query.go index 7e5ac4753..35339539d 100644 --- a/appservice/query/query.go +++ b/appservice/query/query.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" opentracing "github.com/opentracing/opentracing-go" log "github.com/sirupsen/logrus" ) @@ -33,7 +34,7 @@ const userIDExistsPath = "/users/" // AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI type AppServiceQueryAPI struct { - HTTPClient *http.Client + HTTPClient *gomatrixserverlib.Client Cfg *config.Dendrite } @@ -47,11 +48,6 @@ func (a *AppServiceQueryAPI) RoomAliasExists( span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias") defer span.Finish() - // Create an HTTP client if one does not already exist - if a.HTTPClient == nil { - a.HTTPClient = makeHTTPClient() - } - // Determine which application service should handle this request for _, appservice := range a.Cfg.Derived.ApplicationServices { if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) { @@ -68,7 +64,7 @@ func (a *AppServiceQueryAPI) RoomAliasExists( } req = req.WithContext(ctx) - resp, err := a.HTTPClient.Do(req) + resp, err := a.HTTPClient.DoHTTPRequest(ctx, req) if resp != nil { defer func() { err = resp.Body.Close() @@ -115,11 +111,6 @@ func (a *AppServiceQueryAPI) UserIDExists( span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID") defer span.Finish() - // Create an HTTP client if one does not already exist - if a.HTTPClient == nil { - a.HTTPClient = makeHTTPClient() - } - // Determine which application service should handle this request for _, appservice := range a.Cfg.Derived.ApplicationServices { if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) { @@ -134,7 +125,7 @@ func (a *AppServiceQueryAPI) UserIDExists( if err != nil { return err } - resp, err := a.HTTPClient.Do(req.WithContext(ctx)) + resp, err := a.HTTPClient.DoHTTPRequest(ctx, req) if resp != nil { defer func() { err = resp.Body.Close() diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go index 45748c214..47d447c2c 100644 --- a/appservice/workers/transaction_scheduler.go +++ b/appservice/workers/transaction_scheduler.go @@ -34,8 +34,6 @@ import ( var ( // Maximum size of events sent in each transaction. transactionBatchSize = 50 - // Timeout for sending a single transaction to an application service. - transactionTimeout = time.Second * 60 ) // SetupTransactionWorkers spawns a separate goroutine for each application @@ -44,6 +42,7 @@ var ( // size), then send that off to the AS's /transactions/{txnID} endpoint. It also // handles exponentially backing off in case the AS isn't currently available. func SetupTransactionWorkers( + client *gomatrixserverlib.Client, appserviceDB storage.Database, workerStates []types.ApplicationServiceWorkerState, ) error { @@ -51,7 +50,7 @@ func SetupTransactionWorkers( for _, workerState := range workerStates { // Don't create a worker if this AS doesn't want to receive events if workerState.AppService.URL != "" { - go worker(appserviceDB, workerState) + go worker(client, appserviceDB, workerState) } } return nil @@ -59,17 +58,12 @@ func SetupTransactionWorkers( // worker is a goroutine that sends any queued events to the application service // it is given. -func worker(db storage.Database, ws types.ApplicationServiceWorkerState) { +func worker(client *gomatrixserverlib.Client, db storage.Database, ws types.ApplicationServiceWorkerState) { log.WithFields(log.Fields{ "appservice": ws.AppService.ID, }).Info("Starting application service") ctx := context.Background() - // Create a HTTP client for sending requests to app services - client := &http.Client{ - Timeout: transactionTimeout, - } - // Initial check for any leftover events to send from last time eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) if err != nil { @@ -206,7 +200,7 @@ func createTransaction( // send sends events to an application service. Returns an error if an OK was not // received back from the application service or the request timed out. func send( - client *http.Client, + client *gomatrixserverlib.Client, appservice config.ApplicationService, txnID int, transaction []byte, @@ -219,7 +213,7 @@ func send( return err } req.Header.Set("Content-Type", "application/json") - resp, err := client.Do(req) + resp, err := client.DoHTTPRequest(context.TODO(), req) if err != nil { return err } diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 332d156bd..58fe0d155 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -129,7 +129,7 @@ func (m *DendriteMonolith) Start() { base, cache.New(), userAPI, ) - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + asAPI := appservice.NewInternalAPI(base, base.CreateClient(), userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) ygg.SetSessionFunc(func(address string) { diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 0610ec777..2e8792b2d 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -163,7 +163,7 @@ func main() { eduInputAPI := eduserver.NewInternalAPI( &base.Base, cache.New(), userAPI, ) - asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI) + asAPI := appservice.NewInternalAPI(&base.Base, base.Base.CreateClient(), userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) fsAPI := federationsender.NewInternalAPI( &base.Base, federation, rsAPI, keyRing, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 2a4a335ab..b74a74a65 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -111,7 +111,7 @@ func main() { base, cache.New(), userAPI, ) - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + asAPI := appservice.NewInternalAPI(base, base.CreateClient(), userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) fsAPI := federationsender.NewInternalAPI( base, federation, rsAPI, keyRing, diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index b82f73211..b8a3e231f 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -121,7 +121,9 @@ func main() { eduInputAPI = base.EDUServerClient() } - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + client := base.CreateClient() + + asAPI := appservice.NewInternalAPI(base, client, userAPI, rsAPI) if base.UseHTTPAPIs { appservice.AddInternalRoutes(base.InternalAPIMux, asAPI) asAPI = base.AppserviceHTTPClient() @@ -131,7 +133,7 @@ func main() { monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, - Client: base.CreateClient(), + Client: client, FedClient: federation, KeyRing: keyRing, diff --git a/cmd/dendrite-polylith-multi/personalities/appservice.go b/cmd/dendrite-polylith-multi/personalities/appservice.go index d269b15d4..5be3e594e 100644 --- a/cmd/dendrite-polylith-multi/personalities/appservice.go +++ b/cmd/dendrite-polylith-multi/personalities/appservice.go @@ -23,8 +23,9 @@ import ( func Appservice(base *setup.BaseDendrite, cfg *config.Dendrite) { userAPI := base.UserAPIClient() rsAPI := base.RoomserverHTTPClient() + client := base.CreateClient() - intAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + intAPI := appservice.NewInternalAPI(base, client, userAPI, rsAPI) appservice.AddInternalRoutes(base.InternalAPIMux, intAPI) base.SetupAndServeHTTP(