mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-28 09:13:09 -06:00
Use gomatrixserverlib.Client since that allows us to disable TLS validation using the config
This commit is contained in:
parent
d606eec69f
commit
9cf65c51f0
|
|
@ -16,9 +16,7 @@ package appservice
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net/http"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
|
|
@ -33,6 +31,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/kafka"
|
"github.com/matrix-org/dendrite/setup/kafka"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -45,6 +44,7 @@ func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQuer
|
||||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||||
func NewInternalAPI(
|
func NewInternalAPI(
|
||||||
base *setup.BaseDendrite,
|
base *setup.BaseDendrite,
|
||||||
|
client *gomatrixserverlib.Client,
|
||||||
userAPI userapi.UserInternalAPI,
|
userAPI userapi.UserInternalAPI,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
) appserviceAPI.AppServiceQueryAPI {
|
) appserviceAPI.AppServiceQueryAPI {
|
||||||
|
|
@ -79,10 +79,8 @@ func NewInternalAPI(
|
||||||
// Create appserivce query API with an HTTP client that will be used for all
|
// Create appserivce query API with an HTTP client that will be used for all
|
||||||
// outbound and inbound requests (inbound only for the internal API)
|
// outbound and inbound requests (inbound only for the internal API)
|
||||||
appserviceQueryAPI := &query.AppServiceQueryAPI{
|
appserviceQueryAPI := &query.AppServiceQueryAPI{
|
||||||
HTTPClient: &http.Client{
|
HTTPClient: client,
|
||||||
Timeout: time.Second * 30,
|
Cfg: base.Cfg,
|
||||||
},
|
|
||||||
Cfg: base.Cfg,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only consume if we actually have ASes to track, else we'll just chew cycles needlessly.
|
// Only consume if we actually have ASes to track, else we'll just chew cycles needlessly.
|
||||||
|
|
@ -98,7 +96,7 @@ func NewInternalAPI(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create application service transaction workers
|
// Create application service transaction workers
|
||||||
if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil {
|
if err := workers.SetupTransactionWorkers(client, appserviceDB, workerStates); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start app service transaction workers")
|
logrus.WithError(err).Panicf("failed to start app service transaction workers")
|
||||||
}
|
}
|
||||||
return appserviceQueryAPI
|
return appserviceQueryAPI
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/appservice/api"
|
"github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
opentracing "github.com/opentracing/opentracing-go"
|
opentracing "github.com/opentracing/opentracing-go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
@ -33,7 +34,7 @@ const userIDExistsPath = "/users/"
|
||||||
|
|
||||||
// AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI
|
// AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI
|
||||||
type AppServiceQueryAPI struct {
|
type AppServiceQueryAPI struct {
|
||||||
HTTPClient *http.Client
|
HTTPClient *gomatrixserverlib.Client
|
||||||
Cfg *config.Dendrite
|
Cfg *config.Dendrite
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -47,11 +48,6 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias")
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Create an HTTP client if one does not already exist
|
|
||||||
if a.HTTPClient == nil {
|
|
||||||
a.HTTPClient = makeHTTPClient()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine which application service should handle this request
|
// Determine which application service should handle this request
|
||||||
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
||||||
if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
|
if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
|
||||||
|
|
@ -68,7 +64,7 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
|
||||||
}
|
}
|
||||||
req = req.WithContext(ctx)
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
resp, err := a.HTTPClient.Do(req)
|
resp, err := a.HTTPClient.DoHTTPRequest(ctx, req)
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
defer func() {
|
defer func() {
|
||||||
err = resp.Body.Close()
|
err = resp.Body.Close()
|
||||||
|
|
@ -115,11 +111,6 @@ func (a *AppServiceQueryAPI) UserIDExists(
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID")
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
// Create an HTTP client if one does not already exist
|
|
||||||
if a.HTTPClient == nil {
|
|
||||||
a.HTTPClient = makeHTTPClient()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine which application service should handle this request
|
// Determine which application service should handle this request
|
||||||
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
||||||
if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
|
if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
|
||||||
|
|
@ -134,7 +125,7 @@ func (a *AppServiceQueryAPI) UserIDExists(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
resp, err := a.HTTPClient.Do(req.WithContext(ctx))
|
resp, err := a.HTTPClient.DoHTTPRequest(ctx, req)
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
defer func() {
|
defer func() {
|
||||||
err = resp.Body.Close()
|
err = resp.Body.Close()
|
||||||
|
|
|
||||||
|
|
@ -34,8 +34,6 @@ import (
|
||||||
var (
|
var (
|
||||||
// Maximum size of events sent in each transaction.
|
// Maximum size of events sent in each transaction.
|
||||||
transactionBatchSize = 50
|
transactionBatchSize = 50
|
||||||
// Timeout for sending a single transaction to an application service.
|
|
||||||
transactionTimeout = time.Second * 60
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetupTransactionWorkers spawns a separate goroutine for each application
|
// SetupTransactionWorkers spawns a separate goroutine for each application
|
||||||
|
|
@ -44,6 +42,7 @@ var (
|
||||||
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
|
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
|
||||||
// handles exponentially backing off in case the AS isn't currently available.
|
// handles exponentially backing off in case the AS isn't currently available.
|
||||||
func SetupTransactionWorkers(
|
func SetupTransactionWorkers(
|
||||||
|
client *gomatrixserverlib.Client,
|
||||||
appserviceDB storage.Database,
|
appserviceDB storage.Database,
|
||||||
workerStates []types.ApplicationServiceWorkerState,
|
workerStates []types.ApplicationServiceWorkerState,
|
||||||
) error {
|
) error {
|
||||||
|
|
@ -51,7 +50,7 @@ func SetupTransactionWorkers(
|
||||||
for _, workerState := range workerStates {
|
for _, workerState := range workerStates {
|
||||||
// Don't create a worker if this AS doesn't want to receive events
|
// Don't create a worker if this AS doesn't want to receive events
|
||||||
if workerState.AppService.URL != "" {
|
if workerState.AppService.URL != "" {
|
||||||
go worker(appserviceDB, workerState)
|
go worker(client, appserviceDB, workerState)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -59,17 +58,12 @@ func SetupTransactionWorkers(
|
||||||
|
|
||||||
// worker is a goroutine that sends any queued events to the application service
|
// worker is a goroutine that sends any queued events to the application service
|
||||||
// it is given.
|
// it is given.
|
||||||
func worker(db storage.Database, ws types.ApplicationServiceWorkerState) {
|
func worker(client *gomatrixserverlib.Client, db storage.Database, ws types.ApplicationServiceWorkerState) {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"appservice": ws.AppService.ID,
|
"appservice": ws.AppService.ID,
|
||||||
}).Info("Starting application service")
|
}).Info("Starting application service")
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Create a HTTP client for sending requests to app services
|
|
||||||
client := &http.Client{
|
|
||||||
Timeout: transactionTimeout,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initial check for any leftover events to send from last time
|
// Initial check for any leftover events to send from last time
|
||||||
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
|
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -206,7 +200,7 @@ func createTransaction(
|
||||||
// send sends events to an application service. Returns an error if an OK was not
|
// send sends events to an application service. Returns an error if an OK was not
|
||||||
// received back from the application service or the request timed out.
|
// received back from the application service or the request timed out.
|
||||||
func send(
|
func send(
|
||||||
client *http.Client,
|
client *gomatrixserverlib.Client,
|
||||||
appservice config.ApplicationService,
|
appservice config.ApplicationService,
|
||||||
txnID int,
|
txnID int,
|
||||||
transaction []byte,
|
transaction []byte,
|
||||||
|
|
@ -219,7 +213,7 @@ func send(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
resp, err := client.Do(req)
|
resp, err := client.DoHTTPRequest(context.TODO(), req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -129,7 +129,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
base, cache.New(), userAPI,
|
base, cache.New(), userAPI,
|
||||||
)
|
)
|
||||||
|
|
||||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(base, base.CreateClient(), userAPI, rsAPI)
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
|
|
||||||
ygg.SetSessionFunc(func(address string) {
|
ygg.SetSessionFunc(func(address string) {
|
||||||
|
|
|
||||||
|
|
@ -163,7 +163,7 @@ func main() {
|
||||||
eduInputAPI := eduserver.NewInternalAPI(
|
eduInputAPI := eduserver.NewInternalAPI(
|
||||||
&base.Base, cache.New(), userAPI,
|
&base.Base, cache.New(), userAPI,
|
||||||
)
|
)
|
||||||
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(&base.Base, base.Base.CreateClient(), userAPI, rsAPI)
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
fsAPI := federationsender.NewInternalAPI(
|
fsAPI := federationsender.NewInternalAPI(
|
||||||
&base.Base, federation, rsAPI, keyRing,
|
&base.Base, federation, rsAPI, keyRing,
|
||||||
|
|
|
||||||
|
|
@ -111,7 +111,7 @@ func main() {
|
||||||
base, cache.New(), userAPI,
|
base, cache.New(), userAPI,
|
||||||
)
|
)
|
||||||
|
|
||||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(base, base.CreateClient(), userAPI, rsAPI)
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
fsAPI := federationsender.NewInternalAPI(
|
fsAPI := federationsender.NewInternalAPI(
|
||||||
base, federation, rsAPI, keyRing,
|
base, federation, rsAPI, keyRing,
|
||||||
|
|
|
||||||
|
|
@ -121,7 +121,9 @@ func main() {
|
||||||
eduInputAPI = base.EDUServerClient()
|
eduInputAPI = base.EDUServerClient()
|
||||||
}
|
}
|
||||||
|
|
||||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
client := base.CreateClient()
|
||||||
|
|
||||||
|
asAPI := appservice.NewInternalAPI(base, client, userAPI, rsAPI)
|
||||||
if base.UseHTTPAPIs {
|
if base.UseHTTPAPIs {
|
||||||
appservice.AddInternalRoutes(base.InternalAPIMux, asAPI)
|
appservice.AddInternalRoutes(base.InternalAPIMux, asAPI)
|
||||||
asAPI = base.AppserviceHTTPClient()
|
asAPI = base.AppserviceHTTPClient()
|
||||||
|
|
@ -131,7 +133,7 @@ func main() {
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Cfg,
|
Config: base.Cfg,
|
||||||
AccountDB: accountDB,
|
AccountDB: accountDB,
|
||||||
Client: base.CreateClient(),
|
Client: client,
|
||||||
FedClient: federation,
|
FedClient: federation,
|
||||||
KeyRing: keyRing,
|
KeyRing: keyRing,
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,9 @@ import (
|
||||||
func Appservice(base *setup.BaseDendrite, cfg *config.Dendrite) {
|
func Appservice(base *setup.BaseDendrite, cfg *config.Dendrite) {
|
||||||
userAPI := base.UserAPIClient()
|
userAPI := base.UserAPIClient()
|
||||||
rsAPI := base.RoomserverHTTPClient()
|
rsAPI := base.RoomserverHTTPClient()
|
||||||
|
client := base.CreateClient()
|
||||||
|
|
||||||
intAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
intAPI := appservice.NewInternalAPI(base, client, userAPI, rsAPI)
|
||||||
appservice.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
appservice.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
||||||
|
|
||||||
base.SetupAndServeHTTP(
|
base.SetupAndServeHTTP(
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue