mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 18:43:10 -06:00
Add mutex, rename variables, update logging
This commit is contained in:
parent
4d4978b1ae
commit
f01ff850ab
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
|
@ -58,8 +59,10 @@ func NewInternalAPI(
|
||||||
// Create appserivce query API with an HTTP client that will be used for all
|
// Create appserivce query API with an HTTP client that will be used for all
|
||||||
// outbound and inbound requests (inbound only for the internal API)
|
// outbound and inbound requests (inbound only for the internal API)
|
||||||
appserviceQueryAPI := &query.AppServiceQueryAPI{
|
appserviceQueryAPI := &query.AppServiceQueryAPI{
|
||||||
HTTPClient: client,
|
HTTPClient: client,
|
||||||
Cfg: &base.Cfg.AppServiceAPI,
|
Cfg: &base.Cfg.AppServiceAPI,
|
||||||
|
ProtocolCache: map[string]appserviceAPI.ASProtocolResponse{},
|
||||||
|
CacheMu: sync.Mutex{},
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(base.Cfg.Derived.ApplicationServices) == 0 {
|
if len(base.Cfg.Derived.ApplicationServices) == 0 {
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
"github.com/opentracing/opentracing-go"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
@ -39,7 +40,8 @@ const userIDExistsPath = "/users/"
|
||||||
type AppServiceQueryAPI struct {
|
type AppServiceQueryAPI struct {
|
||||||
HTTPClient *http.Client
|
HTTPClient *http.Client
|
||||||
Cfg *config.AppServiceAPI
|
Cfg *config.AppServiceAPI
|
||||||
protocolCache map[string]api.ASProtocolResponse
|
ProtocolCache map[string]api.ASProtocolResponse
|
||||||
|
CacheMu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// RoomAliasExists performs a request to '/room/{roomAlias}' on all known
|
// RoomAliasExists performs a request to '/room/{roomAlias}' on all known
|
||||||
|
|
@ -208,7 +210,7 @@ func (a *AppServiceQueryAPI) Locations(
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, as := range a.Cfg.Derived.ApplicationServices {
|
for _, as := range a.Cfg.Derived.ApplicationServices {
|
||||||
var proto []api.ASLocationResponse
|
var asLocations []api.ASLocationResponse
|
||||||
params.Set("access_token", as.HSToken)
|
params.Set("access_token", as.HSToken)
|
||||||
|
|
||||||
url := as.URL + api.ASLocationPath
|
url := as.URL + api.ASLocationPath
|
||||||
|
|
@ -216,12 +218,12 @@ func (a *AppServiceQueryAPI) Locations(
|
||||||
url += "/" + req.Protocol
|
url += "/" + req.Protocol
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := requestDo[[]api.ASLocationResponse](a.HTTPClient, url+"?"+params.Encode(), &proto); err != nil {
|
if err := requestDo[[]api.ASLocationResponse](a.HTTPClient, url+"?"+params.Encode(), &asLocations); err != nil {
|
||||||
log.WithError(err).Error("unable to get protocolResponse from application service")
|
log.WithError(err).Error("unable to get 'locations' from application service")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.Locations = append(resp.Locations, proto...)
|
resp.Locations = append(resp.Locations, asLocations...)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(resp.Locations) == 0 {
|
if len(resp.Locations) == 0 {
|
||||||
|
|
@ -243,7 +245,7 @@ func (a *AppServiceQueryAPI) User(
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, as := range a.Cfg.Derived.ApplicationServices {
|
for _, as := range a.Cfg.Derived.ApplicationServices {
|
||||||
var proto []api.ASUserResponse
|
var asUsers []api.ASUserResponse
|
||||||
params.Set("access_token", as.HSToken)
|
params.Set("access_token", as.HSToken)
|
||||||
|
|
||||||
url := as.URL + api.ASUserPath
|
url := as.URL + api.ASUserPath
|
||||||
|
|
@ -251,12 +253,12 @@ func (a *AppServiceQueryAPI) User(
|
||||||
url += "/" + req.Protocol
|
url += "/" + req.Protocol
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := requestDo[[]api.ASUserResponse](a.HTTPClient, url+"?"+params.Encode(), &proto); err != nil {
|
if err := requestDo[[]api.ASUserResponse](a.HTTPClient, url+"?"+params.Encode(), &asUsers); err != nil {
|
||||||
log.WithError(err).Error("unable to get protocolResponse from application service")
|
log.WithError(err).Error("unable to get 'user' from application service")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.Users = append(resp.Users, proto...)
|
resp.Users = append(resp.Users, asUsers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(resp.Users) == 0 {
|
if len(resp.Users) == 0 {
|
||||||
|
|
@ -276,7 +278,9 @@ func (a *AppServiceQueryAPI) Protocols(
|
||||||
// get a single protocol response
|
// get a single protocol response
|
||||||
if req.Protocol != "" {
|
if req.Protocol != "" {
|
||||||
|
|
||||||
if proto, ok := a.protocolCache[req.Protocol]; ok {
|
a.CacheMu.Lock()
|
||||||
|
defer a.CacheMu.Unlock()
|
||||||
|
if proto, ok := a.ProtocolCache[req.Protocol]; ok {
|
||||||
resp.Exists = true
|
resp.Exists = true
|
||||||
resp.Protocols = map[string]api.ASProtocolResponse{
|
resp.Protocols = map[string]api.ASProtocolResponse{
|
||||||
req.Protocol: proto,
|
req.Protocol: proto,
|
||||||
|
|
@ -285,11 +289,10 @@ func (a *AppServiceQueryAPI) Protocols(
|
||||||
}
|
}
|
||||||
|
|
||||||
response := api.ASProtocolResponse{}
|
response := api.ASProtocolResponse{}
|
||||||
log.Debugf("XXX: getting single protocol")
|
|
||||||
for _, as := range a.Cfg.Derived.ApplicationServices {
|
for _, as := range a.Cfg.Derived.ApplicationServices {
|
||||||
var proto api.ASProtocolResponse
|
var proto api.ASProtocolResponse
|
||||||
if err := requestDo[api.ASProtocolResponse](a.HTTPClient, as.URL+api.ASProtocolPath+req.Protocol, &proto); err != nil {
|
if err := requestDo[api.ASProtocolResponse](a.HTTPClient, as.URL+api.ASProtocolPath+req.Protocol, &proto); err != nil {
|
||||||
logrus.WithError(err).Error("unable to get protocolResponse from application service")
|
logrus.WithError(err).Error("unable to get 'protocol' from application service")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -337,7 +340,10 @@ func (a *AppServiceQueryAPI) Protocols(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
a.protocolCache = response
|
a.CacheMu.Lock()
|
||||||
|
defer a.CacheMu.Unlock()
|
||||||
|
a.ProtocolCache = response
|
||||||
|
|
||||||
resp.Exists = true
|
resp.Exists = true
|
||||||
resp.Protocols = response
|
resp.Protocols = response
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue