Merge branch 'master' of https://github.com/matrix-org/dendrite into add-nats-support

This commit is contained in:
Till Faelligen 2021-07-02 20:21:17 +02:00
commit c575f5501f
51 changed files with 1346 additions and 295 deletions

View file

@ -1,7 +1,7 @@
# Dendrite [![Build Status](https://badge.buildkite.com/4be40938ab19f2bbc4a6c6724517353ee3ec1422e279faf374.svg?branch=master)](https://buildkite.com/matrix-dot-org/dendrite) [![Dendrite](https://img.shields.io/matrix/dendrite:matrix.org.svg?label=%23dendrite%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite:matrix.org) [![Dendrite Dev](https://img.shields.io/matrix/dendrite-dev:matrix.org.svg?label=%23dendrite-dev%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite-dev:matrix.org)
Dendrite is a second-generation Matrix homeserver written in Go.
It intends to provide an **efficient**, **reliable** and **scalable** alternative to Synapse:
It intends to provide an **efficient**, **reliable** and **scalable** alternative to [Synapse](https://github.com/matrix-org/synapse):
- Efficient: A small memory footprint with better baseline performance than an out-of-the-box Synapse.
- Reliable: Implements the Matrix specification as written, using the
[same test suite](https://github.com/matrix-org/sytest) as Synapse as well as

View file

@ -32,14 +32,18 @@ INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1);
`
const selectTxnIDSQL = `
SELECT last_id FROM appservice_counters WHERE name='txn_id';
UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id';
SELECT last_id FROM appservice_counters WHERE name='txn_id'
`
const updateTxnIDSQL = `
UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id'
`
type txnStatements struct {
db *sql.DB
writer sqlutil.Writer
selectTxnIDStmt *sql.Stmt
updateTxnIDStmt *sql.Stmt
}
func (s *txnStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
@ -54,6 +58,10 @@ func (s *txnStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
return
}
if s.updateTxnIDStmt, err = db.Prepare(updateTxnIDSQL); err != nil {
return
}
return
}
@ -63,6 +71,11 @@ func (s *txnStatements) selectTxnID(
) (txnID int, err error) {
err = s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
err := s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
if err != nil {
return err
}
_, err = s.updateTxnIDStmt.ExecContext(ctx)
return err
})
return

View file

@ -37,19 +37,21 @@ runtime config should come from. The mounted folder must contain:
To generate keys:
```
go run github.com/matrix-org/dendrite/cmd/generate-keys \
--private-key=matrix_key.pem \
--tls-cert=server.crt \
--tls-key=server.key
docker run --rm --entrypoint="" \
-v $(pwd):/mnt \
matrixdotorg/dendrite-monolith:latest \
/usr/bin/generate-keys \
-private-key /mnt/matrix_key.pem \
-tls-cert /mnt/server.crt \
-tls-key /mnt/server.key
```
The key files will now exist in your current working directory, and can be mounted into place.
## Starting Dendrite as a monolith deployment
Create your config based on the `dendrite.yaml` configuration file in the `docker/config`
folder in the [Dendrite repository](https://github.com/matrix-org/dendrite). Additionally,
make the following changes to the configuration:
- Enable Naffka: `use_naffka: true`
folder in the [Dendrite repository](https://github.com/matrix-org/dendrite).
Once in place, start the PostgreSQL dependency:

View file

@ -7,7 +7,7 @@ do
case "$option"
in
a) gomobile bind -v -target android -trimpath -ldflags="-s -w" github.com/matrix-org/dendrite/build/gobind-pinecone ;;
i) gomobile bind -v -target ios -trimpath -ldflags="-s -w" github.com/matrix-org/dendrite/build/gobind-pinecone ;;
i) gomobile bind -v -target ios -trimpath -ldflags="" github.com/matrix-org/dendrite/build/gobind-pinecone ;;
*) echo "No target specified, specify -a or -i"; exit 1 ;;
esac
done

View file

@ -10,7 +10,6 @@ import (
"io"
"io/ioutil"
"log"
"math"
"net"
"net/http"
"os"
@ -37,15 +36,16 @@ import (
userapiAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
"github.com/matrix-org/pinecone/router"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
"github.com/matrix-org/pinecone/types"
pineconeTypes "github.com/matrix-org/pinecone/types"
_ "golang.org/x/mobile/bind"
)
const (
@ -63,7 +63,7 @@ type DendriteMonolith struct {
CacheDirectory string
staticPeerURI string
staticPeerMutex sync.RWMutex
staticPeerAttempts atomic.Uint32
staticPeerAttempt chan struct{}
listener net.Listener
httpServer *http.Server
processContext *process.ProcessContext
@ -97,7 +97,9 @@ func (m *DendriteMonolith) SetStaticPeer(uri string) {
m.staticPeerMutex.Unlock()
m.DisconnectType(pineconeRouter.PeerTypeRemote)
if uri != "" {
m.staticPeerConnect()
go func() {
m.staticPeerAttempt <- struct{}{}
}()
}
}
@ -193,6 +195,8 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
}
func (m *DendriteMonolith) staticPeerConnect() {
attempt := func() {
if m.PineconeRouter.PeerCount(router.PeerTypeRemote) == 0 {
m.staticPeerMutex.RLock()
uri := m.staticPeerURI
m.staticPeerMutex.RUnlock()
@ -200,10 +204,18 @@ func (m *DendriteMonolith) staticPeerConnect() {
return
}
if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil {
exp := time.Second * time.Duration(math.Exp2(float64(m.staticPeerAttempts.Inc())))
time.AfterFunc(exp, m.staticPeerConnect)
} else {
m.staticPeerAttempts.Store(0)
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
}
for {
select {
case <-m.processContext.Context().Done():
case <-m.staticPeerAttempt:
attempt()
case <-time.After(time.Second * 5):
attempt()
}
}
}
@ -246,13 +258,6 @@ func (m *DendriteMonolith) Start() {
m.PineconeQUIC = pineconeSessions.NewSessions(logger, m.PineconeRouter)
m.PineconeMulticast = pineconeMulticast.NewMulticast(logger, m.PineconeRouter)
m.PineconeRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) {
if peertype == pineconeRouter.PeerTypeRemote {
m.staticPeerAttempts.Store(0)
time.AfterFunc(time.Second, m.staticPeerConnect)
}
})
prefix := hex.EncodeToString(pk)
cfg := &config.Dendrite{}
cfg.Defaults()
@ -272,6 +277,7 @@ func (m *DendriteMonolith) Start() {
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-appservice.db", m.StorageDirectory, prefix))
cfg.MediaAPI.BasePath = config.Path(fmt.Sprintf("%s/media", m.CacheDirectory))
cfg.MediaAPI.AbsBasePath = config.Path(fmt.Sprintf("%s/media", m.CacheDirectory))
cfg.MSCs.MSCs = []string{"msc2836", "msc2946"}
if err := cfg.Derive(); err != nil {
panic(err)
}
@ -290,7 +296,7 @@ func (m *DendriteMonolith) Start() {
)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
base, federation, rsAPI, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
@ -356,8 +362,12 @@ func (m *DendriteMonolith) Start() {
},
Handler: h2c.NewHandler(pMux, h2s),
}
m.processContext = base.ProcessContext
m.staticPeerAttempt = make(chan struct{}, 1)
go m.staticPeerConnect()
go func() {
m.logger.Info("Listening on ", cfg.Global.ServerName)
m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC))

View file

@ -25,6 +25,8 @@ import (
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
_ "golang.org/x/mobile/bind"
)
type DendriteMonolith struct {
@ -118,7 +120,7 @@ func (m *DendriteMonolith) Start() {
)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
base, federation, rsAPI, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)

View file

@ -69,7 +69,7 @@ func GetAccountData(
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.Forbidden("data not found"),
JSON: jsonerror.NotFound("data not found"),
}
}

View file

@ -18,13 +18,18 @@ import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"syscall"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/bcrypt"
"golang.org/x/term"
)
const usage = `Usage: %s
@ -33,7 +38,15 @@ Creates a new user account on the homeserver.
Example:
./create-account --config dendrite.yaml --username alice --password foobarbaz
# provide password by parameter
%s --config dendrite.yaml -username alice -password foobarbaz
# use password from file
%s --config dendrite.yaml -username alice -passwordfile my.pass
# ask user to provide password
%s --config dendrite.yaml -username alice -ask-pass
# read password from stdin
%s --config dendrite.yaml -username alice -passwordstdin < my.pass
cat my.pass | %s --config dendrite.yaml -username alice -passwordstdin
Arguments:
@ -42,11 +55,15 @@ Arguments:
var (
username = flag.String("username", "", "The username of the account to register (specify the localpart only, e.g. 'alice' for '@alice:domain.com')")
password = flag.String("password", "", "The password to associate with the account (optional, account will be password-less if not specified)")
pwdFile = flag.String("passwordfile", "", "The file to use for the password (e.g. for automated account creation)")
pwdStdin = flag.Bool("passwordstdin", false, "Reads the password from stdin")
askPass = flag.Bool("ask-pass", false, "Ask for the password to use")
)
func main() {
name := os.Args[0]
flag.Usage = func() {
fmt.Fprintf(os.Stderr, usage, os.Args[0])
_, _ = fmt.Fprintf(os.Stderr, usage, name, name, name, name, name, name)
flag.PrintDefaults()
}
cfg := setup.ParseFlags(true)
@ -56,6 +73,8 @@ func main() {
os.Exit(1)
}
pass := getPassword(password, pwdFile, pwdStdin, askPass, os.Stdin)
accountDB, err := accounts.NewDatabase(&config.DatabaseOptions{
ConnectionString: cfg.UserAPI.AccountDatabase.ConnectionString,
}, cfg.Global.ServerName, bcrypt.DefaultCost, cfg.UserAPI.OpenIDTokenLifetimeMS)
@ -63,10 +82,61 @@ func main() {
logrus.Fatalln("Failed to connect to the database:", err.Error())
}
_, err = accountDB.CreateAccount(context.Background(), *username, *password, "")
_, err = accountDB.CreateAccount(context.Background(), *username, pass, "")
if err != nil {
logrus.Fatalln("Failed to create the account:", err.Error())
}
logrus.Infoln("Created account", *username)
}
func getPassword(password, pwdFile *string, pwdStdin, askPass *bool, r io.Reader) string {
// no password option set, use empty password
if password == nil && pwdFile == nil && pwdStdin == nil && askPass == nil {
return ""
}
// password defined as parameter
if password != nil && *password != "" {
return *password
}
// read password from file
if pwdFile != nil && *pwdFile != "" {
pw, err := ioutil.ReadFile(*pwdFile)
if err != nil {
logrus.Fatalln("Unable to read password from file:", err)
}
return strings.TrimSpace(string(pw))
}
// read password from stdin
if pwdStdin != nil && *pwdStdin {
data, err := ioutil.ReadAll(r)
if err != nil {
logrus.Fatalln("Unable to read password from stdin:", err)
}
return strings.TrimSpace(string(data))
}
// ask the user to provide the password
if *askPass {
fmt.Print("Enter Password: ")
bytePassword, err := term.ReadPassword(syscall.Stdin)
if err != nil {
logrus.Fatalln("Unable to read password:", err)
}
fmt.Println()
fmt.Print("Confirm Password: ")
bytePassword2, err := term.ReadPassword(syscall.Stdin)
if err != nil {
logrus.Fatalln("Unable to read password:", err)
}
fmt.Println()
if strings.TrimSpace(string(bytePassword)) != strings.TrimSpace(string(bytePassword2)) {
logrus.Fatalln("Entered passwords don't match")
}
return strings.TrimSpace(string(bytePassword))
}
return ""
}

View file

@ -0,0 +1,62 @@
package main
import (
"bytes"
"io"
"testing"
)
func Test_getPassword(t *testing.T) {
type args struct {
password *string
pwdFile *string
pwdStdin *bool
askPass *bool
reader io.Reader
}
pass := "mySecretPass"
passwordFile := "testdata/my.pass"
passwordStdin := true
reader := &bytes.Buffer{}
_, err := reader.WriteString(pass)
if err != nil {
t.Errorf("unable to write to buffer: %+v", err)
}
tests := []struct {
name string
args args
want string
}{
{
name: "no password defined",
args: args{},
want: "",
},
{
name: "password defined",
args: args{password: &pass},
want: pass,
},
{
name: "pwdFile defined",
args: args{pwdFile: &passwordFile},
want: pass,
},
{
name: "read pass from stdin defined",
args: args{
pwdStdin: &passwordStdin,
reader: reader,
},
want: pass,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getPassword(tt.args.password, tt.args.pwdFile, tt.args.pwdStdin, tt.args.askPass, tt.args.reader); got != tt.want {
t.Errorf("getPassword() = '%v', want '%v'", got, tt.want)
}
})
}
}

1
cmd/create-account/testdata/my.pass vendored Normal file
View file

@ -0,0 +1 @@
mySecretPass

View file

@ -166,7 +166,7 @@ func main() {
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationsender.NewInternalAPI(
&base.Base, federation, rsAPI, keyRing,
&base.Base, federation, rsAPI, keyRing, true,
)
rsAPI.SetFederationSenderAPI(fsAPI)
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)

View file

@ -1,14 +1,15 @@
package conn
import (
"context"
"fmt"
"net"
"net/http"
"strings"
"github.com/gorilla/websocket"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/gomatrixserverlib"
"nhooyr.io/websocket"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
@ -17,11 +18,12 @@ import (
func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error {
var parent net.Conn
if strings.HasPrefix(peer, "ws://") || strings.HasPrefix(peer, "wss://") {
c, _, err := websocket.DefaultDialer.Dial(peer, nil)
ctx := context.Background()
c, _, err := websocket.Dial(ctx, peer, nil)
if err != nil {
return fmt.Errorf("websocket.DefaultDialer.Dial: %w", err)
}
parent = WrapWebSocketConn(c)
parent = websocket.NetConn(ctx, c, websocket.MessageBinary)
} else {
var err error
parent, err = net.Dial("tcp", peer)
@ -46,7 +48,13 @@ func (y *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
}
func createTransport(s *pineconeSessions.Sessions) *http.Transport {
tr := &http.Transport{}
tr := &http.Transport{
DisableKeepAlives: false,
Dial: s.Dial,
DialContext: s.DialContext,
DialTLS: s.DialTLS,
DialTLSContext: s.DialTLSContext,
}
tr.RegisterProtocol(
"matrix", &RoundTripper{
inner: &http.Transport{

View file

@ -23,7 +23,6 @@ import (
"fmt"
"io/ioutil"
"log"
"math"
"net"
"net/http"
"os"
@ -48,12 +47,11 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
"go.uber.org/atomic"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
"github.com/matrix-org/pinecone/router"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
pineconeTypes "github.com/matrix-org/pinecone/types"
"github.com/sirupsen/logrus"
)
@ -123,27 +121,23 @@ func main() {
pMulticast := pineconeMulticast.NewMulticast(logger, pRouter)
pMulticast.Start()
var staticPeerAttempts atomic.Uint32
var connectToStaticPeer func()
connectToStaticPeer = func() {
connectToStaticPeer := func() {
attempt := func() {
if pRouter.PeerCount(router.PeerTypeRemote) == 0 {
uri := *instancePeer
if uri == "" {
return
}
if err := conn.ConnectToPeer(pRouter, uri); err != nil {
exp := time.Second * time.Duration(math.Exp2(float64(staticPeerAttempts.Inc())))
time.AfterFunc(exp, connectToStaticPeer)
} else {
staticPeerAttempts.Store(0)
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) {
if peertype == pineconeRouter.PeerTypeRemote && err != nil {
staticPeerAttempts.Store(0)
time.AfterFunc(time.Second, connectToStaticPeer)
}
})
go connectToStaticPeer()
for {
attempt()
time.Sleep(time.Second * 5)
}
}
cfg := &config.Dendrite{}
cfg.Defaults()
@ -161,6 +155,7 @@ func main() {
cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
cfg.MSCs.MSCs = []string{"msc2836", "msc2946"}
if err := cfg.Derive(); err != nil {
panic(err)
}
@ -179,7 +174,7 @@ func main() {
)
rsAPI := rsComponent
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
base, federation, rsAPI, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
@ -217,7 +212,11 @@ func main() {
base.PublicMediaAPIMux,
)
wsUpgrader := websocket.Upgrader{}
wsUpgrader := websocket.Upgrader{
CheckOrigin: func(_ *http.Request) bool {
return true
},
}
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux)
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)
@ -256,6 +255,7 @@ func main() {
Handler: pMux,
}
go connectToStaticPeer()
go func() {
pubkey := pRouter.PublicKey()
logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))

View file

@ -114,7 +114,7 @@ func main() {
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
base, federation, rsAPI, keyRing, true,
)
ygg.SetSessionFunc(func(address string) {

View file

@ -99,7 +99,7 @@ func main() {
}
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
base, federation, rsAPI, keyRing, false,
)
if base.UseHTTPAPIs {
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)

View file

@ -33,7 +33,7 @@ func FederationAPI(base *setup.BaseDendrite, cfg *config.Dendrite) {
base.PublicFederationAPIMux, base.PublicKeyAPIMux,
&base.Cfg.FederationAPI, userAPI, federation, keyRing,
rsAPI, fsAPI, base.EDUServerClient(), keyAPI,
&base.Cfg.MSCs,
&base.Cfg.MSCs, nil,
)
base.SetupAndServeHTTP(

View file

@ -28,7 +28,7 @@ func FederationSender(base *setup.BaseDendrite, cfg *config.Dendrite) {
rsAPI := base.RoomserverHTTPClient()
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
base, federation, rsAPI, keyRing, false,
)
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)

View file

@ -0,0 +1,100 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build wasm
package main
import (
"bufio"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"syscall/js"
)
// JSServer exposes an HTTP-like server interface which allows JS to 'send' requests to it.
type JSServer struct {
// The router which will service requests
Mux http.Handler
}
// OnRequestFromJS is the function that JS will invoke when there is a new request.
// The JS function signature is:
// function(reqString: string): Promise<{result: string, error: string}>
// Usage is like:
// const res = await global._go_js_server.fetch(reqString);
// if (res.error) {
// // handle error: this is a 'network' error, not a non-2xx error.
// }
// const rawHttpResponse = res.result;
func (h *JSServer) OnRequestFromJS(this js.Value, args []js.Value) interface{} {
// we HAVE to spawn a new goroutine and return immediately or else Go will deadlock
// if this request blocks at all e.g for /sync calls
httpStr := args[0].String()
promise := js.Global().Get("Promise").New(js.FuncOf(func(pthis js.Value, pargs []js.Value) interface{} {
// The initial callback code for new Promise() is also called on the critical path, which is why
// we need to put this in an immediately invoked goroutine.
go func() {
resolve := pargs[0]
resStr, err := h.handle(httpStr)
errStr := ""
if err != nil {
errStr = err.Error()
}
resolve.Invoke(map[string]interface{}{
"result": resStr,
"error": errStr,
})
}()
return nil
}))
return promise
}
// handle invokes the http.ServeMux for this request and returns the raw HTTP response.
func (h *JSServer) handle(httpStr string) (resStr string, err error) {
req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(httpStr)))
if err != nil {
return
}
w := httptest.NewRecorder()
h.Mux.ServeHTTP(w, req)
res := w.Result()
var resBuffer strings.Builder
err = res.Write(&resBuffer)
return resBuffer.String(), err
}
// ListenAndServe registers a variable in JS-land with the given namespace. This variable is
// a function which JS-land can call to 'send' HTTP requests. The function is attached to
// a global object called "_go_js_server". See OnRequestFromJS for more info.
func (h *JSServer) ListenAndServe(namespace string) {
globalName := "_go_js_server"
// register a hook in JS-land for it to invoke stuff
server := js.Global().Get(globalName)
if !server.Truthy() {
server = js.Global().Get("Object").New()
js.Global().Set(globalName, server)
}
server.Set(namespace, js.FuncOf(h.OnRequestFromJS))
fmt.Printf("Listening for requests from JS on function %s.%s\n", globalName, namespace)
// Block forever to mimic http.ListenAndServe
select {}
}

View file

@ -0,0 +1,256 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build wasm
package main
import (
"crypto/ed25519"
"encoding/hex"
"fmt"
"log"
"os"
"syscall/js"
"time"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
_ "github.com/matrix-org/go-sqlite3-js"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
)
var GitCommit string
func init() {
fmt.Printf("[%s] dendrite.js starting...\n", GitCommit)
}
const publicPeer = "wss://pinecone.matrix.org/public"
const keyNameEd25519 = "_go_ed25519_key"
func readKeyFromLocalStorage() (key ed25519.PrivateKey, err error) {
localforage := js.Global().Get("localforage")
if !localforage.Truthy() {
err = fmt.Errorf("readKeyFromLocalStorage: no localforage")
return
}
// https://localforage.github.io/localForage/
item, ok := await(localforage.Call("getItem", keyNameEd25519))
if !ok || !item.Truthy() {
err = fmt.Errorf("readKeyFromLocalStorage: no key in localforage")
return
}
fmt.Println("Found key in localforage")
// extract []byte and make an ed25519 key
seed := make([]byte, 32, 32)
js.CopyBytesToGo(seed, item)
return ed25519.NewKeyFromSeed(seed), nil
}
func writeKeyToLocalStorage(key ed25519.PrivateKey) error {
localforage := js.Global().Get("localforage")
if !localforage.Truthy() {
return fmt.Errorf("writeKeyToLocalStorage: no localforage")
}
// make a Uint8Array from the key's seed
seed := key.Seed()
jsSeed := js.Global().Get("Uint8Array").New(len(seed))
js.CopyBytesToJS(jsSeed, seed)
// write it
localforage.Call("setItem", keyNameEd25519, jsSeed)
return nil
}
// taken from https://go-review.googlesource.com/c/go/+/150917
// await waits until the promise v has been resolved or rejected and returns the promise's result value.
// The boolean value ok is true if the promise has been resolved, false if it has been rejected.
// If v is not a promise, v itself is returned as the value and ok is true.
func await(v js.Value) (result js.Value, ok bool) {
if v.Type() != js.TypeObject || v.Get("then").Type() != js.TypeFunction {
return v, true
}
done := make(chan struct{})
onResolve := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
result = args[0]
ok = true
close(done)
return nil
})
defer onResolve.Release()
onReject := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
result = args[0]
ok = false
close(done)
return nil
})
defer onReject.Release()
v.Call("then", onResolve, onReject)
<-done
return
}
func generateKey() ed25519.PrivateKey {
// attempt to look for a seed in JS-land and if it exists use it.
priv, err := readKeyFromLocalStorage()
if err == nil {
fmt.Println("Read key from localStorage")
return priv
}
// generate a new key
fmt.Println(err, " : Generating new ed25519 key")
_, priv, err = ed25519.GenerateKey(nil)
if err != nil {
logrus.Fatalf("Failed to generate ed25519 key: %s", err)
}
if err := writeKeyToLocalStorage(priv); err != nil {
fmt.Println("failed to write key to localStorage: ", err)
// non-fatal, we'll just have amnesia for a while
}
return priv
}
func main() {
sk := generateKey()
pk := sk.Public().(ed25519.PublicKey)
logger := log.New(os.Stdout, "", 0)
pRouter := pineconeRouter.NewRouter(logger, "dendrite", sk, pk, nil)
pSessions := pineconeSessions.NewSessions(logger, pRouter)
cfg := &config.Dendrite{}
cfg.Defaults()
cfg.UserAPI.AccountDatabase.ConnectionString = "file:/idb/dendritejs_account.db"
cfg.AppServiceAPI.Database.ConnectionString = "file:/idb/dendritejs_appservice.db"
cfg.UserAPI.DeviceDatabase.ConnectionString = "file:/idb/dendritejs_device.db"
cfg.FederationSender.Database.ConnectionString = "file:/idb/dendritejs_fedsender.db"
cfg.MediaAPI.Database.ConnectionString = "file:/idb/dendritejs_mediaapi.db"
cfg.RoomServer.Database.ConnectionString = "file:/idb/dendritejs_roomserver.db"
cfg.SigningKeyServer.Database.ConnectionString = "file:/idb/dendritejs_signingkeyserver.db"
cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db"
cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db"
cfg.Global.Kafka.UseNaffka = true
cfg.Global.Kafka.Database.ConnectionString = "file:/idb/dendritejs_naffka.db"
cfg.Global.TrustedIDServers = []string{}
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
cfg.Global.PrivateKey = sk
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
if err := cfg.Derive(); err != nil {
logrus.Fatalf("Failed to derive values from config: %s", err)
}
base := setup.NewBaseDendrite(cfg, "Monolith", false)
defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB()
federation := conn.CreateFederationClient(base, pSessions)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
rsAPI := roomserver.NewInternalAPI(base, keyRing)
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
asQuery := appservice.NewInternalAPI(
base, userAPI, rsAPI,
)
rsAPI.SetAppserviceAPI(asQuery)
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, keyRing, true)
rsAPI.SetFederationSenderAPI(fedSenderAPI)
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
Client: conn.CreateClient(base, pSessions),
FedClient: federation,
KeyRing: keyRing,
AppserviceAPI: asQuery,
EDUInternalAPI: eduInputAPI,
FederationSenderAPI: fedSenderAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation),
}
monolith.AddAllPublicRoutes(
base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,
base.PublicMediaAPIMux,
)
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux)
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)
httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
p2pRouter := pSessions.HTTP().Mux()
p2pRouter.Handle(httputil.PublicFederationPathPrefix, base.PublicFederationAPIMux)
p2pRouter.Handle(httputil.PublicMediaPathPrefix, base.PublicMediaAPIMux)
// Expose the matrix APIs via fetch - for local traffic
go func() {
logrus.Info("Listening for service-worker fetch traffic")
s := JSServer{
Mux: httpRouter,
}
s.ListenAndServe("fetch")
}()
// Connect to the static peer
go func() {
for {
if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
if err := conn.ConnectToPeer(pRouter, publicPeer); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
select {
case <-base.ProcessContext.Context().Done():
return
case <-time.After(time.Second * 5):
}
}
}()
// We want to block forever to let the fetch and libp2p handler serve the APIs
select {}
}

View file

@ -0,0 +1,23 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !wasm
package main
import "fmt"
func main() {
fmt.Println("dendritejs: no-op when not compiling for WebAssembly")
}

View file

@ -210,7 +210,7 @@ func main() {
base, userAPI, rsAPI,
)
rsAPI.SetAppserviceAPI(asQuery)
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing)
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing, true)
rsAPI.SetFederationSenderAPI(fedSenderAPI)
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)

View file

@ -1,6 +1,6 @@
server {
listen 443 ssl; # IPv4
listen [::]:443; # IPv6
listen [::]:443 ssl; # IPv6
server_name my.hostname.com;
ssl_certificate /path/to/fullchain.pem;
@ -16,6 +16,9 @@ server {
}
location /.well-known/matrix/client {
# If your sever_name here doesn't match your matrix homeserver URL
# (e.g. hostname.com as server_name and matrix.hostname.com as homeserver URL)
# add_header Access-Control-Allow-Origin '*';
return 200 '{ "m.homeserver": { "base_url": "https://my.hostname.com" } }';
}

View file

@ -1,6 +1,6 @@
server {
listen 443 ssl; # IPv4
listen [::]:443; # IPv6
listen [::]:443 ssl; # IPv6
server_name my.hostname.com;
ssl_certificate /path/to/fullchain.pem;
@ -16,6 +16,9 @@ server {
}
location /.well-known/matrix/client {
# If your sever_name here doesn't match your matrix homeserver URL
# (e.g. hostname.com as server_name and matrix.hostname.com as homeserver URL)
# add_header Access-Control-Allow-Origin '*';
return 200 '{ "m.homeserver": { "base_url": "https://my.hostname.com" } }';
}

View file

@ -0,0 +1,11 @@
package api
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
)
type ServersInRoomProvider interface {
GetServersForRoom(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName
}

View file

@ -17,6 +17,7 @@ package federationapi
import (
"github.com/gorilla/mux"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -39,10 +40,12 @@ func AddPublicRoutes(
eduAPI eduserverAPI.EDUServerInputAPI,
keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
) {
routing.Setup(
fedRouter, keyRouter, cfg, rsAPI,
eduAPI, federationSenderAPI, keyRing,
federation, userAPI, keyAPI, mscCfg,
servers,
)
}

View file

@ -31,7 +31,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
fsAPI := base.FederationSenderHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs)
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil)
baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true)
defer cancel()
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))

View file

@ -20,6 +20,7 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil"
@ -50,6 +51,7 @@ func Setup(
userAPI userapi.UserInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
) {
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
v1fedmux := fedMux.PathPrefix("/v1").Subrouter()
@ -99,7 +101,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu,
cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers,
)
},
)).Methods(http.MethodPut, http.MethodOptions)

View file

@ -16,16 +16,16 @@ package routing
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
@ -34,6 +34,7 @@ import (
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
const (
@ -88,6 +89,67 @@ func init() {
)
}
type sendFIFOQueue struct {
tasks []*inputTask
count int
mutex sync.Mutex
notifs chan struct{}
}
func newSendFIFOQueue() *sendFIFOQueue {
q := &sendFIFOQueue{
notifs: make(chan struct{}, 1),
}
return q
}
func (q *sendFIFOQueue) push(frame *inputTask) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.tasks = append(q.tasks, frame)
q.count++
select {
case q.notifs <- struct{}{}:
default:
}
}
// pop returns the first item of the queue, if there is one.
// The second return value will indicate if a task was returned.
func (q *sendFIFOQueue) pop() (*inputTask, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count == 0 {
return nil, false
}
frame := q.tasks[0]
q.tasks[0] = nil
q.tasks = q.tasks[1:]
q.count--
if q.count == 0 {
// Force a GC of the underlying array, since it might have
// grown significantly if the queue was hammered for some reason
q.tasks = nil
}
return frame, true
}
type inputTask struct {
ctx context.Context
t *txnReq
event *gomatrixserverlib.Event
wg *sync.WaitGroup
err error // written back by worker, only safe to read when all tasks are done
duration time.Duration // written back by worker, only safe to read when all tasks are done
}
type inputWorker struct {
running atomic.Bool
input *sendFIFOQueue
}
var inputWorkers sync.Map // room ID -> *inputWorker
// Send implements /_matrix/federation/v1/send/{txnID}
func Send(
httpReq *http.Request,
@ -100,14 +162,16 @@ func Send(
keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient,
mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider,
) util.JSONResponse {
t := txnReq{
rsAPI: rsAPI,
eduAPI: eduAPI,
keys: keys,
federation: federation,
hadEvents: make(map[string]bool),
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
newEvents: make(map[string]bool),
servers: servers,
keyAPI: keyAPI,
roomsMu: mu,
}
@ -164,15 +228,15 @@ type txnReq struct {
keyAPI keyapi.KeyInternalAPI
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
servers []gomatrixserverlib.ServerName
serversMutex sync.RWMutex
roomsMu *internal.MutexByRoom
// something that can tell us about which servers are in a room right now
servers federationAPI.ServersInRoomProvider
// a list of events from the auth and prev events which we already had
hadEvents map[string]bool
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
// new events which the roomserver does not know about
newEvents map[string]bool
newEventsMutex sync.RWMutex
haveEventsMutex sync.Mutex
work string // metrics
}
@ -189,8 +253,12 @@ type txnFederationClient interface {
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
results := make(map[string]gomatrixserverlib.PDUResult)
//var resultsMutex sync.Mutex
var wg sync.WaitGroup
var tasks []*inputTask
wg.Add(1) // for processEDUs
pdus := []*gomatrixserverlib.HeaderedEvent{}
for _, pdu := range t.PDUs {
pduCountTotal.WithLabelValues("total").Inc()
var header struct {
@ -241,83 +309,97 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
}
continue
}
pdus = append(pdus, event.Headered(verRes.RoomVersion))
}
// Process the events.
for _, e := range pdus {
evStart := time.Now()
if err := t.processEvent(ctx, e.Unwrap()); err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
//
// However if the event is due to a temporary failure in our server
// such as a database being unavailable then we should bail, and
// hope that the sender will retry when we are feeling better.
//
// It is uncertain what we should do if an event fails because
// we failed to fetch more information from the sending server.
// For example if a request to /state fails.
// If we skip the event then we risk missing the event until we
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
if isProcessingErrorFatal(err) {
sentry.CaptureException(err)
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
jsonErr := util.ErrorResponse(err)
processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
return nil, &jsonErr
} else {
// Auth errors mean the event is 'rejected' which have to be silent to appease sytest
errMsg := ""
outcome := MetricsOutcomeRejected
_, rejected := err.(*gomatrixserverlib.NotAllowed)
if !rejected {
errMsg = err.Error()
outcome = MetricsOutcomeFail
}
util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
"Failed to process incoming federation event, skipping",
)
processEventSummary.WithLabelValues(t.work, outcome).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
results[e.EventID()] = gomatrixserverlib.PDUResult{
Error: errMsg,
}
}
} else {
results[e.EventID()] = gomatrixserverlib.PDUResult{}
pduCountTotal.WithLabelValues("success").Inc()
processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
}
v, _ := inputWorkers.LoadOrStore(event.RoomID(), &inputWorker{
input: newSendFIFOQueue(),
})
worker := v.(*inputWorker)
if !worker.running.Load() {
go worker.run()
}
wg.Add(1)
task := &inputTask{
ctx: ctx,
t: t,
event: event,
wg: &wg,
}
tasks = append(tasks, task)
worker.input.push(task)
}
go func() {
defer wg.Done()
t.processEDUs(ctx)
}()
wg.Wait()
for _, task := range tasks {
if task.err != nil {
results[task.event.EventID()] = gomatrixserverlib.PDUResult{
Error: task.err.Error(),
}
} else {
results[task.event.EventID()] = gomatrixserverlib.PDUResult{}
}
}
if c := len(results); c > 0 {
util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID)
}
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
// isProcessingErrorFatal returns true if the error is really bad and
// we should stop processing the transaction, and returns false if it
// is just some less serious error about a specific event.
func isProcessingErrorFatal(err error) bool {
switch err {
case sql.ErrConnDone:
case sql.ErrTxDone:
return true
func (t *inputWorker) run() {
if !t.running.CAS(false, true) {
return
}
defer t.running.Store(false)
for {
task, ok := t.input.pop()
if !ok {
return
}
if task == nil {
continue
}
func() {
defer task.wg.Done()
select {
case <-task.ctx.Done():
task.err = context.DeadlineExceeded
return
default:
evStart := time.Now()
task.err = task.t.processEvent(task.ctx, task.event)
task.duration = time.Since(evStart)
if err := task.err; err != nil {
switch err.(type) {
case *gomatrixserverlib.NotAllowed:
processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeRejected).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", true).Warn(
"Failed to process incoming federation event, skipping",
)
task.err = nil // make "rejected" failures silent
default:
processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeFail).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", false).Warn(
"Failed to process incoming federation event, skipping",
)
}
} else {
pduCountTotal.WithLabelValues("success").Inc()
processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeOK).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
}
}
}()
}
return false
}
type roomNotFoundError struct {
@ -340,19 +422,6 @@ func (e missingPrevEventsError) Error() string {
return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err)
}
func (t *txnReq) haveEventIDs() map[string]bool {
t.newEventsMutex.RLock()
defer t.newEventsMutex.RUnlock()
result := make(map[string]bool, len(t.haveEvents))
for eventID := range t.haveEvents {
if t.newEvents[eventID] {
continue
}
result[eventID] = true
}
return result
}
func (t *txnReq) processEDUs(ctx context.Context) {
for _, e := range t.EDUs {
eduCountTotal.Inc()
@ -479,22 +548,24 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
}
}
func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
t.serversMutex.Lock()
defer t.serversMutex.Unlock()
func (t *txnReq) getServers(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName {
// The server that sent us the event should be sufficient to tell us about missing
// prev and auth events.
servers := []gomatrixserverlib.ServerName{t.Origin}
// If the event origin is different to the transaction origin then we can use
// this as a last resort. The origin server that created the event would have
// had to know the auth and prev events.
if event != nil {
if origin := event.Origin(); origin != t.Origin {
servers = append(servers, origin)
}
}
// If a specific room-to-server provider exists then use that. This will primarily
// be used for the P2P demos.
if t.servers != nil {
return t.servers
servers = append(servers, t.servers.GetServersForRoom(ctx, roomID, event)...)
}
t.servers = []gomatrixserverlib.ServerName{t.Origin}
serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: roomID,
}
serverRes := &api.QueryServerJoinedToRoomResponse{}
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
t.servers = append(t.servers, serverRes.ServerNames...)
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
}
return t.servers
return servers
}
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
@ -527,6 +598,15 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
return roomNotFoundError{e.RoomID()}
}
// Prepare a map of all the events we already had before this point, so
// that we don't send them to the roomserver again.
for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) {
t.hadEvents[eventID] = true
}
for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) {
t.hadEvents[eventID] = false
}
if len(stateResp.MissingAuthEventIDs) > 0 {
t.work = MetricsWorkMissingAuthEvents
logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs))
@ -570,11 +650,14 @@ func (t *txnReq) retrieveMissingAuthEvents(
withNextEvent:
for missingAuthEventID := range missingAuthEvents {
withNextServer:
for _, server := range t.getServers(ctx, e.RoomID()) {
for _, server := range t.getServers(ctx, e.RoomID(), e) {
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
if err != nil {
logger.WithError(err).Warnf("Failed to retrieve auth event %q", missingAuthEventID)
if errors.Is(err, context.DeadlineExceeded) {
return err
}
continue withNextServer
}
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion)
@ -596,6 +679,8 @@ withNextEvent:
); err != nil {
return fmt.Errorf("api.SendEvents: %w", err)
}
t.hadEvents[ev.EventID()] = true // if the roomserver didn't know about the event before, it does now
t.cacheAndReturn(ev.Headered(stateResp.RoomVersion))
delete(missingAuthEvents, missingAuthEventID)
continue withNextEvent
}
@ -618,14 +703,14 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
return gomatrixserverlib.Allowed(e, &authUsingState)
}
var processEventWithMissingStateMutexes = internal.NewMutexByRoom()
func (t *txnReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) error {
// Do this with a fresh context, so that we keep working even if the
// original request times out. With any luck, by the time the remote
// side retries, we'll have fetched the missing state.
gmectx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
processEventWithMissingStateMutexes.Lock(e.RoomID())
defer processEventWithMissingStateMutexes.Unlock(e.RoomID())
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
@ -646,7 +731,7 @@ func (t *txnReq) processEventWithMissingState(
// - fill in the gap completely then process event `e` returning no backwards extremity
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
newEvents, err := t.getMissingEvents(gmectx, e, roomVersion)
newEvents, err := t.getMissingEvents(ctx, e, roomVersion)
if err != nil {
return err
}
@ -673,7 +758,7 @@ func (t *txnReq) processEventWithMissingState(
// Look up what the state is after the backward extremity. This will either
// come from the roomserver, if we know all the required events, or it will
// come from a remote server via /state_ids if not.
prevState, trustworthy, lerr := t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
prevState, trustworthy, lerr := t.lookupStateAfterEvent(ctx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
if lerr != nil {
util.GetLogger(ctx).WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return lerr
@ -717,7 +802,7 @@ func (t *txnReq) processEventWithMissingState(
}
// There's more than one previous state - run them all through state res
t.roomsMu.Lock(e.RoomID())
resolvedState, err = t.resolveStatesAndCheck(gmectx, roomVersion, respStates, backwardsExtremity)
resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, backwardsExtremity)
t.roomsMu.Unlock(e.RoomID())
if err != nil {
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
@ -734,7 +819,7 @@ func (t *txnReq) processEventWithMissingState(
api.KindOld,
resolvedState,
backwardsExtremity.Headered(roomVersion),
t.haveEventIDs(),
t.hadEvents,
)
if err != nil {
return fmt.Errorf("api.SendEventWithState: %w", err)
@ -786,7 +871,7 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
default:
return nil, false, fmt.Errorf("t.lookupEvent: %w", err)
}
t.cacheAndReturn(h)
h = t.cacheAndReturn(h)
if h.StateKey() != nil {
addedToState := false
for i := range respState.StateEvents {
@ -806,6 +891,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
}
func (t *txnReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *gomatrixserverlib.HeaderedEvent {
t.haveEventsMutex.Lock()
defer t.haveEventsMutex.Unlock()
if cached, exists := t.haveEvents[ev.EventID()]; exists {
return cached
}
@ -828,6 +915,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
// processEvent request, which is better for memory.
stateEvents[i] = t.cacheAndReturn(ev)
t.hadEvents[ev.EventID()] = true
}
// we should never access res.StateEvents again so we delete it here to make GC faster
res.StateEvents = nil
@ -835,6 +923,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
var authEvents []*gomatrixserverlib.Event
missingAuthEvents := map[string]bool{}
for _, ev := range stateEvents {
t.haveEventsMutex.Lock()
for _, ae := range ev.AuthEventIDs() {
if aev, ok := t.haveEvents[ae]; ok {
authEvents = append(authEvents, aev.Unwrap())
@ -842,6 +931,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
missingAuthEvents[ae] = true
}
}
t.haveEventsMutex.Unlock()
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
@ -858,8 +948,9 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
return nil
}
for i := range queryRes.Events {
for i, ev := range queryRes.Events {
authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
t.hadEvents[ev.EventID()] = true
}
queryRes.Events = nil
}
@ -934,12 +1025,13 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
return nil, err
}
latestEvents := make([]string, len(res.LatestEvents))
for i := range res.LatestEvents {
for i, ev := range res.LatestEvents {
latestEvents[i] = res.LatestEvents[i].EventID
t.hadEvents[ev.EventID] = true
}
var missingResp *gomatrixserverlib.RespMissingEvents
servers := t.getServers(ctx, e.RoomID())
servers := t.getServers(ctx, e.RoomID(), e)
for _, server := range servers {
var m gomatrixserverlib.RespMissingEvents
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
@ -953,6 +1045,9 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
break
} else {
logger.WithError(err).Errorf("%s pushed us an event but %q did not respond to /get_missing_events", t.Origin, server)
if errors.Is(err, context.DeadlineExceeded) {
break
}
}
}
@ -980,6 +1075,12 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
// For now, we do not allow Case B, so reject the event.
logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
// Make sure events from the missingResp are using the cache - missing events
// will be added and duplicates will be removed.
for i, ev := range missingResp.Events {
missingResp.Events[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
}
// topologically sort and sanity check that we are making forward progress
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
shouldHaveSomeEventIDs := e.PrevEventIDs()
@ -1018,6 +1119,14 @@ func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID
if err := state.Check(ctx, t.keys, nil); err != nil {
return nil, err
}
// Cache the results of this state lookup and deduplicate anything we already
// have in the cache, freeing up memory.
for i, ev := range state.AuthEvents {
state.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
}
for i, ev := range state.StateEvents {
state.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
}
return &state, nil
}
@ -1033,6 +1142,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...)
missing := make(map[string]bool)
var missingEventList []string
t.haveEventsMutex.Lock()
for _, sid := range wantIDs {
if _, ok := t.haveEvents[sid]; !ok {
if !missing[sid] {
@ -1041,6 +1151,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
}
}
}
t.haveEventsMutex.Unlock()
// fetch as many as we can from the roomserver
queryReq := api.QueryEventsByIDRequest{
@ -1050,9 +1161,10 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
return nil, err
}
for i := range queryRes.Events {
for i, ev := range queryRes.Events {
queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i])
t.hadEvents[ev.EventID()] = true
evID := queryRes.Events[i].EventID()
t.cacheAndReturn(queryRes.Events[i])
if missing[evID] {
delete(missing, evID)
}
@ -1153,6 +1265,9 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
*gomatrixserverlib.RespState, error) { // nolint:unparam
t.haveEventsMutex.Lock()
defer t.haveEventsMutex.Unlock()
// create a RespState response using the response to /state_ids as a guide
respState := gomatrixserverlib.RespState{}
@ -1193,11 +1308,14 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
}
var event *gomatrixserverlib.Event
found := false
servers := t.getServers(ctx, roomID)
servers := t.getServers(ctx, roomID, nil)
for _, serverName := range servers {
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
if err != nil || len(txn.PDUs) == 0 {
util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("Failed to get missing /event for event ID")
if errors.Is(err, context.DeadlineExceeded) {
break
}
continue
}
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(txn.PDUs[0], roomVersion)
@ -1216,9 +1334,5 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
return nil, verifySigError{event.EventID(), err}
}
h := event.Headered(roomVersion)
t.newEventsMutex.Lock()
t.newEvents[h.EventID()] = true
t.newEventsMutex.Unlock()
return h, nil
return t.cacheAndReturn(event.Headered(roomVersion)), nil
}

View file

@ -370,7 +370,7 @@ func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederat
keys: &test.NopJSONVerifier{},
federation: fedClient,
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
newEvents: make(map[string]bool),
hadEvents: make(map[string]bool),
roomsMu: internal.NewMutexByRoom(),
}
t.PDUs = pdus

View file

@ -43,6 +43,7 @@ func NewInternalAPI(
federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI,
keyRing *gomatrixserverlib.KeyRing,
resetBlacklist bool,
) api.FederationSenderInternalAPI {
cfg := &base.Cfg.FederationSender
@ -51,6 +52,10 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to connect to federation sender db")
}
if resetBlacklist {
_ = federationSenderDB.RemoveAllServersFromBlacklist()
}
stats := &statistics.Statistics{
DB: federationSenderDB,
FailuresUntilBlacklist: cfg.FederationMaxRetries,

View file

@ -572,6 +572,7 @@ func (r *FederationSenderInternalAPI) PerformServersAlive(
response *api.PerformServersAliveResponse,
) (err error) {
for _, srv := range request.Servers {
_ = r.db.RemoveServerFromBlacklist(srv)
r.queues.RetryServer(srv)
}

View file

@ -54,6 +54,7 @@ type Database interface {
// these don't have contexts passed in as we want things to happen regardless of the request context
AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
RemoveAllServersFromBlacklist() error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)
AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error

View file

@ -40,11 +40,15 @@ const selectBlacklistSQL = "" +
const deleteBlacklistSQL = "" +
"DELETE FROM federationsender_blacklist WHERE server_name = $1"
const deleteAllBlacklistSQL = "" +
"TRUNCATE federationsender_blacklist"
type blacklistStatements struct {
db *sql.DB
insertBlacklistStmt *sql.Stmt
selectBlacklistStmt *sql.Stmt
deleteBlacklistStmt *sql.Stmt
deleteAllBlacklistStmt *sql.Stmt
}
func NewPostgresBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) {
@ -65,11 +69,12 @@ func NewPostgresBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) {
if s.deleteBlacklistStmt, err = db.Prepare(deleteBlacklistSQL); err != nil {
return
}
if s.deleteAllBlacklistStmt, err = db.Prepare(deleteAllBlacklistSQL); err != nil {
return
}
return
}
// insertRoom inserts the room if it didn't already exist.
// If the room didn't exist then last_event_id is set to the empty string.
func (s *blacklistStatements) InsertBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) error {
@ -78,9 +83,6 @@ func (s *blacklistStatements) InsertBlacklist(
return err
}
// selectRoomForUpdate locks the row for the room and returns the last_event_id.
// The row must already exist in the table. Callers can ensure that the row
// exists by calling insertRoom first.
func (s *blacklistStatements) SelectBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) (bool, error) {
@ -96,8 +98,6 @@ func (s *blacklistStatements) SelectBlacklist(
return res.Next(), nil
}
// updateRoom updates the last_event_id for the room. selectRoomForUpdate should
// have already been called earlier within the transaction.
func (s *blacklistStatements) DeleteBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) error {
@ -105,3 +105,11 @@ func (s *blacklistStatements) DeleteBlacklist(
_, err := stmt.ExecContext(ctx, serverName)
return err
}
func (s *blacklistStatements) DeleteAllBlacklist(
ctx context.Context, txn *sql.Tx,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteAllBlacklistStmt)
_, err := stmt.ExecContext(ctx)
return err
}

View file

@ -148,6 +148,12 @@ func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.Server
})
}
func (d *Database) RemoveAllServersFromBlacklist() error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderBlacklist.DeleteAllBlacklist(context.TODO(), txn)
})
}
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
}

View file

@ -40,11 +40,15 @@ const selectBlacklistSQL = "" +
const deleteBlacklistSQL = "" +
"DELETE FROM federationsender_blacklist WHERE server_name = $1"
const deleteAllBlacklistSQL = "" +
"DELETE FROM federationsender_blacklist"
type blacklistStatements struct {
db *sql.DB
insertBlacklistStmt *sql.Stmt
selectBlacklistStmt *sql.Stmt
deleteBlacklistStmt *sql.Stmt
deleteAllBlacklistStmt *sql.Stmt
}
func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) {
@ -65,11 +69,12 @@ func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) {
if s.deleteBlacklistStmt, err = db.Prepare(deleteBlacklistSQL); err != nil {
return
}
if s.deleteAllBlacklistStmt, err = db.Prepare(deleteAllBlacklistSQL); err != nil {
return
}
return
}
// insertRoom inserts the room if it didn't already exist.
// If the room didn't exist then last_event_id is set to the empty string.
func (s *blacklistStatements) InsertBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) error {
@ -78,9 +83,6 @@ func (s *blacklistStatements) InsertBlacklist(
return err
}
// selectRoomForUpdate locks the row for the room and returns the last_event_id.
// The row must already exist in the table. Callers can ensure that the row
// exists by calling insertRoom first.
func (s *blacklistStatements) SelectBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) (bool, error) {
@ -96,8 +98,6 @@ func (s *blacklistStatements) SelectBlacklist(
return res.Next(), nil
}
// updateRoom updates the last_event_id for the room. selectRoomForUpdate should
// have already been called earlier within the transaction.
func (s *blacklistStatements) DeleteBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) error {
@ -105,3 +105,11 @@ func (s *blacklistStatements) DeleteBlacklist(
_, err := stmt.ExecContext(ctx, serverName)
return err
}
func (s *blacklistStatements) DeleteAllBlacklist(
ctx context.Context, txn *sql.Tx,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteAllBlacklistStmt)
_, err := stmt.ExecContext(ctx)
return err
}

