mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-03 12:13:09 -06:00
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/filter
This commit is contained in:
commit
7c39cc3e48
|
|
@ -52,6 +52,7 @@ import (
|
||||||
"golang.org/x/net/http2"
|
"golang.org/x/net/http2"
|
||||||
"golang.org/x/net/http2/h2c"
|
"golang.org/x/net/http2/h2c"
|
||||||
|
|
||||||
|
pineconeConnections "github.com/matrix-org/pinecone/connections"
|
||||||
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
|
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
|
||||||
pineconeRouter "github.com/matrix-org/pinecone/router"
|
pineconeRouter "github.com/matrix-org/pinecone/router"
|
||||||
pineconeSessions "github.com/matrix-org/pinecone/sessions"
|
pineconeSessions "github.com/matrix-org/pinecone/sessions"
|
||||||
|
|
@ -71,11 +72,9 @@ type DendriteMonolith struct {
|
||||||
PineconeRouter *pineconeRouter.Router
|
PineconeRouter *pineconeRouter.Router
|
||||||
PineconeMulticast *pineconeMulticast.Multicast
|
PineconeMulticast *pineconeMulticast.Multicast
|
||||||
PineconeQUIC *pineconeSessions.Sessions
|
PineconeQUIC *pineconeSessions.Sessions
|
||||||
|
PineconeManager *pineconeConnections.ConnectionManager
|
||||||
StorageDirectory string
|
StorageDirectory string
|
||||||
CacheDirectory string
|
CacheDirectory string
|
||||||
staticPeerURI string
|
|
||||||
staticPeerMutex sync.RWMutex
|
|
||||||
staticPeerAttempt chan struct{}
|
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
httpServer *http.Server
|
httpServer *http.Server
|
||||||
processContext *process.ProcessContext
|
processContext *process.ProcessContext
|
||||||
|
|
@ -104,15 +103,8 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *DendriteMonolith) SetStaticPeer(uri string) {
|
func (m *DendriteMonolith) SetStaticPeer(uri string) {
|
||||||
m.staticPeerMutex.Lock()
|
m.PineconeManager.RemovePeers()
|
||||||
m.staticPeerURI = strings.TrimSpace(uri)
|
m.PineconeManager.AddPeer(strings.TrimSpace(uri))
|
||||||
m.staticPeerMutex.Unlock()
|
|
||||||
m.DisconnectType(int(pineconeRouter.PeerTypeRemote))
|
|
||||||
if uri != "" {
|
|
||||||
go func() {
|
|
||||||
m.staticPeerAttempt <- struct{}{}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *DendriteMonolith) DisconnectType(peertype int) {
|
func (m *DendriteMonolith) DisconnectType(peertype int) {
|
||||||
|
|
@ -210,43 +202,6 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
|
||||||
return loginRes.Device.AccessToken, nil
|
return loginRes.Device.AccessToken, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *DendriteMonolith) staticPeerConnect() {
|
|
||||||
connected := map[string]bool{} // URI -> connected?
|
|
||||||
attempt := func() {
|
|
||||||
m.staticPeerMutex.RLock()
|
|
||||||
uri := m.staticPeerURI
|
|
||||||
m.staticPeerMutex.RUnlock()
|
|
||||||
if uri == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for k := range connected {
|
|
||||||
delete(connected, k)
|
|
||||||
}
|
|
||||||
for _, uri := range strings.Split(uri, ",") {
|
|
||||||
connected[strings.TrimSpace(uri)] = false
|
|
||||||
}
|
|
||||||
for _, info := range m.PineconeRouter.Peers() {
|
|
||||||
connected[info.URI] = true
|
|
||||||
}
|
|
||||||
for k, online := range connected {
|
|
||||||
if !online {
|
|
||||||
if err := conn.ConnectToPeer(m.PineconeRouter, k); err != nil {
|
|
||||||
logrus.WithError(err).Error("Failed to connect to static peer")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-m.processContext.Context().Done():
|
|
||||||
case <-m.staticPeerAttempt:
|
|
||||||
attempt()
|
|
||||||
case <-time.After(time.Second * 5):
|
|
||||||
attempt()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (m *DendriteMonolith) Start() {
|
func (m *DendriteMonolith) Start() {
|
||||||
var err error
|
var err error
|
||||||
|
|
@ -284,6 +239,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
||||||
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
|
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
|
||||||
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
|
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
|
||||||
|
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter)
|
||||||
|
|
||||||
prefix := hex.EncodeToString(pk)
|
prefix := hex.EncodeToString(pk)
|
||||||
cfg := &config.Dendrite{}
|
cfg := &config.Dendrite{}
|
||||||
|
|
@ -392,9 +348,6 @@ func (m *DendriteMonolith) Start() {
|
||||||
|
|
||||||
m.processContext = base.ProcessContext
|
m.processContext = base.ProcessContext
|
||||||
|
|
||||||
m.staticPeerAttempt = make(chan struct{}, 1)
|
|
||||||
go m.staticPeerConnect()
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
m.logger.Info("Listening on ", cfg.Global.ServerName)
|
m.logger.Info("Listening on ", cfg.Global.ServerName)
|
||||||
m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")))
|
m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")))
|
||||||
|
|
|
||||||
|
|
@ -64,11 +64,6 @@ const (
|
||||||
sessionIDLength = 24
|
sessionIDLength = 24
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
// Register prometheus metrics. They must be registered to be exposed.
|
|
||||||
prometheus.MustRegister(amtRegUsers)
|
|
||||||
}
|
|
||||||
|
|
||||||
// sessionsDict keeps track of completed auth stages for each session.
|
// sessionsDict keeps track of completed auth stages for each session.
|
||||||
// It shouldn't be passed by value because it contains a mutex.
|
// It shouldn't be passed by value because it contains a mutex.
|
||||||
type sessionsDict struct {
|
type sessionsDict struct {
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -60,6 +61,8 @@ func Setup(
|
||||||
extRoomsProvider api.ExtraPublicRoomsProvider,
|
extRoomsProvider api.ExtraPublicRoomsProvider,
|
||||||
mscCfg *config.MSCs, natsClient *nats.Conn,
|
mscCfg *config.MSCs, natsClient *nats.Conn,
|
||||||
) {
|
) {
|
||||||
|
prometheus.MustRegister(amtRegUsers, sendEventDuration)
|
||||||
|
|
||||||
rateLimits := httputil.NewRateLimits(&cfg.RateLimiting)
|
rateLimits := httputil.NewRateLimits(&cfg.RateLimiting)
|
||||||
userInteractiveAuth := auth.NewUserInteractive(userAPI, cfg)
|
userInteractiveAuth := auth.NewUserInteractive(userAPI, cfg)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -46,10 +46,6 @@ var (
|
||||||
userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents
|
userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
prometheus.MustRegister(sendEventDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
var sendEventDuration = prometheus.NewHistogramVec(
|
var sendEventDuration = prometheus.NewHistogramVec(
|
||||||
prometheus.HistogramOpts{
|
prometheus.HistogramOpts{
|
||||||
Namespace: "dendrite",
|
Namespace: "dendrite",
|
||||||
|
|
|
||||||
|
|
@ -1,156 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httputil"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
const usage = `Usage: %s
|
|
||||||
|
|
||||||
Create a single endpoint URL which clients can be pointed at.
|
|
||||||
|
|
||||||
The client-server API in Dendrite is split across multiple processes
|
|
||||||
which listen on multiple ports. You cannot point a Matrix client at
|
|
||||||
any of those ports, as there will be unimplemented functionality.
|
|
||||||
In addition, all client-server API processes start with the additional
|
|
||||||
path prefix '/api', which Matrix clients will be unaware of.
|
|
||||||
|
|
||||||
This tool will proxy requests for all client-server URLs and forward
|
|
||||||
them to their respective process. It will also add the '/api' path
|
|
||||||
prefix to incoming requests.
|
|
||||||
|
|
||||||
THIS TOOL IS FOR TESTING AND NOT INTENDED FOR PRODUCTION USE.
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
|
|
||||||
`
|
|
||||||
|
|
||||||
var (
|
|
||||||
syncServerURL = flag.String("sync-api-server-url", "", "The base URL of the listening 'dendrite-sync-api-server' process. E.g. 'http://localhost:4200'")
|
|
||||||
clientAPIURL = flag.String("client-api-server-url", "", "The base URL of the listening 'dendrite-client-api-server' process. E.g. 'http://localhost:4321'")
|
|
||||||
mediaAPIURL = flag.String("media-api-server-url", "", "The base URL of the listening 'dendrite-media-api-server' process. E.g. 'http://localhost:7779'")
|
|
||||||
bindAddress = flag.String("bind-address", ":8008", "The listening port for the proxy.")
|
|
||||||
certFile = flag.String("tls-cert", "", "The PEM formatted X509 certificate to use for TLS")
|
|
||||||
keyFile = flag.String("tls-key", "", "The PEM private key to use for TLS")
|
|
||||||
)
|
|
||||||
|
|
||||||
func makeProxy(targetURL string) (*httputil.ReverseProxy, error) {
|
|
||||||
targetURL = strings.TrimSuffix(targetURL, "/")
|
|
||||||
|
|
||||||
// Check that we can parse the URL.
|
|
||||||
_, err := url.Parse(targetURL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &httputil.ReverseProxy{
|
|
||||||
Director: func(req *http.Request) {
|
|
||||||
// URL.Path() removes the % escaping from the path.
|
|
||||||
// The % encoding will be added back when the url is encoded
|
|
||||||
// when the request is forwarded.
|
|
||||||
// This means that we will lose any unessecary escaping from the URL.
|
|
||||||
// Pratically this means that any distinction between '%2F' and '/'
|
|
||||||
// in the URL will be lost by the time it reaches the target.
|
|
||||||
path := req.URL.Path
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"path": path,
|
|
||||||
"url": targetURL,
|
|
||||||
"method": req.Method,
|
|
||||||
}).Print("proxying request")
|
|
||||||
newURL, err := url.Parse(targetURL)
|
|
||||||
// Set the path separately as we need to preserve '#' characters
|
|
||||||
// that would otherwise be interpreted as being the start of a URL
|
|
||||||
// fragment.
|
|
||||||
newURL.Path += path
|
|
||||||
if err != nil {
|
|
||||||
// We already checked that we can parse the URL
|
|
||||||
// So this shouldn't ever get hit.
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
// Copy the query parameters from the request.
|
|
||||||
newURL.RawQuery = req.URL.RawQuery
|
|
||||||
req.URL = newURL
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
flag.Usage = func() {
|
|
||||||
fmt.Fprintf(os.Stderr, usage, os.Args[0])
|
|
||||||
flag.PrintDefaults()
|
|
||||||
}
|
|
||||||
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
if *syncServerURL == "" {
|
|
||||||
flag.Usage()
|
|
||||||
fmt.Fprintln(os.Stderr, "no --sync-api-server-url specified.")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if *clientAPIURL == "" {
|
|
||||||
flag.Usage()
|
|
||||||
fmt.Fprintln(os.Stderr, "no --client-api-server-url specified.")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if *mediaAPIURL == "" {
|
|
||||||
flag.Usage()
|
|
||||||
fmt.Fprintln(os.Stderr, "no --media-api-server-url specified.")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
syncProxy, err := makeProxy(*syncServerURL)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
clientProxy, err := makeProxy(*clientAPIURL)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
mediaProxy, err := makeProxy(*mediaAPIURL)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
http.Handle("/_matrix/client/r0/sync", syncProxy)
|
|
||||||
http.Handle("/_matrix/media/v1/", mediaProxy)
|
|
||||||
http.Handle("/", clientProxy)
|
|
||||||
|
|
||||||
srv := &http.Server{
|
|
||||||
Addr: *bindAddress,
|
|
||||||
ReadTimeout: 1 * time.Minute, // how long we wait for the client to send the entire request (after connection accept)
|
|
||||||
WriteTimeout: 5 * time.Minute, // how long the proxy has to write the full response
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("Proxying requests to:")
|
|
||||||
fmt.Println(" /_matrix/client/r0/sync => ", *syncServerURL+"/api/_matrix/client/r0/sync")
|
|
||||||
fmt.Println(" /_matrix/media/v1 => ", *mediaAPIURL+"/api/_matrix/media/v1")
|
|
||||||
fmt.Println(" /* => ", *clientAPIURL+"/api/*")
|
|
||||||
fmt.Println("Listening on ", *bindAddress)
|
|
||||||
if *certFile != "" && *keyFile != "" {
|
|
||||||
panic(srv.ListenAndServeTLS(*certFile, *keyFile))
|
|
||||||
} else {
|
|
||||||
panic(srv.ListenAndServe())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -25,7 +25,6 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
|
@ -47,6 +46,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi"
|
"github.com/matrix-org/dendrite/userapi"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
|
pineconeConnections "github.com/matrix-org/pinecone/connections"
|
||||||
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
|
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
|
||||||
pineconeRouter "github.com/matrix-org/pinecone/router"
|
pineconeRouter "github.com/matrix-org/pinecone/router"
|
||||||
pineconeSessions "github.com/matrix-org/pinecone/sessions"
|
pineconeSessions "github.com/matrix-org/pinecone/sessions"
|
||||||
|
|
@ -90,6 +90,13 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
||||||
|
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
||||||
|
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
|
||||||
|
pManager := pineconeConnections.NewConnectionManager(pRouter)
|
||||||
|
pMulticast.Start()
|
||||||
|
if instancePeer != nil && *instancePeer != "" {
|
||||||
|
pManager.AddPeer(*instancePeer)
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
listener, err := net.Listen("tcp", *instanceListen)
|
listener, err := net.Listen("tcp", *instanceListen)
|
||||||
|
|
@ -119,36 +126,6 @@ func main() {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
|
||||||
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
|
|
||||||
pMulticast.Start()
|
|
||||||
|
|
||||||
connectToStaticPeer := func() {
|
|
||||||
connected := map[string]bool{} // URI -> connected?
|
|
||||||
for _, uri := range strings.Split(*instancePeer, ",") {
|
|
||||||
connected[strings.TrimSpace(uri)] = false
|
|
||||||
}
|
|
||||||
attempt := func() {
|
|
||||||
for k := range connected {
|
|
||||||
connected[k] = false
|
|
||||||
}
|
|
||||||
for _, info := range pRouter.Peers() {
|
|
||||||
connected[info.URI] = true
|
|
||||||
}
|
|
||||||
for k, online := range connected {
|
|
||||||
if !online {
|
|
||||||
if err := conn.ConnectToPeer(pRouter, k); err != nil {
|
|
||||||
logrus.WithError(err).Error("Failed to connect to static peer")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
attempt()
|
|
||||||
time.Sleep(time.Second * 5)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg := &config.Dendrite{}
|
cfg := &config.Dendrite{}
|
||||||
cfg.Defaults(true)
|
cfg.Defaults(true)
|
||||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||||
|
|
@ -268,7 +245,6 @@ func main() {
|
||||||
Handler: pMux,
|
Handler: pMux,
|
||||||
}
|
}
|
||||||
|
|
||||||
go connectToStaticPeer()
|
|
||||||
go func() {
|
go func() {
|
||||||
pubkey := pRouter.PublicKey()
|
pubkey := pRouter.PublicKey()
|
||||||
logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))
|
logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"syscall/js"
|
"syscall/js"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/dendrite/appservice"
|
"github.com/matrix-org/dendrite/appservice"
|
||||||
|
|
@ -44,6 +43,7 @@ import (
|
||||||
|
|
||||||
_ "github.com/matrix-org/go-sqlite3-js"
|
_ "github.com/matrix-org/go-sqlite3-js"
|
||||||
|
|
||||||
|
pineconeConnections "github.com/matrix-org/pinecone/connections"
|
||||||
pineconeRouter "github.com/matrix-org/pinecone/router"
|
pineconeRouter "github.com/matrix-org/pinecone/router"
|
||||||
pineconeSessions "github.com/matrix-org/pinecone/sessions"
|
pineconeSessions "github.com/matrix-org/pinecone/sessions"
|
||||||
)
|
)
|
||||||
|
|
@ -154,6 +154,8 @@ func startup() {
|
||||||
|
|
||||||
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
||||||
pSessions := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
pSessions := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
||||||
|
pManager := pineconeConnections.NewConnectionManager(pRouter)
|
||||||
|
pManager.AddPeer("wss://pinecone.matrix.org/public")
|
||||||
|
|
||||||
cfg := &config.Dendrite{}
|
cfg := &config.Dendrite{}
|
||||||
cfg.Defaults(true)
|
cfg.Defaults(true)
|
||||||
|
|
@ -237,20 +239,4 @@ func startup() {
|
||||||
}
|
}
|
||||||
s.ListenAndServe("fetch")
|
s.ListenAndServe("fetch")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Connect to the static peer
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
|
|
||||||
if err := conn.ConnectToPeer(pRouter, publicPeer); err != nil {
|
|
||||||
logrus.WithError(err).Error("Failed to connect to static peer")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-base.ProcessContext.Context().Done():
|
|
||||||
return
|
|
||||||
case <-time.After(time.Second * 5):
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,138 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httputil"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
const usage = `Usage: %s
|
|
||||||
|
|
||||||
Create a single endpoint URL which remote matrix servers can be pointed at.
|
|
||||||
|
|
||||||
The server-server API in Dendrite is split across multiple processes
|
|
||||||
which listen on multiple ports. You cannot point a Matrix server at
|
|
||||||
any of those ports, as there will be unimplemented functionality.
|
|
||||||
In addition, all server-server API processes start with the additional
|
|
||||||
path prefix '/api', which Matrix servers will be unaware of.
|
|
||||||
|
|
||||||
This tool will proxy requests for all server-server URLs and forward
|
|
||||||
them to their respective process. It will also add the '/api' path
|
|
||||||
prefix to incoming requests.
|
|
||||||
|
|
||||||
THIS TOOL IS FOR TESTING AND NOT INTENDED FOR PRODUCTION USE.
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
|
|
||||||
`
|
|
||||||
|
|
||||||
var (
|
|
||||||
federationAPIURL = flag.String("federation-api-url", "", "The base URL of the listening 'dendrite-federation-api-server' process. E.g. 'http://localhost:4200'")
|
|
||||||
mediaAPIURL = flag.String("media-api-server-url", "", "The base URL of the listening 'dendrite-media-api-server' process. E.g. 'http://localhost:7779'")
|
|
||||||
bindAddress = flag.String("bind-address", ":8448", "The listening port for the proxy.")
|
|
||||||
certFile = flag.String("tls-cert", "server.crt", "The PEM formatted X509 certificate to use for TLS")
|
|
||||||
keyFile = flag.String("tls-key", "server.key", "The PEM private key to use for TLS")
|
|
||||||
)
|
|
||||||
|
|
||||||
func makeProxy(targetURL string) (*httputil.ReverseProxy, error) {
|
|
||||||
if !strings.HasSuffix(targetURL, "/") {
|
|
||||||
targetURL += "/"
|
|
||||||
}
|
|
||||||
// Check that we can parse the URL.
|
|
||||||
_, err := url.Parse(targetURL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &httputil.ReverseProxy{
|
|
||||||
Director: func(req *http.Request) {
|
|
||||||
// URL.Path() removes the % escaping from the path.
|
|
||||||
// The % encoding will be added back when the url is encoded
|
|
||||||
// when the request is forwarded.
|
|
||||||
// This means that we will lose any unessecary escaping from the URL.
|
|
||||||
// Pratically this means that any distinction between '%2F' and '/'
|
|
||||||
// in the URL will be lost by the time it reaches the target.
|
|
||||||
path := req.URL.Path
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"path": path,
|
|
||||||
"url": targetURL,
|
|
||||||
"method": req.Method,
|
|
||||||
}).Print("proxying request")
|
|
||||||
newURL, err := url.Parse(targetURL + path)
|
|
||||||
if err != nil {
|
|
||||||
// We already checked that we can parse the URL
|
|
||||||
// So this shouldn't ever get hit.
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
// Copy the query parameters from the request.
|
|
||||||
newURL.RawQuery = req.URL.RawQuery
|
|
||||||
req.URL = newURL
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
flag.Usage = func() {
|
|
||||||
fmt.Fprintf(os.Stderr, usage, os.Args[0])
|
|
||||||
flag.PrintDefaults()
|
|
||||||
}
|
|
||||||
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
if *federationAPIURL == "" {
|
|
||||||
flag.Usage()
|
|
||||||
fmt.Fprintln(os.Stderr, "no --federation-api-url specified.")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if *mediaAPIURL == "" {
|
|
||||||
flag.Usage()
|
|
||||||
fmt.Fprintln(os.Stderr, "no --media-api-server-url specified.")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
federationProxy, err := makeProxy(*federationAPIURL)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
mediaProxy, err := makeProxy(*mediaAPIURL)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
http.Handle("/_matrix/media/v1/", mediaProxy)
|
|
||||||
http.Handle("/", federationProxy)
|
|
||||||
|
|
||||||
srv := &http.Server{
|
|
||||||
Addr: *bindAddress,
|
|
||||||
ReadTimeout: 1 * time.Minute, // how long we wait for the client to send the entire request (after connection accept)
|
|
||||||
WriteTimeout: 5 * time.Minute, // how long the proxy has to write the full response
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("Proxying requests to:")
|
|
||||||
fmt.Println(" /_matrix/media/v1 => ", *mediaAPIURL+"/api/_matrix/media/v1")
|
|
||||||
fmt.Println(" /* => ", *federationAPIURL+"/api/*")
|
|
||||||
fmt.Println("Listening on ", *bindAddress)
|
|
||||||
panic(srv.ListenAndServeTLS(*certFile, *keyFile))
|
|
||||||
}
|
|
||||||
|
|
@ -29,6 +29,7 @@ import (
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -53,6 +54,10 @@ func Setup(
|
||||||
servers federationAPI.ServersInRoomProvider,
|
servers federationAPI.ServersInRoomProvider,
|
||||||
producer *producers.SyncAPIProducer,
|
producer *producers.SyncAPIProducer,
|
||||||
) {
|
) {
|
||||||
|
prometheus.MustRegister(
|
||||||
|
pduCountTotal, eduCountTotal,
|
||||||
|
)
|
||||||
|
|
||||||
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
|
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
|
||||||
v1fedmux := fedMux.PathPrefix("/v1").Subrouter()
|
v1fedmux := fedMux.PathPrefix("/v1").Subrouter()
|
||||||
v2fedmux := fedMux.PathPrefix("/v2").Subrouter()
|
v2fedmux := fedMux.PathPrefix("/v2").Subrouter()
|
||||||
|
|
|
||||||
|
|
@ -74,12 +74,6 @@ var (
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
prometheus.MustRegister(
|
|
||||||
pduCountTotal, eduCountTotal,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse
|
var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse
|
||||||
|
|
||||||
// Send implements /_matrix/federation/v1/send/{txnID}
|
// Send implements /_matrix/federation/v1/send/{txnID}
|
||||||
|
|
|
||||||
31
go.mod
31
go.mod
|
|
@ -12,20 +12,20 @@ require (
|
||||||
github.com/MFAshby/stdemuxerhook v1.0.0
|
github.com/MFAshby/stdemuxerhook v1.0.0
|
||||||
github.com/Masterminds/semver/v3 v3.1.1
|
github.com/Masterminds/semver/v3 v3.1.1
|
||||||
github.com/codeclysm/extract v2.2.0+incompatible
|
github.com/codeclysm/extract v2.2.0+incompatible
|
||||||
github.com/containerd/containerd v1.5.9 // indirect
|
github.com/containerd/containerd v1.6.2 // indirect
|
||||||
github.com/docker/docker v20.10.12+incompatible
|
github.com/docker/docker v20.10.14+incompatible
|
||||||
github.com/docker/go-connections v0.4.0
|
github.com/docker/go-connections v0.4.0
|
||||||
github.com/frankban/quicktest v1.14.0 // indirect
|
github.com/frankban/quicktest v1.14.3 // indirect
|
||||||
github.com/getsentry/sentry-go v0.12.0
|
github.com/getsentry/sentry-go v0.13.0
|
||||||
github.com/gologme/log v1.3.0
|
github.com/gologme/log v1.3.0
|
||||||
github.com/google/go-cmp v0.5.6
|
github.com/google/go-cmp v0.5.7
|
||||||
github.com/google/uuid v1.2.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/gorilla/mux v1.8.0
|
github.com/gorilla/mux v1.8.0
|
||||||
github.com/gorilla/websocket v1.4.2
|
github.com/gorilla/websocket v1.5.0
|
||||||
github.com/h2non/filetype v1.1.3 // indirect
|
github.com/h2non/filetype v1.1.3 // indirect
|
||||||
github.com/hashicorp/golang-lru v0.5.4
|
github.com/hashicorp/golang-lru v0.5.4
|
||||||
github.com/juju/testing v0.0.0-20211215003918-77eb13d6cad2 // indirect
|
github.com/juju/testing v0.0.0-20220203020004-a0ff61f03494 // indirect
|
||||||
github.com/lib/pq v1.10.4
|
github.com/lib/pq v1.10.5
|
||||||
github.com/libp2p/go-libp2p v0.13.0
|
github.com/libp2p/go-libp2p v0.13.0
|
||||||
github.com/libp2p/go-libp2p-circuit v0.4.0
|
github.com/libp2p/go-libp2p-circuit v0.4.0
|
||||||
github.com/libp2p/go-libp2p-core v0.8.3
|
github.com/libp2p/go-libp2p-core v0.8.3
|
||||||
|
|
@ -42,12 +42,12 @@ require (
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220404141326-e526fa82f79d
|
github.com/matrix-org/pinecone v0.0.0-20220404141326-e526fa82f79d
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.10
|
github.com/mattn/go-sqlite3 v1.14.10
|
||||||
github.com/morikuni/aec v1.0.0 // indirect
|
|
||||||
github.com/nats-io/nats-server/v2 v2.7.4-0.20220309205833-773636c1c5bb
|
github.com/nats-io/nats-server/v2 v2.7.4-0.20220309205833-773636c1c5bb
|
||||||
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d
|
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d
|
||||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
|
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
|
||||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
||||||
github.com/ngrok/sqlmw v0.0.0-20211220175533-9d16fdc47b31
|
github.com/ngrok/sqlmw v0.0.0-20211220175533-9d16fdc47b31
|
||||||
|
github.com/opencontainers/image-spec v1.0.2 // indirect
|
||||||
github.com/opentracing/opentracing-go v1.2.0
|
github.com/opentracing/opentracing-go v1.2.0
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
|
|
@ -60,14 +60,13 @@ require (
|
||||||
github.com/uber/jaeger-lib v2.4.1+incompatible
|
github.com/uber/jaeger-lib v2.4.1+incompatible
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.4.3
|
github.com/yggdrasil-network/yggdrasil-go v0.4.3
|
||||||
go.uber.org/atomic v1.9.0
|
go.uber.org/atomic v1.9.0
|
||||||
golang.org/x/crypto v0.0.0-20220214200702-86341886e292
|
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29
|
||||||
golang.org/x/image v0.0.0-20211028202545-6944b10bf410
|
golang.org/x/image v0.0.0-20220321031419-a8550c1d254a
|
||||||
golang.org/x/mobile v0.0.0-20220325161704-447654d348e3
|
golang.org/x/mobile v0.0.0-20220407111146-e579adbbc4a2
|
||||||
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd
|
golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3
|
||||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
|
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
|
||||||
gopkg.in/h2non/bimg.v1 v1.1.5
|
gopkg.in/h2non/bimg.v1 v1.1.9
|
||||||
gopkg.in/yaml.v2 v2.4.0
|
gopkg.in/yaml.v2 v2.4.0
|
||||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
|
||||||
nhooyr.io/websocket v1.8.7
|
nhooyr.io/websocket v1.8.7
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -171,6 +171,7 @@ func MakeHTMLAPI(metricsName string, f func(http.ResponseWriter, *http.Request)
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Name: metricsName,
|
Name: metricsName,
|
||||||
Help: "Total number of http requests for HTML resources",
|
Help: "Total number of http requests for HTML resources",
|
||||||
|
Namespace: "dendrite",
|
||||||
},
|
},
|
||||||
[]string{"code"},
|
[]string{"code"},
|
||||||
),
|
),
|
||||||
|
|
@ -201,7 +202,28 @@ func MakeInternalAPI(metricsName string, f func(*http.Request) util.JSONResponse
|
||||||
h.ServeHTTP(w, req)
|
h.ServeHTTP(w, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
return http.HandlerFunc(withSpan)
|
return promhttp.InstrumentHandlerCounter(
|
||||||
|
promauto.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: metricsName + "_requests_total",
|
||||||
|
Help: "Total number of internal API calls",
|
||||||
|
Namespace: "dendrite",
|
||||||
|
},
|
||||||
|
[]string{"code"},
|
||||||
|
),
|
||||||
|
promhttp.InstrumentHandlerResponseSize(
|
||||||
|
promauto.NewHistogramVec(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: "dendrite",
|
||||||
|
Name: metricsName + "_response_size_bytes",
|
||||||
|
Help: "A histogram of response sizes for requests.",
|
||||||
|
Buckets: []float64{200, 500, 900, 1500, 5000, 15000, 50000, 100000},
|
||||||
|
},
|
||||||
|
[]string{},
|
||||||
|
),
|
||||||
|
http.HandlerFunc(withSpan),
|
||||||
|
),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MakeFedAPI makes an http.Handler that checks matrix federation authentication.
|
// MakeFedAPI makes an http.Handler that checks matrix federation authentication.
|
||||||
|
|
|
||||||
|
|
@ -167,6 +167,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
|
||||||
// will look to see if we have a worker for that room which has its
|
// will look to see if we have a worker for that room which has its
|
||||||
// own consumer. If we don't, we'll start one.
|
// own consumer. If we don't, we'll start one.
|
||||||
func (r *Inputer) Start() error {
|
func (r *Inputer) Start() error {
|
||||||
|
prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration)
|
||||||
_, err := r.JetStream.Subscribe(
|
_, err := r.JetStream.Subscribe(
|
||||||
"", // This is blank because we specified it in BindStream.
|
"", // This is blank because we specified it in BindStream.
|
||||||
func(m *nats.Msg) {
|
func(m *nats.Msg) {
|
||||||
|
|
@ -421,10 +422,6 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
prometheus.MustRegister(roomserverInputBackpressure)
|
|
||||||
}
|
|
||||||
|
|
||||||
var roomserverInputBackpressure = prometheus.NewGaugeVec(
|
var roomserverInputBackpressure = prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Namespace: "dendrite",
|
Namespace: "dendrite",
|
||||||
|
|
|
||||||
|
|
@ -37,10 +37,6 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
prometheus.MustRegister(processRoomEventDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Does this value make sense?
|
// TODO: Does this value make sense?
|
||||||
const MaximumMissingProcessingTime = time.Minute * 2
|
const MaximumMissingProcessingTime = time.Minute * 2
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -67,6 +67,9 @@ func NewRequestPool(
|
||||||
streams *streams.Streams, notifier *notifier.Notifier,
|
streams *streams.Streams, notifier *notifier.Notifier,
|
||||||
producer PresencePublisher,
|
producer PresencePublisher,
|
||||||
) *RequestPool {
|
) *RequestPool {
|
||||||
|
prometheus.MustRegister(
|
||||||
|
activeSyncRequests, waitingSyncRequests,
|
||||||
|
)
|
||||||
rp := &RequestPool{
|
rp := &RequestPool{
|
||||||
db: db,
|
db: db,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
|
|
@ -183,12 +186,6 @@ func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device)
|
||||||
rp.lastseen.Store(device.UserID+device.ID, time.Now())
|
rp.lastseen.Store(device.UserID+device.ID, time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
prometheus.MustRegister(
|
|
||||||
activeSyncRequests, waitingSyncRequests,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
var activeSyncRequests = prometheus.NewGauge(
|
var activeSyncRequests = prometheus.NewGauge(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Namespace: "dendrite",
|
Namespace: "dendrite",
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue