merge master

This commit is contained in:
Matthew Hodgson 2020-08-31 12:26:23 +03:00
commit cfa0be544d
38 changed files with 260 additions and 167 deletions

View file

@ -111,13 +111,12 @@ func (m *DendriteMonolith) Start() {
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
federation := ygg.CreateFederationClient(base) federation := ygg.CreateFederationClient(base)
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, cfg.Derived.ApplicationServices, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
rsAPI := roomserver.NewInternalAPI( rsAPI := roomserver.NewInternalAPI(
@ -153,7 +152,6 @@ func (m *DendriteMonolith) Start() {
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
DeviceDB: deviceDB,
Client: ygg.CreateClient(base), Client: ygg.CreateClient(base),
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,

View file

@ -1,5 +1,5 @@
FROM golang:1.13-stretch as build FROM golang:1.13-stretch as build
RUN apt-get update && apt-get install sqlite3 RUN apt-get update && apt-get install -y sqlite3
WORKDIR /build WORKDIR /build
# Utilise Docker caching when downloading dependencies, this stops us needlessly # Utilise Docker caching when downloading dependencies, this stops us needlessly

View file

@ -30,7 +30,6 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -39,7 +38,6 @@ func AddPublicRoutes(
router *mux.Router, router *mux.Router,
cfg *config.ClientAPI, cfg *config.ClientAPI,
producer sarama.SyncProducer, producer sarama.SyncProducer,
deviceDB devices.Database,
accountsDB accounts.Database, accountsDB accounts.Database,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
@ -59,7 +57,7 @@ func AddPublicRoutes(
routing.Setup( routing.Setup(
router, cfg, eduInputAPI, rsAPI, asAPI, router, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, deviceDB, userAPI, federation, accountsDB, userAPI, federation,
syncProducer, transactionsCache, fsAPI, stateAPI, keyAPI, extRoomsProvider, syncProducer, transactionsCache, fsAPI, stateAPI, keyAPI, extRoomsProvider,
) )
} }

View file

@ -15,7 +15,6 @@
package routing package routing
import ( import (
"database/sql"
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -23,7 +22,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/devices" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
@ -50,57 +49,56 @@ type devicesDeleteJSON struct {
// GetDeviceByID handles /devices/{deviceID} // GetDeviceByID handles /devices/{deviceID}
func GetDeviceByID( func GetDeviceByID(
req *http.Request, deviceDB devices.Database, device *api.Device, req *http.Request, userAPI userapi.UserInternalAPI, device *api.Device,
deviceID string, deviceID string,
) util.JSONResponse { ) util.JSONResponse {
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) var queryRes userapi.QueryDevicesResponse
err := userAPI.QueryDevices(req.Context(), &userapi.QueryDevicesRequest{
UserID: device.UserID,
}, &queryRes)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") util.GetLogger(req.Context()).WithError(err).Error("QueryDevices failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
var targetDevice *userapi.Device
ctx := req.Context() for _, device := range queryRes.Devices {
dev, err := deviceDB.GetDeviceByID(ctx, localpart, deviceID) if device.ID == deviceID {
if err == sql.ErrNoRows { targetDevice = &device
break
}
}
if targetDevice == nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusNotFound, Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Unknown device"), JSON: jsonerror.NotFound("Unknown device"),
} }
} else if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("deviceDB.GetDeviceByID failed")
return jsonerror.InternalServerError()
} }
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: deviceJSON{ JSON: deviceJSON{
DeviceID: dev.ID, DeviceID: targetDevice.ID,
DisplayName: dev.DisplayName, DisplayName: targetDevice.DisplayName,
}, },
} }
} }
// GetDevicesByLocalpart handles /devices // GetDevicesByLocalpart handles /devices
func GetDevicesByLocalpart( func GetDevicesByLocalpart(
req *http.Request, deviceDB devices.Database, device *api.Device, req *http.Request, userAPI userapi.UserInternalAPI, device *api.Device,
) util.JSONResponse { ) util.JSONResponse {
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) var queryRes userapi.QueryDevicesResponse
err := userAPI.QueryDevices(req.Context(), &userapi.QueryDevicesRequest{
UserID: device.UserID,
}, &queryRes)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") util.GetLogger(req.Context()).WithError(err).Error("QueryDevices failed")
return jsonerror.InternalServerError()
}
ctx := req.Context()
deviceList, err := deviceDB.GetDevicesByLocalpart(ctx, localpart)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("deviceDB.GetDevicesByLocalpart failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
res := devicesJSON{} res := devicesJSON{}
for _, dev := range deviceList { for _, dev := range queryRes.Devices {
res.Devices = append(res.Devices, deviceJSON{ res.Devices = append(res.Devices, deviceJSON{
DeviceID: dev.ID, DeviceID: dev.ID,
DisplayName: dev.DisplayName, DisplayName: dev.DisplayName,

View file

@ -41,6 +41,17 @@ func JoinRoomByIDOrAlias(
} }
joinRes := roomserverAPI.PerformJoinResponse{} joinRes := roomserverAPI.PerformJoinResponse{}
// Check to see if any ?server_name= query parameters were
// given in the request.
if serverNames, ok := req.URL.Query()["server_name"]; ok {
for _, serverName := range serverNames {
joinReq.ServerNames = append(
joinReq.ServerNames,
gomatrixserverlib.ServerName(serverName),
)
}
}
// If content was provided in the request then include that // If content was provided in the request then include that
// in the request. It'll get used as a part of the membership // in the request. It'll get used as a part of the membership
// event content. // event content.

View file

@ -19,23 +19,21 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/devices" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
// Logout handles POST /logout // Logout handles POST /logout
func Logout( func Logout(
req *http.Request, deviceDB devices.Database, device *api.Device, req *http.Request, userAPI userapi.UserInternalAPI, device *api.Device,
) util.JSONResponse { ) util.JSONResponse {
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) var performRes userapi.PerformDeviceDeletionResponse
err := userAPI.PerformDeviceDeletion(req.Context(), &userapi.PerformDeviceDeletionRequest{
UserID: device.UserID,
DeviceIDs: []string{device.ID},
}, &performRes)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") util.GetLogger(req.Context()).WithError(err).Error("PerformDeviceDeletion failed")
return jsonerror.InternalServerError()
}
if err := deviceDB.RemoveDevice(req.Context(), device.ID, localpart); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("deviceDB.RemoveDevice failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
@ -47,16 +45,15 @@ func Logout(
// LogoutAll handles POST /logout/all // LogoutAll handles POST /logout/all
func LogoutAll( func LogoutAll(
req *http.Request, deviceDB devices.Database, device *api.Device, req *http.Request, userAPI userapi.UserInternalAPI, device *api.Device,
) util.JSONResponse { ) util.JSONResponse {
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) var performRes userapi.PerformDeviceDeletionResponse
err := userAPI.PerformDeviceDeletion(req.Context(), &userapi.PerformDeviceDeletionRequest{
UserID: device.UserID,
DeviceIDs: nil,
}, &performRes)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") util.GetLogger(req.Context()).WithError(err).Error("PerformDeviceDeletion failed")
return jsonerror.InternalServerError()
}
if err := deviceDB.RemoveAllDevices(req.Context(), localpart); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("deviceDB.RemoveAllDevices failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -35,7 +35,6 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
@ -52,7 +51,6 @@ func Setup(
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
accountDB accounts.Database, accountDB accounts.Database,
deviceDB devices.Database,
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
syncProducer *producers.SyncAPIProducer, syncProducer *producers.SyncAPIProducer,
@ -333,13 +331,13 @@ func Setup(
r0mux.Handle("/logout", r0mux.Handle("/logout",
httputil.MakeAuthAPI("logout", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI("logout", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return Logout(req, deviceDB, device) return Logout(req, userAPI, device)
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/logout/all", r0mux.Handle("/logout/all",
httputil.MakeAuthAPI("logout", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI("logout", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return LogoutAll(req, deviceDB, device) return LogoutAll(req, userAPI, device)
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
@ -643,7 +641,7 @@ func Setup(
r0mux.Handle("/devices", r0mux.Handle("/devices",
httputil.MakeAuthAPI("get_devices", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI("get_devices", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return GetDevicesByLocalpart(req, deviceDB, device) return GetDevicesByLocalpart(req, userAPI, device)
}), }),
).Methods(http.MethodGet, http.MethodOptions) ).Methods(http.MethodGet, http.MethodOptions)
@ -653,7 +651,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return GetDeviceByID(req, deviceDB, device, vars["deviceID"]) return GetDeviceByID(req, userAPI, device, vars["deviceID"])
}), }),
).Methods(http.MethodGet, http.MethodOptions) ).Methods(http.MethodGet, http.MethodOptions)

View file

@ -27,7 +27,6 @@ func main() {
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
asQuery := base.AppserviceHTTPClient() asQuery := base.AppserviceHTTPClient()
@ -39,7 +38,7 @@ func main() {
keyAPI := base.KeyServerHTTPClient() keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes( clientapi.AddPublicRoutes(
base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, deviceDB, accountDB, federation, base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation,
rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI, keyAPI, nil, rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI, keyAPI, nil,
) )

View file

@ -140,10 +140,9 @@ func main() {
defer base.Base.Close() // nolint: errcheck defer base.Base.Close() // nolint: errcheck
accountDB := base.Base.CreateAccountsDB() accountDB := base.Base.CreateAccountsDB()
deviceDB := base.Base.CreateDeviceDB()
federation := createFederationClient(base) federation := createFederationClient(base)
keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer)
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, nil, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
serverKeyAPI := serverkeyapi.NewInternalAPI( serverKeyAPI := serverkeyapi.NewInternalAPI(
@ -175,7 +174,6 @@ func main() {
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Base.Cfg, Config: base.Base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
DeviceDB: deviceDB,
Client: createClient(base), Client: createClient(base),
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,

View file

@ -94,14 +94,13 @@ func main() {
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
federation := ygg.CreateFederationClient(base) federation := ygg.CreateFederationClient(base)
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, nil, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
rsComponent := roomserver.NewInternalAPI( rsComponent := roomserver.NewInternalAPI(
@ -136,7 +135,6 @@ func main() {
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
DeviceDB: deviceDB,
Client: ygg.CreateClient(base), Client: ygg.CreateClient(base),
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,

View file

@ -69,7 +69,6 @@ func main() {
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
serverKeyAPI := serverkeyapi.NewInternalAPI( serverKeyAPI := serverkeyapi.NewInternalAPI(
@ -110,7 +109,7 @@ func main() {
rsImpl.SetFederationSenderAPI(fsAPI) rsImpl.SetFederationSenderAPI(fsAPI)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer)
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, cfg.Derived.ApplicationServices, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
eduInputAPI := eduserver.NewInternalAPI( eduInputAPI := eduserver.NewInternalAPI(
@ -130,7 +129,6 @@ func main() {
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
DeviceDB: deviceDB,
Client: gomatrixserverlib.NewClient(cfg.FederationSender.DisableTLSValidation), Client: gomatrixserverlib.NewClient(cfg.FederationSender.DisableTLSValidation),
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,

View file

@ -25,9 +25,8 @@ func main() {
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, cfg.Derived.ApplicationServices, base.KeyServerHTTPClient()) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, base.KeyServerHTTPClient())
userapi.AddInternalRoutes(base.InternalAPIMux, userAPI) userapi.AddInternalRoutes(base.InternalAPIMux, userAPI)

View file

@ -191,10 +191,9 @@ func main() {
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
federation := createFederationClient(cfg, node) federation := createFederationClient(cfg, node)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, nil, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
fetcher := &libp2pKeyFetcher{} fetcher := &libp2pKeyFetcher{}
@ -218,7 +217,6 @@ func main() {
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
DeviceDB: deviceDB,
Client: createClient(node), Client: createClient(node),
FedClient: federation, FedClient: federation,
KeyRing: &keyRing, KeyRing: &keyRing,

View file

@ -302,6 +302,8 @@ user_api:
conn_max_lifetime: -1 conn_max_lifetime: -1
# Configuration for Opentracing. # Configuration for Opentracing.
# See https://github.com/matrix-org/dendrite/tree/master/docs/tracing for information on
# how this works and how to set it up.
tracing: tracing:
enabled: false enabled: false
jaeger: jaeger:

BIN
docs/tracing/jaeger.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 258 KiB

49
docs/tracing/setup.md Normal file
View file

@ -0,0 +1,49 @@
## OpenTracing Setup
![Trace when sending an event into a room](/docs/tracing/jaeger.png)
Dendrite uses [Jaeger](https://www.jaegertracing.io/) for tracing between microservices.
Tracing shows the nesting of logical spans which provides visibility on how the microservices interact.
This document explains how to set up Jaeger locally on a single machine.
### Set up the Jaeger backend
The [easiest way](https://www.jaegertracing.io/docs/1.18/getting-started/) is to use the all-in-one Docker image:
```
$ docker run -d --name jaeger \
-e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
-p 5775:5775/udp \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
-p 14250:14250 \
-p 9411:9411 \
jaegertracing/all-in-one:1.18
```
### Configuring Dendrite to talk to Jaeger
Modify your config to look like: (this will send every single span to Jaeger which will be slow on large instances, but for local testing it's fine)
```
tracing:
enabled: true
jaeger:
serviceName: "dendrite"
disabled: false
rpc_metrics: true
tags: []
sampler:
type: const
param: 1
```
then run the monolith server with `--api true` to use polylith components which do tracing spans:
```
$ ./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml --api true
```
### Checking traces
Visit http://localhost:16686 to see traces under `DendriteMonolith`.

View file

@ -21,7 +21,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -33,7 +32,7 @@ func (d *Database) AssociateEDUWithDestination(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
receipt *Receipt, receipt *Receipt,
) error { ) error {
return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
for _, nid := range receipt.nids { for _, nid := range receipt.nids {
if err := d.FederationSenderQueueEDUs.InsertQueueEDU( if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
ctx, // context ctx, // context
@ -60,7 +59,7 @@ func (d *Database) GetNextTransactionEDUs(
receipt *Receipt, receipt *Receipt,
err error, err error,
) { ) {
err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueueEDUs: %w", err) return fmt.Errorf("SelectQueueEDUs: %w", err)
@ -99,7 +98,7 @@ func (d *Database) CleanEDUs(
return errors.New("expected receipt") return errors.New("expected receipt")
} }
return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil { if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil {
return err return err
} }

View file

@ -21,7 +21,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -34,7 +33,7 @@ func (d *Database) AssociatePDUWithDestination(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
receipt *Receipt, receipt *Receipt,
) error { ) error {
return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
for _, nid := range receipt.nids { for _, nid := range receipt.nids {
if err := d.FederationSenderQueuePDUs.InsertQueuePDU( if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
ctx, // context ctx, // context
@ -62,7 +61,12 @@ func (d *Database) GetNextTransactionPDUs(
receipt *Receipt, receipt *Receipt,
err error, err error,
) { ) {
err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { // Strictly speaking this doesn't need to be using the writer
// since we are only performing selects, but since we don't have
// a guarantee of transactional isolation, it's actually useful
// to know in SQLite mode that nothing else is trying to modify
// the database.
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
transactionID, err = d.FederationSenderQueuePDUs.SelectQueuePDUNextTransactionID(ctx, txn, serverName) transactionID, err = d.FederationSenderQueuePDUs.SelectQueuePDUNextTransactionID(ctx, txn, serverName)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueuePDUNextTransactionID: %w", err) return fmt.Errorf("SelectQueuePDUNextTransactionID: %w", err)
@ -111,7 +115,7 @@ func (d *Database) CleanPDUs(
return errors.New("expected receipt") return errors.New("expected receipt")
} }
return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil { if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil {
return err return err
} }

View file

@ -32,7 +32,6 @@ import (
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -237,17 +236,6 @@ func (b *BaseDendrite) KeyServerHTTPClient() keyserverAPI.KeyInternalAPI {
return f return f
} }
// CreateDeviceDB creates a new instance of the device database. Should only be
// called once per component.
func (b *BaseDendrite) CreateDeviceDB() devices.Database {
db, err := devices.NewDatabase(&b.Cfg.UserAPI.DeviceDatabase, b.Cfg.Global.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to devices db")
}
return db
}
// CreateAccountsDB creates a new instance of the accounts database. Should only // CreateAccountsDB creates a new instance of the accounts database. Should only
// be called once per component. // be called once per component.
func (b *BaseDendrite) CreateAccountsDB() accounts.Database { func (b *BaseDendrite) CreateAccountsDB() accounts.Database {

View file

@ -33,7 +33,6 @@ import (
"github.com/matrix-org/dendrite/syncapi" "github.com/matrix-org/dendrite/syncapi"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -41,7 +40,6 @@ import (
// all components of Dendrite, for use in monolith mode. // all components of Dendrite, for use in monolith mode.
type Monolith struct { type Monolith struct {
Config *config.Dendrite Config *config.Dendrite
DeviceDB devices.Database
AccountDB accounts.Database AccountDB accounts.Database
KeyRing *gomatrixserverlib.KeyRing KeyRing *gomatrixserverlib.KeyRing
Client *gomatrixserverlib.Client Client *gomatrixserverlib.Client
@ -65,7 +63,7 @@ type Monolith struct {
// AddAllPublicRoutes attaches all public paths to the given router // AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) { func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
clientapi.AddPublicRoutes( clientapi.AddPublicRoutes(
csMux, &m.Config.ClientAPI, m.KafkaProducer, m.DeviceDB, m.AccountDB, csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB,
m.FedClient, m.RoomserverAPI, m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(), m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,

View file

@ -341,8 +341,12 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
if err != nil { if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user") logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user")
fcerr, ok := err.(*fedsenderapi.FederationClientError) fcerr, ok := err.(*fedsenderapi.FederationClientError)
if ok && fcerr.RetryAfter > 0 { if ok {
if fcerr.RetryAfter > 0 {
waitTime = fcerr.RetryAfter waitTime = fcerr.RetryAfter
} else if fcerr.Blacklisted {
waitTime = time.Hour * 8
}
} }
hasFailures = true hasFailures = true
continue continue

View file

@ -16,6 +16,7 @@ package fileutils
import ( import (
"bufio" "bufio"
"context"
"crypto/sha256" "crypto/sha256"
"encoding/base64" "encoding/base64"
"fmt" "fmt"
@ -27,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/mediaapi/types" "github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -104,18 +106,31 @@ func RemoveDir(dir types.Path, logger *log.Entry) {
} }
} }
// WriteTempFile writes to a new temporary file // WriteTempFile writes to a new temporary file.
func WriteTempFile(reqReader io.Reader, maxFileSizeBytes config.FileSizeBytes, absBasePath config.Path) (hash types.Base64Hash, size types.FileSizeBytes, path types.Path, err error) { // The file is deleted if there was an error while writing.
func WriteTempFile(
ctx context.Context, reqReader io.Reader, maxFileSizeBytes config.FileSizeBytes, absBasePath config.Path,
) (hash types.Base64Hash, size types.FileSizeBytes, path types.Path, err error) {
size = -1 size = -1
logger := util.GetLogger(ctx)
tmpFileWriter, tmpFile, tmpDir, err := createTempFileWriter(absBasePath) tmpFileWriter, tmpFile, tmpDir, err := createTempFileWriter(absBasePath)
if err != nil { if err != nil {
return return
} }
defer (func() { err = tmpFile.Close() })() defer func() {
err2 := tmpFile.Close()
if err == nil {
err = err2
}
}()
// The amount of data read is limited to maxFileSizeBytes. At this point, if there is more data it will be truncated. // If the max_file_size_bytes configuration option is set to a positive
limitedReader := io.LimitReader(reqReader, int64(maxFileSizeBytes)) // number then limit the upload to that size. Otherwise, just read the
// whole file.
limitedReader := reqReader
if maxFileSizeBytes > 0 {
limitedReader = io.LimitReader(reqReader, int64(maxFileSizeBytes))
}
// Hash the file data. The hash will be returned. The hash is useful as a // Hash the file data. The hash will be returned. The hash is useful as a
// method of deduplicating files to save storage, as well as a way to conduct // method of deduplicating files to save storage, as well as a way to conduct
// integrity checks on the file data in the repository. // integrity checks on the file data in the repository.
@ -123,11 +138,13 @@ func WriteTempFile(reqReader io.Reader, maxFileSizeBytes config.FileSizeBytes, a
teeReader := io.TeeReader(limitedReader, hasher) teeReader := io.TeeReader(limitedReader, hasher)
bytesWritten, err := io.Copy(tmpFileWriter, teeReader) bytesWritten, err := io.Copy(tmpFileWriter, teeReader)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
RemoveDir(tmpDir, logger)
return return
} }
err = tmpFileWriter.Flush() err = tmpFileWriter.Flush()
if err != nil { if err != nil {
RemoveDir(tmpDir, logger)
return return
} }

View file

@ -728,12 +728,11 @@ func (r *downloadRequest) fetchRemoteFile(
// method of deduplicating files to save storage, as well as a way to conduct // method of deduplicating files to save storage, as well as a way to conduct
// integrity checks on the file data in the repository. // integrity checks on the file data in the repository.
// Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK. // Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK.
hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(resp.Body, maxFileSizeBytes, absBasePath) hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, resp.Body, maxFileSizeBytes, absBasePath)
if err != nil { if err != nil {
r.Logger.WithError(err).WithFields(log.Fields{ r.Logger.WithError(err).WithFields(log.Fields{
"MaxFileSizeBytes": maxFileSizeBytes, "MaxFileSizeBytes": maxFileSizeBytes,
}).Warn("Error while downloading file from remote server") }).Warn("Error while downloading file from remote server")
fileutils.RemoveDir(tmpDir, r.Logger)
return "", false, errors.New("file could not be downloaded from remote server") return "", false, errors.New("file could not be downloaded from remote server")
} }

View file

@ -53,8 +53,8 @@ func Setup(
uploadHandler := httputil.MakeAuthAPI( uploadHandler := httputil.MakeAuthAPI(
"upload", userAPI, "upload", userAPI,
func(req *http.Request, _ *userapi.Device) util.JSONResponse { func(req *http.Request, dev *userapi.Device) util.JSONResponse {
return Upload(req, cfg, db, activeThumbnailGeneration) return Upload(req, cfg, dev, db, activeThumbnailGeneration)
}, },
) )

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/mediaapi/storage" "github.com/matrix-org/dendrite/mediaapi/storage"
"github.com/matrix-org/dendrite/mediaapi/thumbnailer" "github.com/matrix-org/dendrite/mediaapi/thumbnailer"
"github.com/matrix-org/dendrite/mediaapi/types" "github.com/matrix-org/dendrite/mediaapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -55,8 +56,8 @@ type uploadResponse struct {
// This implementation supports a configurable maximum file size limit in bytes. If a user tries to upload more than this, they will receive an error that their upload is too large. // This implementation supports a configurable maximum file size limit in bytes. If a user tries to upload more than this, they will receive an error that their upload is too large.
// Uploaded files are processed piece-wise to avoid DoS attacks which would starve the server of memory. // Uploaded files are processed piece-wise to avoid DoS attacks which would starve the server of memory.
// TODO: We should time out requests if they have not received any data within a configured timeout period. // TODO: We should time out requests if they have not received any data within a configured timeout period.
func Upload(req *http.Request, cfg *config.MediaAPI, db storage.Database, activeThumbnailGeneration *types.ActiveThumbnailGeneration) util.JSONResponse { func Upload(req *http.Request, cfg *config.MediaAPI, dev *userapi.Device, db storage.Database, activeThumbnailGeneration *types.ActiveThumbnailGeneration) util.JSONResponse {
r, resErr := parseAndValidateRequest(req, cfg) r, resErr := parseAndValidateRequest(req, cfg, dev)
if resErr != nil { if resErr != nil {
return *resErr return *resErr
} }
@ -76,13 +77,14 @@ func Upload(req *http.Request, cfg *config.MediaAPI, db storage.Database, active
// parseAndValidateRequest parses the incoming upload request to validate and extract // parseAndValidateRequest parses the incoming upload request to validate and extract
// all the metadata about the media being uploaded. // all the metadata about the media being uploaded.
// Returns either an uploadRequest or an error formatted as a util.JSONResponse // Returns either an uploadRequest or an error formatted as a util.JSONResponse
func parseAndValidateRequest(req *http.Request, cfg *config.MediaAPI) (*uploadRequest, *util.JSONResponse) { func parseAndValidateRequest(req *http.Request, cfg *config.MediaAPI, dev *userapi.Device) (*uploadRequest, *util.JSONResponse) {
r := &uploadRequest{ r := &uploadRequest{
MediaMetadata: &types.MediaMetadata{ MediaMetadata: &types.MediaMetadata{
Origin: cfg.Matrix.ServerName, Origin: cfg.Matrix.ServerName,
FileSizeBytes: types.FileSizeBytes(req.ContentLength), FileSizeBytes: types.FileSizeBytes(req.ContentLength),
ContentType: types.ContentType(req.Header.Get("Content-Type")), ContentType: types.ContentType(req.Header.Get("Content-Type")),
UploadName: types.Filename(url.PathEscape(req.FormValue("filename"))), UploadName: types.Filename(url.PathEscape(req.FormValue("filename"))),
UserID: types.MatrixUserID(dev.UserID),
}, },
Logger: util.GetLogger(req.Context()).WithField("Origin", cfg.Matrix.ServerName), Logger: util.GetLogger(req.Context()).WithField("Origin", cfg.Matrix.ServerName),
} }
@ -138,12 +140,18 @@ func (r *uploadRequest) doUpload(
// method of deduplicating files to save storage, as well as a way to conduct // method of deduplicating files to save storage, as well as a way to conduct
// integrity checks on the file data in the repository. // integrity checks on the file data in the repository.
// Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK. // Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK.
hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(reqReader, *cfg.MaxFileSizeBytes, cfg.AbsBasePath) //
// TODO: This has a bad API shape where you either need to call:
// fileutils.RemoveDir(tmpDir, r.Logger)
// or call:
// r.storeFileAndMetadata(ctx, tmpDir, ...)
// before you return from doUpload else we will leak a temp file. We could make this nicer with a `WithTransaction` style of
// nested function to guarantee either storage or cleanup.
hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, reqReader, *cfg.MaxFileSizeBytes, cfg.AbsBasePath)
if err != nil { if err != nil {
r.Logger.WithError(err).WithFields(log.Fields{ r.Logger.WithError(err).WithFields(log.Fields{
"MaxFileSizeBytes": *cfg.MaxFileSizeBytes, "MaxFileSizeBytes": *cfg.MaxFileSizeBytes,
}).Warn("Error while transferring file") }).Warn("Error while transferring file")
fileutils.RemoveDir(tmpDir, r.Logger)
return &util.JSONResponse{ return &util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
JSON: jsonerror.Unknown("Failed to upload"), JSON: jsonerror.Unknown("Failed to upload"),
@ -157,11 +165,14 @@ func (r *uploadRequest) doUpload(
ctx, hash, r.MediaMetadata.Origin, ctx, hash, r.MediaMetadata.Origin,
) )
if err != nil { if err != nil {
fileutils.RemoveDir(tmpDir, r.Logger)
r.Logger.WithError(err).Error("Error querying the database by hash.") r.Logger.WithError(err).Error("Error querying the database by hash.")
resErr := jsonerror.InternalServerError() resErr := jsonerror.InternalServerError()
return &resErr return &resErr
} }
if existingMetadata != nil { if existingMetadata != nil {
// The file already exists, delete the uploaded temporary file.
defer fileutils.RemoveDir(tmpDir, r.Logger)
// The file already exists. Make a new media ID up for it. // The file already exists. Make a new media ID up for it.
mediaID, merr := r.generateMediaID(ctx, db) mediaID, merr := r.generateMediaID(ctx, db)
if merr != nil { if merr != nil {
@ -181,15 +192,13 @@ func (r *uploadRequest) doUpload(
Base64Hash: hash, Base64Hash: hash,
UserID: r.MediaMetadata.UserID, UserID: r.MediaMetadata.UserID,
} }
// Clean up the uploaded temporary file.
fileutils.RemoveDir(tmpDir, r.Logger)
} else { } else {
// The file doesn't exist. Update the request metadata. // The file doesn't exist. Update the request metadata.
r.MediaMetadata.FileSizeBytes = bytesWritten r.MediaMetadata.FileSizeBytes = bytesWritten
r.MediaMetadata.Base64Hash = hash r.MediaMetadata.Base64Hash = hash
r.MediaMetadata.MediaID, err = r.generateMediaID(ctx, db) r.MediaMetadata.MediaID, err = r.generateMediaID(ctx, db)
if err != nil { if err != nil {
fileutils.RemoveDir(tmpDir, r.Logger)
r.Logger.WithError(err).Error("Failed to generate media ID for new upload") r.Logger.WithError(err).Error("Failed to generate media ID for new upload")
resErr := jsonerror.InternalServerError() resErr := jsonerror.InternalServerError()
return &resErr return &resErr

View file

@ -161,8 +161,9 @@ func (r *RoomserverInternalAPI) performJoinRoomByID(
// where we might think we know about a room in the following // where we might think we know about a room in the following
// section but don't know the latest state as all of our users // section but don't know the latest state as all of our users
// have left. // have left.
serverInRoom, _ := r.isServerCurrentlyInRoom(ctx, r.ServerName, req.RoomIDOrAlias)
isInvitePending, inviteSender, _, err := r.isInvitePending(ctx, req.RoomIDOrAlias, req.UserID) isInvitePending, inviteSender, _, err := r.isInvitePending(ctx, req.RoomIDOrAlias, req.UserID)
if err == nil && isInvitePending { if err == nil && isInvitePending && !serverInRoom {
// Check if there's an invite pending. // Check if there's an invite pending.
_, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender) _, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender)
if ierr != nil { if ierr != nil {

View file

@ -468,7 +468,7 @@ func (d *Database) addPDUDeltaToResponse(
wantFullState bool, wantFullState bool,
res *types.Response, res *types.Response,
) (joinedRoomIDs []string, err error) { ) (joinedRoomIDs []string, err error) {
txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot) txn, err := d.DB.BeginTx(context.TODO(), &txReadOnlySnapshot) // TODO: check mattn/go-sqlite3#764
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -594,20 +594,23 @@ func (d *Database) IncrementalSync(
joinedRoomIDs, err = d.addPDUDeltaToResponse( joinedRoomIDs, err = d.addPDUDeltaToResponse(
ctx, device, r, numRecentEventsPerRoom, wantFullState, res, ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
) )
if err != nil {
return nil, fmt.Errorf("d.addPDUDeltaToResponse: %w", err)
}
} else { } else {
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership( joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(
ctx, nil, device.UserID, gomatrixserverlib.Join, ctx, nil, device.UserID, gomatrixserverlib.Join,
) )
}
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("d.CurrentRoomState.SelectRoomIDsWithMembership: %w", err)
}
} }
err = d.addEDUDeltaToResponse( err = d.addEDUDeltaToResponse(
fromPos, toPos, joinedRoomIDs, res, fromPos, toPos, joinedRoomIDs, res,
) )
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
} }
return res, nil return res, nil
@ -649,7 +652,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
// a consistent view of the database throughout. This includes extracting the sync position. // a consistent view of the database throughout. This includes extracting the sync position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function, // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction. // but it's better to not hide the fact that this is being done in a transaction.
txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot) txn, err := d.DB.BeginTx(context.TODO(), &txReadOnlySnapshot) // TODO: check mattn/go-sqlite3#764
if err != nil { if err != nil {
return return
} }
@ -736,7 +739,7 @@ func (d *Database) CompleteSync(
ctx, res, device.UserID, numRecentEventsPerRoom, ctx, res, device.UserID, numRecentEventsPerRoom,
) )
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)
} }
// Use a zero value SyncPosition for fromPos so all EDU states are added. // Use a zero value SyncPosition for fromPos so all EDU states are added.
@ -744,7 +747,7 @@ func (d *Database) CompleteSync(
types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res, types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res,
) )
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
} }
return res, nil return res, nil
@ -770,7 +773,7 @@ func (d *Database) addInvitesToResponse(
ctx, txn, userID, r, ctx, txn, userID, r,
) )
if err != nil { if err != nil {
return err return fmt.Errorf("d.Invites.SelectInviteEventsInRange: %w", err)
} }
for roomID, inviteEvent := range invites { for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse(inviteEvent) ir := types.NewInviteResponse(inviteEvent)

View file

@ -18,6 +18,7 @@ package sync
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"time" "time"
@ -204,31 +205,34 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
// See if we have any new tasks to do for the send-to-device messaging. // See if we have any new tasks to do for the send-to-device messaging.
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, since) events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, since)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
} }
// TODO: handle ignored users // TODO: handle ignored users
if req.since == nil { if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit) res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
if err != nil {
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
}
} else { } else {
res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState) res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
}
if err != nil { if err != nil {
return res, err return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
}
} }
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter) res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter)
if err != nil { if err != nil {
return res, err return res, fmt.Errorf("rp.appendAccountData: %w", err)
} }
res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos) res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos)
if err != nil { if err != nil {
return res, err return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
} }
err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res) err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res)
if err != nil { if err != nil {
return res, err return res, fmt.Errorf("internal.DeviceOTKCounts: %w", err)
} }
// Before we return the sync response, make sure that we take action on // Before we return the sync response, make sure that we take action on
@ -238,7 +242,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
// Handle the updates and deletions in the database. // Handle the updates and deletions in the database.
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, since) err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, since)
if err != nil { if err != nil {
return res, err return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
} }
} }
if len(events) > 0 { if len(events) > 0 {
@ -263,7 +267,7 @@ func (rp *RequestPool) appendDeviceLists(
) (*types.Response, error) { ) (*types.Response, error) {
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to) _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
} }
return data, nil return data, nil
@ -329,7 +333,7 @@ func (rp *RequestPool) appendAccountData(
req.ctx, userID, r, accountDataFilter, req.ctx, userID, r, accountDataFilter,
) )
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err)
} }
if len(dataTypes) == 0 { if len(dataTypes) == 0 {

View file

@ -61,7 +61,7 @@ type PerformDeviceUpdateResponse struct {
type PerformDeviceDeletionRequest struct { type PerformDeviceDeletionRequest struct {
UserID string UserID string
// The devices to delete // The devices to delete. An empty slice means delete all devices.
DeviceIDs []string DeviceIDs []string
} }
@ -193,7 +193,6 @@ type Device struct {
// Can be used as a secure substitution in places where data needs to be // Can be used as a secure substitution in places where data needs to be
// associated with access tokens. // associated with access tokens.
SessionID int64 SessionID int64
// TODO: display name, last used timestamp, keys, etc
DisplayName string DisplayName string
} }

View file

@ -123,12 +123,21 @@ func (a *UserInternalAPI) PerformDeviceDeletion(ctx context.Context, req *api.Pe
if domain != a.ServerName { if domain != a.ServerName {
return fmt.Errorf("cannot PerformDeviceDeletion of remote users: got %s want %s", domain, a.ServerName) return fmt.Errorf("cannot PerformDeviceDeletion of remote users: got %s want %s", domain, a.ServerName)
} }
deletedDeviceIDs := req.DeviceIDs
if len(req.DeviceIDs) == 0 {
var devices []api.Device
devices, err = a.DeviceDB.RemoveAllDevices(ctx, local)
for _, d := range devices {
deletedDeviceIDs = append(deletedDeviceIDs, d.ID)
}
} else {
err = a.DeviceDB.RemoveDevices(ctx, local, req.DeviceIDs) err = a.DeviceDB.RemoveDevices(ctx, local, req.DeviceIDs)
}
if err != nil { if err != nil {
return err return err
} }
// create empty device keys and upload them to delete what was once there and trigger device list changes // create empty device keys and upload them to delete what was once there and trigger device list changes
return a.deviceListUpdate(req.UserID, req.DeviceIDs) return a.deviceListUpdate(req.UserID, deletedDeviceIDs)
} }
func (a *UserInternalAPI) deviceListUpdate(userID string, deviceIDs []string) error { func (a *UserInternalAPI) deviceListUpdate(userID string, deviceIDs []string) error {

View file

@ -35,5 +35,6 @@ type Database interface {
UpdateDevice(ctx context.Context, localpart, deviceID string, displayName *string) error UpdateDevice(ctx context.Context, localpart, deviceID string, displayName *string) error
RemoveDevice(ctx context.Context, deviceID, localpart string) error RemoveDevice(ctx context.Context, deviceID, localpart string) error
RemoveDevices(ctx context.Context, localpart string, devices []string) error RemoveDevices(ctx context.Context, localpart string, devices []string) error
RemoveAllDevices(ctx context.Context, localpart string) error // RemoveAllDevices deleted all devices for this user. Returns the devices deleted.
RemoveAllDevices(ctx context.Context, localpart string) (devices []api.Device, err error)
} }

View file

@ -251,11 +251,10 @@ func (s *devicesStatements) selectDevicesByID(ctx context.Context, deviceIDs []s
} }
func (s *devicesStatements) selectDevicesByLocalpart( func (s *devicesStatements) selectDevicesByLocalpart(
ctx context.Context, localpart string, ctx context.Context, txn *sql.Tx, localpart string,
) ([]api.Device, error) { ) ([]api.Device, error) {
devices := []api.Device{} devices := []api.Device{}
rows, err := sqlutil.TxStmt(txn, s.selectDevicesByLocalpartStmt).QueryContext(ctx, localpart)
rows, err := s.selectDevicesByLocalpartStmt.QueryContext(ctx, localpart)
if err != nil { if err != nil {
return devices, err return devices, err

View file

@ -68,7 +68,7 @@ func (d *Database) GetDeviceByID(
func (d *Database) GetDevicesByLocalpart( func (d *Database) GetDevicesByLocalpart(
ctx context.Context, localpart string, ctx context.Context, localpart string,
) ([]api.Device, error) { ) ([]api.Device, error) {
return d.devices.selectDevicesByLocalpart(ctx, localpart) return d.devices.selectDevicesByLocalpart(ctx, nil, localpart)
} }
func (d *Database) GetDevicesByID(ctx context.Context, deviceIDs []string) ([]api.Device, error) { func (d *Database) GetDevicesByID(ctx context.Context, deviceIDs []string) ([]api.Device, error) {
@ -176,11 +176,16 @@ func (d *Database) RemoveDevices(
// If something went wrong during the deletion, it will return the SQL error. // If something went wrong during the deletion, it will return the SQL error.
func (d *Database) RemoveAllDevices( func (d *Database) RemoveAllDevices(
ctx context.Context, localpart string, ctx context.Context, localpart string,
) error { ) (devices []api.Device, err error) {
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { err = sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
devices, err = d.devices.selectDevicesByLocalpart(ctx, txn, localpart)
if err != nil {
return err
}
if err := d.devices.deleteDevicesByLocalpart(ctx, txn, localpart); err != sql.ErrNoRows { if err := d.devices.deleteDevicesByLocalpart(ctx, txn, localpart); err != sql.ErrNoRows {
return err return err
} }
return nil return nil
}) })
return
} }

View file

@ -231,11 +231,10 @@ func (s *devicesStatements) selectDeviceByID(
} }
func (s *devicesStatements) selectDevicesByLocalpart( func (s *devicesStatements) selectDevicesByLocalpart(
ctx context.Context, localpart string, ctx context.Context, txn *sql.Tx, localpart string,
) ([]api.Device, error) { ) ([]api.Device, error) {
devices := []api.Device{} devices := []api.Device{}
rows, err := sqlutil.TxStmt(txn, s.selectDevicesByLocalpartStmt).QueryContext(ctx, localpart)
rows, err := s.selectDevicesByLocalpartStmt.QueryContext(ctx, localpart)
if err != nil { if err != nil {
return devices, err return devices, err

View file

@ -72,7 +72,7 @@ func (d *Database) GetDeviceByID(
func (d *Database) GetDevicesByLocalpart( func (d *Database) GetDevicesByLocalpart(
ctx context.Context, localpart string, ctx context.Context, localpart string,
) ([]api.Device, error) { ) ([]api.Device, error) {
return d.devices.selectDevicesByLocalpart(ctx, localpart) return d.devices.selectDevicesByLocalpart(ctx, nil, localpart)
} }
func (d *Database) GetDevicesByID(ctx context.Context, deviceIDs []string) ([]api.Device, error) { func (d *Database) GetDevicesByID(ctx context.Context, deviceIDs []string) ([]api.Device, error) {
@ -180,11 +180,16 @@ func (d *Database) RemoveDevices(
// If something went wrong during the deletion, it will return the SQL error. // If something went wrong during the deletion, it will return the SQL error.
func (d *Database) RemoveAllDevices( func (d *Database) RemoveAllDevices(
ctx context.Context, localpart string, ctx context.Context, localpart string,
) error { ) (devices []api.Device, err error) {
return d.writer.Do(d.db, nil, func(txn *sql.Tx) error { err = d.writer.Do(d.db, nil, func(txn *sql.Tx) error {
devices, err = d.devices.selectDevicesByLocalpart(ctx, txn, localpart)
if err != nil {
return err
}
if err := d.devices.deleteDevicesByLocalpart(ctx, txn, localpart); err != sql.ErrNoRows { if err := d.devices.deleteDevicesByLocalpart(ctx, txn, localpart); err != sql.ErrNoRows {
return err return err
} }
return nil return nil
}) })
return
} }

View file

@ -23,7 +23,7 @@ import (
"github.com/matrix-org/dendrite/userapi/inthttp" "github.com/matrix-org/dendrite/userapi/inthttp"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices" "github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus"
) )
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions // AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
@ -34,13 +34,19 @@ func AddInternalRoutes(router *mux.Router, intAPI api.UserInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers // NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(accountDB accounts.Database, deviceDB devices.Database, func NewInternalAPI(
serverName gomatrixserverlib.ServerName, appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI) api.UserInternalAPI { accountDB accounts.Database, cfg *config.UserAPI, appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI,
) api.UserInternalAPI {
deviceDB, err := devices.NewDatabase(&cfg.DeviceDatabase, cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to device db")
}
return &internal.UserInternalAPI{ return &internal.UserInternalAPI{
AccountDB: accountDB, AccountDB: accountDB,
DeviceDB: deviceDB, DeviceDB: deviceDB,
ServerName: serverName, ServerName: cfg.Matrix.ServerName,
AppServices: appServices, AppServices: appServices,
KeyAPI: keyAPI, KeyAPI: keyAPI,
} }

View file

@ -15,7 +15,6 @@ import (
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/inthttp" "github.com/matrix-org/dendrite/userapi/inthttp"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -23,27 +22,31 @@ const (
serverName = gomatrixserverlib.ServerName("example.com") serverName = gomatrixserverlib.ServerName("example.com")
) )
func MustMakeInternalAPI(t *testing.T) (api.UserInternalAPI, accounts.Database, devices.Database) { func MustMakeInternalAPI(t *testing.T) (api.UserInternalAPI, accounts.Database) {
accountDB, err := accounts.NewDatabase(&config.DatabaseOptions{ accountDB, err := accounts.NewDatabase(&config.DatabaseOptions{
ConnectionString: "file::memory:", ConnectionString: "file::memory:",
}, serverName) }, serverName)
if err != nil { if err != nil {
t.Fatalf("failed to create account DB: %s", err) t.Fatalf("failed to create account DB: %s", err)
} }
deviceDB, err := devices.NewDatabase(&config.DatabaseOptions{ cfg := &config.UserAPI{
DeviceDatabase: config.DatabaseOptions{
ConnectionString: "file::memory:", ConnectionString: "file::memory:",
}, serverName) MaxOpenConnections: 1,
if err != nil { MaxIdleConnections: 1,
t.Fatalf("failed to create device DB: %s", err) },
Matrix: &config.Global{
ServerName: serverName,
},
} }
return userapi.NewInternalAPI(accountDB, deviceDB, serverName, nil, nil), accountDB, deviceDB return userapi.NewInternalAPI(accountDB, cfg, nil, nil), accountDB
} }
func TestQueryProfile(t *testing.T) { func TestQueryProfile(t *testing.T) {
aliceAvatarURL := "mxc://example.com/alice" aliceAvatarURL := "mxc://example.com/alice"
aliceDisplayName := "Alice" aliceDisplayName := "Alice"
userAPI, accountDB, _ := MustMakeInternalAPI(t) userAPI, accountDB := MustMakeInternalAPI(t)
_, err := accountDB.CreateAccount(context.TODO(), "alice", "foobar", "") _, err := accountDB.CreateAccount(context.TODO(), "alice", "foobar", "")
if err != nil { if err != nil {
t.Fatalf("failed to make account: %s", err) t.Fatalf("failed to make account: %s", err)