View file

@ -60,6 +60,7 @@ type FederationSenderBlacklist interface {
InsertBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error)
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
DeleteAllBlacklist(ctx context.Context, txn *sql.Tx) error
}
type FederationSenderOutboundPeeks interface {

12
go.mod
View file

@ -23,11 +23,11 @@ require (
github.com/lucas-clemente/quic-go v0.19.3
github.com/matrix-org/dugong v0.0.0-20180820122854-51a565b5666b
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/go-sqlite3-js v0.0.0-20210625141222-bd2b7124cee8
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
github.com/matrix-org/gomatrixserverlib v0.0.0-20210302161955-6142fe3f8c2c
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161
github.com/matrix-org/pinecone v0.0.0-20210510160342-a1dfbcf4bd47
github.com/matrix-org/gomatrixserverlib v0.0.0-20210702152949-0cac5159e7d6
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb
github.com/nats-io/nats.go v1.11.0
@ -46,10 +46,12 @@ require (
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20210218094457-e77ca8019daa
go.uber.org/atomic v1.7.0
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a
golang.org/x/mobile v0.0.0-20210220033013-bdb1ca9a1e08
golang.org/x/net v0.0.0-20210525063256-abc453219eb5
golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69 // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
gopkg.in/h2non/bimg.v1 v1.1.5
gopkg.in/yaml.v2 v2.4.0
nhooyr.io/websocket v1.8.7
)
go 1.14

70
go.sum
View file

@ -13,6 +13,7 @@ github.com/Arceliar/phony v0.0.0-20191006174943-d0c68492aca0 h1:p3puK8Sl2xK+2Fnn
github.com/Arceliar/phony v0.0.0-20191006174943-d0c68492aca0/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno=
github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
@ -56,9 +57,7 @@ github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgw
github.com/anacrolix/missinggo v1.1.2-0.20190815015349-b888af804467/go.mod h1:MBJu3Sk/k3ZfGYcS7z18gwfu72Ey/xopPFJJbTi5yIo=
github.com/anacrolix/missinggo v1.2.1 h1:0IE3TqX5y5D0IxeMwTyIgqdDew4QrzcXaaEnJQyjHvw=
github.com/anacrolix/missinggo v1.2.1/go.mod h1:J5cMhif8jPmFoC3+Uvob3OXXNIhOUikzMt+uUjeM21Y=
github.com/anacrolix/missinggo/perf v1.0.0 h1:7ZOGYziGEBytW49+KmYGTaNfnwUqP1HBsy6BqESAJVw=
github.com/anacrolix/missinggo/perf v1.0.0/go.mod h1:ljAFWkBuzkO12MQclXzZrosP5urunoLS0Cbvb4V0uMQ=
github.com/anacrolix/sync v0.2.0 h1:oRe22/ZB+v7v/5Mbc4d2zE0AXEZy0trKyKLjqYOt6tY=
github.com/anacrolix/sync v0.2.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/utp v0.1.0/go.mod h1:MDwc+vsGEq7RMw6lr2GKOEqjWny5hO5OZXRVNaBJ2Dk=
@ -186,7 +185,11 @@ github.com/getsentry/sentry-go v0.10.0 h1:6gwY+66NHKqyZrdi6O2jGdo7wGdo9b3B69E01N
github.com/getsentry/sentry-go v0.10.0/go.mod h1:kELm/9iCblqUYh+ZRML7PNdCvEuw24wBvJPYyi86cws=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
@ -200,10 +203,21 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@ -228,6 +242,7 @@ github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@ -421,6 +436,7 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
@ -442,6 +458,7 @@ github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
@ -465,6 +482,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8=
github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
@ -698,17 +717,17 @@ github.com/matrix-org/dugong v0.0.0-20180820122854-51a565b5666b h1:xpcmnpfUImRC4
github.com/matrix-org/dugong v0.0.0-20180820122854-51a565b5666b/go.mod h1:NgPCr+UavRGH6n5jmdX8DuqFZ4JiCWIJoZiuhTRLSUg=
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 h1:eqE5OnGx9ZMWmrRbD3KF/3KtTunw0iQulI7YxOIdxo4=
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4/go.mod h1:3WluEZ9QXSwU30tWYqktnpC1x9mwZKx1r8uAv8Iq+a4=
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 h1:Yb+Wlf/iHhWlLWd+kCgG+Fsg4Dc+xBl7hptfK7lD0zY=
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/go-sqlite3-js v0.0.0-20210625141222-bd2b7124cee8 h1:/FKUeUlCATr1gXxYqlaJgH8FW/sw0Jz8t7s8BIlECfg=
github.com/matrix-org/go-sqlite3-js v0.0.0-20210625141222-bd2b7124cee8/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20210302161955-6142fe3f8c2c h1:vW3jBp1PnZ4uUqyS+JatJRYIl73ZLHkUkRzf1JdGxOI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20210302161955-6142fe3f8c2c/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161 h1:h1XVh05pLoC+nJjP3GIpj5wUsuC8WdHP3He0RTkRJTs=
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/pinecone v0.0.0-20210510160342-a1dfbcf4bd47 h1:kWLnEcQoqJWlBtgtooa7ixyGNL3fy4CsyfR8M6b8vdI=
github.com/matrix-org/pinecone v0.0.0-20210510160342-a1dfbcf4bd47/go.mod h1:/75aR0l7umUnMBk02Q5+QC2IPY58S21BLpoPR9BUr0k=
github.com/matrix-org/gomatrixserverlib v0.0.0-20210702152949-0cac5159e7d6 h1:Rm5saCQeFfS15T0uD6qanSpx2kj/PfPBogbj5kFfQ+4=
github.com/matrix-org/gomatrixserverlib v0.0.0-20210702152949-0cac5159e7d6/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b h1:5X5vdWQ13xrNkJVqaJHPsrt7rKkMJH5iac0EtfOuxSg=
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b/go.mod h1:CVlrvs1R5iz7Omy2GqAjJJKbACn07GZgUq1Gli18FYE=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
@ -722,6 +741,8 @@ github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.2/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
@ -759,8 +780,10 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
@ -846,8 +869,8 @@ github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/neilalexander/utp v0.1.1-0.20210510143443-c1bac1cd577f h1:xcJVva0Ziw+Ud4AaY/g9OMNc7veEfsYVox3eItY2w8Q=
github.com/neilalexander/utp v0.1.1-0.20210510143443-c1bac1cd577f/go.mod h1:ylsx0342RjGHjOoVKhR/wz/7Lhiusonihfj4QLxEMcU=
github.com/neilalexander/utp v0.1.1-0.20210622132614-ee9a34a30488 h1:xZk82i6JK2d0SqRIXwaxj7J/NQB6ngq0PuMx3wXBaRQ=
github.com/neilalexander/utp v0.1.1-0.20210622132614-ee9a34a30488/go.mod h1:NPHGhPc0/wudcaCqL/H5AOddkRf8GPRhzOujuUKGQu8=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6 h1:evlcQnJY+v8XRRchV3hXzpHDl6GcEZeLXAhlH9Csdww=
@ -1040,8 +1063,10 @@ github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW
github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
@ -1053,7 +1078,9 @@ github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/vishvananda/netlink v1.0.0/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
@ -1140,11 +1167,15 @@ golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@ -1152,8 +1183,13 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20210220033013-bdb1ca9a1e08 h1:h+GZ3ubjuWaQjGe8owMGcmMVCqs0xYJtRG5y2bpHaqU=
golang.org/x/mobile v0.0.0-20210220033013-bdb1ca9a1e08/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -1228,6 +1264,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190526052359-791d8a0f4d09/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1241,6 +1278,7 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1250,12 +1288,13 @@ golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201231184435-2d18734c6014/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210105210732-16f7687f5001/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309040221-94ec62e08169/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -1280,6 +1319,7 @@ golang.org/x/tools v0.0.0-20181221001348-537d06c36207/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190327201419-c70d86f8b7cf/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
@ -1289,6 +1329,7 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69 h1:yBHHx+XZqXJBm6Exke3N7V9gnlsyXxoCPEb1yVenjfk=
@ -1300,6 +1341,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1N
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.zx2c4.com/wireguard v0.0.0-20210203165646-9c7bd73be2cc/go.mod h1:r0ExowOoGFfDoLDxx+M9SYbNVsoZ0xviLL+K4f2mt+A=
golang.zx2c4.com/wireguard v0.0.0-20210212170059-7a0fb5bbb172/go.mod h1:r0ExowOoGFfDoLDxx+M9SYbNVsoZ0xviLL+K4f2mt+A=
golang.zx2c4.com/wireguard v0.0.0-20210510202332-9844c74f67ec/go.mod h1:a057zjmoc00UN7gVkaJt2sXVK523kMJcogDTEvPIasg=
golang.zx2c4.com/wireguard/windows v0.3.5/go.mod h1:ATrIFNoq3rsK735WJiQzfWYyNFc9xLBhMMjW9DWIvnU=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
@ -1397,6 +1439,8 @@ honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=

View file

@ -20,6 +20,7 @@ import (
"encoding/json"
"time"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/api"
@ -47,7 +48,7 @@ const upsertKeysSQL = "" +
" DO UPDATE SET key_json = $6"
const selectKeysSQL = "" +
"SELECT key_id, algorithm, key_json FROM keyserver_one_time_keys WHERE user_id=$1 AND device_id=$2"
"SELECT concat(algorithm, ':', key_id) as algorithmwithid, key_json FROM keyserver_one_time_keys WHERE user_id=$1 AND device_id=$2 AND concat(algorithm, ':', key_id) = ANY($3);"
const selectKeysCountSQL = "" +
"SELECT algorithm, COUNT(key_id) FROM keyserver_one_time_keys WHERE user_id=$1 AND device_id=$2 GROUP BY algorithm"
@ -94,29 +95,22 @@ func NewPostgresOneTimeKeysTable(db *sql.DB) (tables.OneTimeKeys, error) {
}
func (s *oneTimeKeysStatements) SelectOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) {
rows, err := s.selectKeysStmt.QueryContext(ctx, userID, deviceID)
rows, err := s.selectKeysStmt.QueryContext(ctx, userID, deviceID, pq.Array(keyIDsWithAlgorithms))
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectKeysStmt: rows.close() failed")
wantSet := make(map[string]bool, len(keyIDsWithAlgorithms))
for _, ka := range keyIDsWithAlgorithms {
wantSet[ka] = true
}
result := make(map[string]json.RawMessage)
var (
algorithmWithID string
keyJSONStr string
)
for rows.Next() {
var keyID string
var algorithm string
var keyJSONStr string
if err := rows.Scan(&keyID, &algorithm, &keyJSONStr); err != nil {
if err := rows.Scan(&algorithmWithID, &keyJSONStr); err != nil {
return nil, err
}
keyIDWithAlgo := algorithm + ":" + keyID
if wantSet[keyIDWithAlgo] {
result[keyIDWithAlgo] = json.RawMessage(keyJSONStr)
}
result[algorithmWithID] = json.RawMessage(keyJSONStr)
}
return result, rows.Err()
}

View file

@ -685,7 +685,7 @@ func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string,
r.Logger.WithError(parseErr).Warn("Failed to parse content length")
return 0, nil, fmt.Errorf("strconv.ParseInt: %w", parseErr)
}
if parsedLength > int64(maxFileSizeBytes) {
if maxFileSizeBytes > 0 && parsedLength > int64(maxFileSizeBytes) {
return 0, nil, fmt.Errorf(
"remote file size (%d bytes) exceeds locally configured max media size (%d bytes)",
parsedLength, maxFileSizeBytes,

View file

@ -147,7 +147,20 @@ func (r *uploadRequest) doUpload(
// r.storeFileAndMetadata(ctx, tmpDir, ...)
// before you return from doUpload else we will leak a temp file. We could make this nicer with a `WithTransaction` style of
// nested function to guarantee either storage or cleanup.
hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, reqReader, cfg.AbsBasePath)
// should not happen, but prevents any int overflows
if cfg.MaxFileSizeBytes != nil && *cfg.MaxFileSizeBytes+1 <= 0 {
r.Logger.WithFields(log.Fields{
"MaxFileSizeBytes": *cfg.MaxFileSizeBytes + 1,
}).Error("Error while transferring file, configured max_file_size_bytes overflows int64")
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Unknown("Failed to upload"),
}
}
lr := io.LimitReader(reqReader, int64(*cfg.MaxFileSizeBytes)+1)
hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, lr, cfg.AbsBasePath)
if err != nil {
r.Logger.WithError(err).WithFields(log.Fields{
"MaxFileSizeBytes": *cfg.MaxFileSizeBytes,

View file

@ -0,0 +1,132 @@
package routing
import (
"context"
"io"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"github.com/matrix-org/dendrite/mediaapi/fileutils"
"github.com/matrix-org/dendrite/mediaapi/storage"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
)
func Test_uploadRequest_doUpload(t *testing.T) {
type fields struct {
MediaMetadata *types.MediaMetadata
Logger *log.Entry
}
type args struct {
ctx context.Context
reqReader io.Reader
cfg *config.MediaAPI
db storage.Database
activeThumbnailGeneration *types.ActiveThumbnailGeneration
}
wd, err := os.Getwd()
if err != nil {
t.Errorf("failed to get current working directory: %v", err)
}
maxSize := config.FileSizeBytes(8)
logger := log.New().WithField("mediaapi", "test")
testdataPath := filepath.Join(wd, "./testdata")
cfg := &config.MediaAPI{
MaxFileSizeBytes: &maxSize,
BasePath: config.Path(testdataPath),
AbsBasePath: config.Path(testdataPath),
DynamicThumbnails: false,
}
// create testdata folder and remove when done
_ = os.Mkdir(testdataPath, os.ModePerm)
defer fileutils.RemoveDir(types.Path(testdataPath), nil)
db, err := storage.Open(&config.DatabaseOptions{
ConnectionString: "file::memory:?cache=shared",
MaxOpenConnections: 100,
MaxIdleConnections: 2,
ConnMaxLifetimeSeconds: -1,
})
if err != nil {
t.Errorf("error opening mediaapi database: %v", err)
}
tests := []struct {
name string
fields fields
args args
want *util.JSONResponse
}{
{
name: "upload ok",
args: args{
ctx: context.Background(),
reqReader: strings.NewReader("test"),
cfg: cfg,
db: db,
},
fields: fields{
Logger: logger,
MediaMetadata: &types.MediaMetadata{
MediaID: "1337",
UploadName: "test ok",
},
},
want: nil,
},
{
name: "upload ok (exact size)",
args: args{
ctx: context.Background(),
reqReader: strings.NewReader("testtest"),
cfg: cfg,
db: db,
},
fields: fields{
Logger: logger,
MediaMetadata: &types.MediaMetadata{
MediaID: "1338",
UploadName: "test ok (exact size)",
},
},
want: nil,
},
{
name: "upload not ok",
args: args{
ctx: context.Background(),
reqReader: strings.NewReader("test test test"),
cfg: cfg,
db: db,
},
fields: fields{
Logger: logger,
MediaMetadata: &types.MediaMetadata{
MediaID: "1339",
UploadName: "test fail",
},
},
want: requestEntityTooLargeJSONResponse(maxSize),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &uploadRequest{
MediaMetadata: tt.fields.MediaMetadata,
Logger: tt.fields.Logger,
}
if got := r.doUpload(tt.args.ctx, tt.args.reqReader, tt.args.cfg, tt.args.db, tt.args.activeThumbnailGeneration); !reflect.DeepEqual(got, tt.want) {
t.Errorf("doUpload() = %+v, want %+v", got, tt.want)
}
})
}
}

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
@ -38,7 +39,6 @@ type Inputer struct {
ServerName gomatrixserverlib.ServerName
ACLs *acls.ServerACLs
OutputRoomEventTopic string
workers sync.Map // room ID -> *inputWorker
}
@ -52,7 +52,7 @@ type inputTask struct {
type inputWorker struct {
r *Inputer
running atomic.Bool
input chan *inputTask
input *fifoQueue
}
// Guarded by a CAS on w.running
@ -60,7 +60,14 @@ func (w *inputWorker) start() {
defer w.running.Store(false)
for {
select {
case task := <-w.input:
case <-w.input.wait():
task, ok := w.input.pop()
if !ok {
continue
}
roomserverInputBackpressure.With(prometheus.Labels{
"room_id": task.event.Event.RoomID(),
}).Dec()
hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
if task.err == nil {
@ -117,6 +124,20 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er
return errs
}
func init() {
prometheus.MustRegister(roomserverInputBackpressure)
}
var roomserverInputBackpressure = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "roomserver",
Name: "input_backpressure",
Help: "How many events are queued for input for a given room",
},
[]string{"room_id"},
)
// InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents(
_ context.Context,
@ -143,7 +164,7 @@ func (r *Inputer) InputRoomEvents(
// room - the channel will be quite small as it's just pointer types.
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
r: r,
input: make(chan *inputTask, 32),
input: newFIFOQueue(),
})
worker := w.(*inputWorker)
@ -160,7 +181,10 @@ func (r *Inputer) InputRoomEvents(
if worker.running.CAS(false, true) {
go worker.start()
}
worker.input <- tasks[i]
worker.input.push(tasks[i])
roomserverInputBackpressure.With(prometheus.Labels{
"room_id": roomID,
}).Inc()
}
// Wait for all of the workers to return results about our tasks.

View file

@ -0,0 +1,64 @@
package input
import (
"sync"
)
type fifoQueue struct {
tasks []*inputTask
count int
mutex sync.Mutex
notifs chan struct{}
}
func newFIFOQueue() *fifoQueue {
q := &fifoQueue{
notifs: make(chan struct{}, 1),
}
return q
}
func (q *fifoQueue) push(frame *inputTask) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.tasks = append(q.tasks, frame)
q.count++
select {
case q.notifs <- struct{}{}:
default:
}
}
// pop returns the first item of the queue, if there is one.
// The second return value will indicate if a task was returned.
// You must check this value, even after calling wait().
func (q *fifoQueue) pop() (*inputTask, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count == 0 {
return nil, false
}
frame := q.tasks[0]
q.tasks[0] = nil
q.tasks = q.tasks[1:]
q.count--
if q.count == 0 {
// Force a GC of the underlying array, since it might have
// grown significantly if the queue was hammered for some reason
q.tasks = nil
}
return frame, true
}
// wait returns a channel which can be used to detect when an
// item is waiting in the queue.
func (q *fifoQueue) wait() <-chan struct{} {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count > 0 && len(q.notifs) == 0 {
ch := make(chan struct{})
close(ch)
return ch
}
return q.notifs
}

View file

@ -119,11 +119,15 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
_roomserver_state_snapshots
JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
WHERE
_roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT DISTINCT
_roomserver_state_snapshots.state_snapshot_nid = ANY (
SELECT
_roomserver_state_snapshots.state_snapshot_nid
FROM
_roomserver_state_snapshots
LIMIT $1 OFFSET $2)) AS _roomserver_state_block
ORDER BY _roomserver_state_snapshots.state_snapshot_nid ASC
LIMIT $1 OFFSET $2
)
) AS _roomserver_state_block
GROUP BY
state_snapshot_nid,
room_nid,
@ -202,6 +206,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
}
// By this point we should have no more state_snapshot_nids below maxsnapshotid in either roomserver_rooms or roomserver_events
// If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
// in roomserver_state_snapshots
var count int64
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if _, err = tx.Exec(`
DROP TABLE _roomserver_state_snapshots;
DROP SEQUENCE roomserver_state_snapshot_nid_seq;

View file

@ -156,7 +156,7 @@ func (d *Database) AddState(
stateBlockNIDs []types.StateBlockNID,
state []types.StateEntry,
) (stateNID types.StateSnapshotNID, err error) {
if len(stateBlockNIDs) > 0 {
if len(stateBlockNIDs) > 0 && len(state) > 0 {
// Check to see if the event already appears in any of the existing state
// blocks. If it does then we should not add it again, as this will just
// result in excess state blocks and snapshots.

View file

@ -31,6 +31,7 @@ func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}
// nolint:gocyclo
func UpStateBlocksRefactor(tx *sql.Tx) error {
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
defer logrus.Warn("State storage upgrade complete")
@ -45,6 +46,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
maxsnapshotid++
maxblockid++
oldMaxSnapshotID := maxsnapshotid
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
@ -133,6 +135,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
if jerr != nil {
return fmt.Errorf("json.Marshal (new blocks): %w", jerr)
}
var newsnapshot types.StateSnapshotNID
err = tx.QueryRow(`
INSERT INTO roomserver_state_snapshots (state_snapshot_nid, state_snapshot_hash, room_nid, state_block_nids)
@ -144,7 +147,8 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
}
maxsnapshotid++
if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
_, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid)
if err != nil {
return fmt.Errorf("tx.Exec (update events): %w", err)
}
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
@ -153,6 +157,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
}
// By this point we should have no more state_snapshot_nids below oldMaxSnapshotID in either roomserver_rooms or roomserver_events
// If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
// in roomserver_state_snapshots
var count int64
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil {
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
}

View file

@ -101,7 +101,7 @@ func (a *ApplicationService) IsInterestedInRoomID(
) bool {
if namespaceSlice, ok := a.NamespaceMap["rooms"]; ok {
for _, namespace := range namespaceSlice {
if namespace.RegexpObject.MatchString(roomID) {
if namespace.RegexpObject != nil && namespace.RegexpObject.MatchString(roomID) {
return true
}
}
@ -222,6 +222,10 @@ func setupRegexps(asAPI *AppServiceAPI, derived *Derived) (err error) {
case "aliases":
appendExclusiveNamespaceRegexs(&exclusiveAliasStrings, namespaceSlice)
}
if err = compileNamespaceRegexes(namespaceSlice); err != nil {
return fmt.Errorf("invalid regex in appservice %q, namespace %q: %w", appservice.ID, key, err)
}
}
}
@ -258,18 +262,31 @@ func setupRegexps(asAPI *AppServiceAPI, derived *Derived) (err error) {
func appendExclusiveNamespaceRegexs(
exclusiveStrings *[]string, namespaces []ApplicationServiceNamespace,
) {
for index, namespace := range namespaces {
for _, namespace := range namespaces {
if namespace.Exclusive {
// We append parenthesis to later separate each regex when we compile
// i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
*exclusiveStrings = append(*exclusiveStrings, "("+namespace.Regex+")")
}
// Compile this regex into a Regexp object for later use
namespaces[index].RegexpObject, _ = regexp.Compile(namespace.Regex)
}
}
// compileNamespaceRegexes turns strings into regex objects and complains
// if some of there are bad
func compileNamespaceRegexes(namespaces []ApplicationServiceNamespace) (err error) {
for index, namespace := range namespaces {
// Compile this regex into a Regexp object for later use
r, err := regexp.Compile(namespace.Regex)
if err != nil {
return fmt.Errorf("regex at namespace %d: %w", index, err)
}
namespaces[index].RegexpObject = r
}
return nil
}
// checkErrors checks for any configuration errors amongst the loaded
// application services according to the application service spec.
func checkErrors(config *AppServiceAPI, derived *Derived) (err error) {

View file

@ -2,6 +2,7 @@ package config
import (
"fmt"
"math"
)
type MediaAPI struct {
@ -57,6 +58,11 @@ func (c *MediaAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkNotEmpty(configErrs, "media_api.database.connection_string", string(c.Database.ConnectionString))
checkNotEmpty(configErrs, "media_api.base_path", string(c.BasePath))
// allow "unlimited" file size
if c.MaxFileSizeBytes != nil && *c.MaxFileSizeBytes <= 0 {
unlimitedSize := FileSizeBytes(math.MaxInt64 - 1)
c.MaxFileSizeBytes = &unlimitedSize
}
checkPositive(configErrs, "media_api.max_file_size_bytes", int64(*c.MaxFileSizeBytes))
checkPositive(configErrs, "media_api.max_thumbnail_generators", int64(c.MaxThumbnailGenerators))

View file

@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss
federationapi.AddPublicRoutes(
ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI,
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs,
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil,
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(

View file

@ -39,9 +39,9 @@ import (
)
const (
ConstCreateEventContentKey = "org.matrix.msc1772.type"
ConstSpaceChildEventType = "org.matrix.msc1772.space.child"
ConstSpaceParentEventType = "org.matrix.msc1772.space.parent"
ConstCreateEventContentKey = "type"
ConstSpaceChildEventType = "m.space.child"
ConstSpaceParentEventType = "m.space.parent"
)
// Defaults sets the request defaults