Merge pull request #1 from matrix-org/master

pull request 20210407
This commit is contained in:
davidli18 2021-04-07 11:47:12 +08:00 committed by GitHub
commit 19c8af7058
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
95 changed files with 1007 additions and 438 deletions

1
.gitignore vendored
View file

@ -19,6 +19,7 @@
/_test /_test
/vendor/bin /vendor/bin
/docker/build /docker/build
/logs
# Architecture specific extensions/prefixes # Architecture specific extensions/prefixes
*.[568vq] *.[568vq]

View file

@ -102,7 +102,7 @@ linters-settings:
#local-prefixes: github.com/org/project #local-prefixes: github.com/org/project
gocyclo: gocyclo:
# minimal code complexity to report, 30 by default (but we recommend 10-20) # minimal code complexity to report, 30 by default (but we recommend 10-20)
min-complexity: 13 min-complexity: 25
maligned: maligned:
# print struct with more effective memory layout or not, false by default # print struct with more effective memory layout or not, false by default
suggest-new: true suggest-new: true

View file

@ -1,5 +1,13 @@
# Changelog # Changelog
## Dendrite 0.3.11 (2021-03-02)
### Fixes
- **SECURITY:** A bug in SQLite mode which could cause the registration flow to complete unexpectedly for existing accounts has been fixed (PostgreSQL deployments are not affected)
- A panic in the federation sender has been fixed when shutting down destination queues
- The `/keys/upload` endpoint now correctly returns the number of one-time keys in response to an empty upload request
## Dendrite 0.3.10 (2021-02-17) ## Dendrite 0.3.10 (2021-02-17)
### Features ### Features

View file

@ -16,6 +16,7 @@ package appservice
import ( import (
"context" "context"
"crypto/tls"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@ -48,6 +49,15 @@ func NewInternalAPI(
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
) appserviceAPI.AppServiceQueryAPI { ) appserviceAPI.AppServiceQueryAPI {
client := &http.Client{
Timeout: time.Second * 30,
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: base.Cfg.AppServiceAPI.DisableTLSValidation,
},
},
}
consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka)
// Create a connection to the appservice postgres DB // Create a connection to the appservice postgres DB
@ -79,9 +89,7 @@ func NewInternalAPI(
// Create appserivce query API with an HTTP client that will be used for all // Create appserivce query API with an HTTP client that will be used for all
// outbound and inbound requests (inbound only for the internal API) // outbound and inbound requests (inbound only for the internal API)
appserviceQueryAPI := &query.AppServiceQueryAPI{ appserviceQueryAPI := &query.AppServiceQueryAPI{
HTTPClient: &http.Client{ HTTPClient: client,
Timeout: time.Second * 30,
},
Cfg: base.Cfg, Cfg: base.Cfg,
} }
@ -98,7 +106,7 @@ func NewInternalAPI(
} }
// Create application service transaction workers // Create application service transaction workers
if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil { if err := workers.SetupTransactionWorkers(client, appserviceDB, workerStates); err != nil {
logrus.WithError(err).Panicf("failed to start app service transaction workers") logrus.WithError(err).Panicf("failed to start app service transaction workers")
} }
return appserviceQueryAPI return appserviceQueryAPI

View file

@ -85,9 +85,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
} }
if output.Type != api.OutputTypeNewRoomEvent { if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil return nil
} }
@ -114,6 +111,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
// Queue this event to be sent off to the application service // Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil { if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database") log.WithError(err).Warn("failed to insert incoming event into appservices database")
return err
} else { } else {
// Tell our worker to send out new messages by updating remaining message // Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast // count and waking them up with a broadcast
@ -126,8 +124,43 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
return nil return nil
} }
// appserviceJoinedAtEvent returns a boolean depending on whether a given
// appservice has membership at the time a given event was created.
func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
// TODO: This is only checking the current room state, not the state at
// the event in question. Pretty sure this is what Synapse does too, but
// until we have a lighter way of checking the state before the event that
// doesn't involve state res, then this is probably OK.
membershipReq := &api.QueryMembershipsForRoomRequest{
RoomID: event.RoomID(),
JoinedOnly: true,
}
membershipRes := &api.QueryMembershipsForRoomResponse{}
// XXX: This could potentially race if the state for the event is not known yet
// e.g. the event came over federation but we do not have the full state persisted.
if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil {
for _, ev := range membershipRes.JoinEvents {
var membership gomatrixserverlib.MemberContent
if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil {
continue
}
if appservice.IsInterestedInUserID(*ev.StateKey) {
return true
}
}
} else {
log.WithFields(log.Fields{
"room_id": event.RoomID(),
}).WithError(err).Errorf("Unable to get membership for room")
}
return false
}
// appserviceIsInterestedInEvent returns a boolean depending on whether a given // appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces. // event falls within one of a given application service's namespaces.
//
// TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool { func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
// No reason to queue events if they'll never be sent to the application // No reason to queue events if they'll never be sent to the application
// service // service
@ -162,5 +195,6 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
}).WithError(err).Errorf("Unable to get aliases for room") }).WithError(err).Errorf("Unable to get aliases for room")
} }
return false // Check if any of the members in the room match the appservice
return s.appserviceJoinedAtEvent(ctx, event, appservice)
} }

View file

@ -20,7 +20,6 @@ import (
"context" "context"
"net/http" "net/http"
"net/url" "net/url"
"time"
"github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -47,11 +46,6 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias") span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias")
defer span.Finish() defer span.Finish()
// Create an HTTP client if one does not already exist
if a.HTTPClient == nil {
a.HTTPClient = makeHTTPClient()
}
// Determine which application service should handle this request // Determine which application service should handle this request
for _, appservice := range a.Cfg.Derived.ApplicationServices { for _, appservice := range a.Cfg.Derived.ApplicationServices {
if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) { if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
@ -115,11 +109,6 @@ func (a *AppServiceQueryAPI) UserIDExists(
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID") span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID")
defer span.Finish() defer span.Finish()
// Create an HTTP client if one does not already exist
if a.HTTPClient == nil {
a.HTTPClient = makeHTTPClient()
}
// Determine which application service should handle this request // Determine which application service should handle this request
for _, appservice := range a.Cfg.Derived.ApplicationServices { for _, appservice := range a.Cfg.Derived.ApplicationServices {
if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) { if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
@ -169,10 +158,3 @@ func (a *AppServiceQueryAPI) UserIDExists(
response.UserIDExists = false response.UserIDExists = false
return nil return nil
} }
// makeHTTPClient creates an HTTP client with certain options that will be used for all query requests to application services
func makeHTTPClient() *http.Client {
return &http.Client{
Timeout: time.Second * 30,
}
}

View file

@ -34,8 +34,6 @@ import (
var ( var (
// Maximum size of events sent in each transaction. // Maximum size of events sent in each transaction.
transactionBatchSize = 50 transactionBatchSize = 50
// Timeout for sending a single transaction to an application service.
transactionTimeout = time.Second * 60
) )
// SetupTransactionWorkers spawns a separate goroutine for each application // SetupTransactionWorkers spawns a separate goroutine for each application
@ -44,6 +42,7 @@ var (
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also // size), then send that off to the AS's /transactions/{txnID} endpoint. It also
// handles exponentially backing off in case the AS isn't currently available. // handles exponentially backing off in case the AS isn't currently available.
func SetupTransactionWorkers( func SetupTransactionWorkers(
client *http.Client,
appserviceDB storage.Database, appserviceDB storage.Database,
workerStates []types.ApplicationServiceWorkerState, workerStates []types.ApplicationServiceWorkerState,
) error { ) error {
@ -51,7 +50,7 @@ func SetupTransactionWorkers(
for _, workerState := range workerStates { for _, workerState := range workerStates {
// Don't create a worker if this AS doesn't want to receive events // Don't create a worker if this AS doesn't want to receive events
if workerState.AppService.URL != "" { if workerState.AppService.URL != "" {
go worker(appserviceDB, workerState) go worker(client, appserviceDB, workerState)
} }
} }
return nil return nil
@ -59,17 +58,12 @@ func SetupTransactionWorkers(
// worker is a goroutine that sends any queued events to the application service // worker is a goroutine that sends any queued events to the application service
// it is given. // it is given.
func worker(db storage.Database, ws types.ApplicationServiceWorkerState) { func worker(client *http.Client, db storage.Database, ws types.ApplicationServiceWorkerState) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"appservice": ws.AppService.ID, "appservice": ws.AppService.ID,
}).Info("starting application service") }).Info("Starting application service")
ctx := context.Background() ctx := context.Background()
// Create a HTTP client for sending requests to app services
client := &http.Client{
Timeout: transactionTimeout,
}
// Initial check for any leftover events to send from last time // Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
if err != nil { if err != nil {

View file

@ -14,7 +14,7 @@ RUN go build -trimpath -o bin/ ./cmd/generate-keys
FROM alpine:latest FROM alpine:latest
COPY --from=base /build/bin/* /usr/bin COPY --from=base /build/bin/* /usr/bin/
VOLUME /etc/dendrite VOLUME /etc/dendrite
WORKDIR /etc/dendrite WORKDIR /etc/dendrite

View file

@ -14,7 +14,7 @@ RUN go build -trimpath -o bin/ ./cmd/generate-keys
FROM alpine:latest FROM alpine:latest
COPY --from=base /build/bin/* /usr/bin COPY --from=base /build/bin/* /usr/bin/
VOLUME /etc/dendrite VOLUME /etc/dendrite
WORKDIR /etc/dendrite WORKDIR /etc/dendrite

View file

@ -12,6 +12,7 @@ services:
- 8448:8448 - 8448:8448
volumes: volumes:
- ./config:/etc/dendrite - ./config:/etc/dendrite
- ./media:/var/dendrite/media
networks: networks:
- internal - internal

View file

@ -15,6 +15,7 @@ services:
command: mediaapi command: mediaapi
volumes: volumes:
- ./config:/etc/dendrite - ./config:/etc/dendrite
- ./media:/var/dendrite/media
networks: networks:
- internal - internal

View file

@ -218,6 +218,7 @@ func createRoom(
// check it's free TODO: This races but is better than nothing // check it's free TODO: This races but is better than nothing
hasAliasReq := roomserverAPI.GetRoomIDForAliasRequest{ hasAliasReq := roomserverAPI.GetRoomIDForAliasRequest{
Alias: roomAlias, Alias: roomAlias,
IncludeAppservices: false,
} }
var aliasResp roomserverAPI.GetRoomIDForAliasResponse var aliasResp roomserverAPI.GetRoomIDForAliasResponse

View file

@ -61,9 +61,12 @@ func DirectoryRoom(
var res roomDirectoryResponse var res roomDirectoryResponse
// Query the roomserver API to check if the alias exists locally. // Query the roomserver API to check if the alias exists locally.
queryReq := roomserverAPI.GetRoomIDForAliasRequest{Alias: roomAlias} queryReq := &roomserverAPI.GetRoomIDForAliasRequest{
var queryRes roomserverAPI.GetRoomIDForAliasResponse Alias: roomAlias,
if err = rsAPI.GetRoomIDForAlias(req.Context(), &queryReq, &queryRes); err != nil { IncludeAppservices: true,
}
queryRes := &roomserverAPI.GetRoomIDForAliasResponse{}
if err = rsAPI.GetRoomIDForAlias(req.Context(), queryReq, queryRes); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rsAPI.GetRoomIDForAlias failed") util.GetLogger(req.Context()).WithError(err).Error("rsAPI.GetRoomIDForAlias failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -103,8 +103,22 @@ func GetEvent(
} }
} }
var appService *config.ApplicationService
if device.AppserviceID != "" {
for _, as := range cfg.Derived.ApplicationServices {
if as.ID == device.AppserviceID {
appService = &as
break
}
}
}
for _, stateEvent := range stateResp.StateEvents { for _, stateEvent := range stateResp.StateEvents {
if !stateEvent.StateKeyEquals(device.UserID) { if appService != nil {
if !appService.IsInterestedInUserID(*stateEvent.StateKey()) {
continue
}
} else if !stateEvent.StateKeyEquals(device.UserID) {
continue continue
} }
membership, err := stateEvent.Membership() membership, err := stateEvent.Membership()

View file

@ -38,7 +38,10 @@ func UploadKeys(req *http.Request, keyAPI api.KeyInternalAPI, device *userapi.De
return *resErr return *resErr
} }
uploadReq := &api.PerformUploadKeysRequest{} uploadReq := &api.PerformUploadKeysRequest{
DeviceID: device.ID,
UserID: device.UserID,
}
if r.DeviceKeys != nil { if r.DeviceKeys != nil {
uploadReq.DeviceKeys = []api.DeviceKeys{ uploadReq.DeviceKeys = []api.DeviceKeys{
{ {

View file

@ -91,7 +91,6 @@ func GetAvatarURL(
} }
// SetAvatarURL implements PUT /profile/{userID}/avatar_url // SetAvatarURL implements PUT /profile/{userID}/avatar_url
// nolint:gocyclo
func SetAvatarURL( func SetAvatarURL(
req *http.Request, accountDB accounts.Database, req *http.Request, accountDB accounts.Database,
device *userapi.Device, userID string, cfg *config.ClientAPI, rsAPI api.RoomserverInternalAPI, device *userapi.Device, userID string, cfg *config.ClientAPI, rsAPI api.RoomserverInternalAPI,
@ -209,7 +208,6 @@ func GetDisplayName(
} }
// SetDisplayName implements PUT /profile/{userID}/displayname // SetDisplayName implements PUT /profile/{userID}/displayname
// nolint:gocyclo
func SetDisplayName( func SetDisplayName(
req *http.Request, accountDB accounts.Database, req *http.Request, accountDB accounts.Database,
device *userapi.Device, userID string, cfg *config.ClientAPI, rsAPI api.RoomserverInternalAPI, device *userapi.Device, userID string, cfg *config.ClientAPI, rsAPI api.RoomserverInternalAPI,

View file

@ -496,12 +496,33 @@ func Register(
r.Username = strconv.FormatInt(id, 10) r.Username = strconv.FormatInt(id, 10)
} }
// Is this an appservice registration? It will be if the access
// token is supplied
accessToken, accessTokenErr := auth.ExtractAccessToken(req)
// Squash username to all lowercase letters // Squash username to all lowercase letters
r.Username = strings.ToLower(r.Username) r.Username = strings.ToLower(r.Username)
switch {
case r.Type == authtypes.LoginTypeApplicationService && accessTokenErr == nil:
// Spec-compliant case (the access_token is specified and the login type
// is correctly set, so it's an appservice registration)
if resErr = validateApplicationServiceUsername(r.Username); resErr != nil {
return *resErr
}
case accessTokenErr == nil:
// Non-spec-compliant case (the access_token is specified but the login
// type is not known or specified)
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("A known registration type (e.g. m.login.application_service) must be specified if an access_token is provided"),
}
default:
// Spec-compliant case (neither the access_token nor the login type are
// specified, so it's a normal user registration)
if resErr = validateUsername(r.Username); resErr != nil { if resErr = validateUsername(r.Username); resErr != nil {
return *resErr return *resErr
} }
}
if resErr = validatePassword(r.Password); resErr != nil { if resErr = validatePassword(r.Password); resErr != nil {
return *resErr return *resErr
} }
@ -513,7 +534,7 @@ func Register(
"session_id": r.Auth.Session, "session_id": r.Auth.Session,
}).Info("Processing registration request") }).Info("Processing registration request")
return handleRegistrationFlow(req, r, sessionID, cfg, userAPI) return handleRegistrationFlow(req, r, sessionID, cfg, userAPI, accessToken, accessTokenErr)
} }
func handleGuestRegistration( func handleGuestRegistration(
@ -579,6 +600,8 @@ func handleRegistrationFlow(
sessionID string, sessionID string,
cfg *config.ClientAPI, cfg *config.ClientAPI,
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
accessToken string,
accessTokenErr error,
) util.JSONResponse { ) util.JSONResponse {
// TODO: Shared secret registration (create new user scripts) // TODO: Shared secret registration (create new user scripts)
// TODO: Enable registration config flag // TODO: Enable registration config flag
@ -588,12 +611,12 @@ func handleRegistrationFlow(
// TODO: Handle mapping registrationRequest parameters into session parameters // TODO: Handle mapping registrationRequest parameters into session parameters
// TODO: email / msisdn auth types. // TODO: email / msisdn auth types.
accessToken, accessTokenErr := auth.ExtractAccessToken(req)
// Appservices are special and are not affected by disabled // Appservices are special and are not affected by disabled
// registration or user exclusivity. // registration or user exclusivity. We'll go onto the appservice
if r.Auth.Type == authtypes.LoginTypeApplicationService || // registration flow if a valid access token was provided or if
(r.Auth.Type == "" && accessTokenErr == nil) { // the login type specifically requests it.
if r.Type == authtypes.LoginTypeApplicationService && accessTokenErr == nil {
return handleApplicationServiceRegistration( return handleApplicationServiceRegistration(
accessToken, accessTokenErr, req, r, cfg, userAPI, accessToken, accessTokenErr, req, r, cfg, userAPI,
) )

View file

@ -161,7 +161,6 @@ func OnIncomingStateRequest(ctx context.Context, device *userapi.Device, rsAPI a
// state to see if there is an event with that type and state key, if there // state to see if there is an event with that type and state key, if there
// is then (by default) we return the content, otherwise a 404. // is then (by default) we return the content, otherwise a 404.
// If eventFormat=true, sends the whole event else just the content. // If eventFormat=true, sends the whole event else just the content.
// nolint:gocyclo
func OnIncomingStateTypeRequest( func OnIncomingStateTypeRequest(
ctx context.Context, device *userapi.Device, rsAPI api.RoomserverInternalAPI, ctx context.Context, device *userapi.Device, rsAPI api.RoomserverInternalAPI,
roomID, evType, stateKey string, eventFormat bool, roomID, evType, stateKey string, eventFormat bool,

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/crypto/bcrypt"
) )
const usage = `Usage: %s const usage = `Usage: %s
@ -57,7 +58,7 @@ func main() {
accountDB, err := accounts.NewDatabase(&config.DatabaseOptions{ accountDB, err := accounts.NewDatabase(&config.DatabaseOptions{
ConnectionString: cfg.UserAPI.AccountDatabase.ConnectionString, ConnectionString: cfg.UserAPI.AccountDatabase.ConnectionString,
}, cfg.Global.ServerName) }, cfg.Global.ServerName, bcrypt.DefaultCost)
if err != nil { if err != nil {
logrus.Fatalln("Failed to connect to the database:", err.Error()) logrus.Fatalln("Failed to connect to the database:", err.Error())
} }

View file

@ -52,7 +52,6 @@ var (
instancePeer = flag.String("peer", "", "an internet Yggdrasil peer to connect to") instancePeer = flag.String("peer", "", "an internet Yggdrasil peer to connect to")
) )
// nolint:gocyclo
func main() { func main() {
flag.Parse() flag.Parse()
internal.SetupPprof() internal.SetupPprof()

View file

@ -73,7 +73,6 @@ func (n *Node) DialerContext(ctx context.Context, network, address string) (net.
return n.Dialer(network, address) return n.Dialer(network, address)
} }
// nolint:gocyclo
func Setup(instanceName, storageDirectory string) (*Node, error) { func Setup(instanceName, storageDirectory string) (*Node, error) {
n := &Node{ n := &Node{
core: &yggdrasil.Core{}, core: &yggdrasil.Core{},

View file

@ -128,7 +128,6 @@ func (n *Node) Dial(network, address string) (net.Conn, error) {
} }
// Implements http.Transport.DialContext // Implements http.Transport.DialContext
// nolint:gocyclo
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) { func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
s, ok1 := n.sessions.Load(address) s, ok1 := n.sessions.Load(address)
session, ok2 := s.(*session) session, ok2 := s.(*session)

View file

@ -20,7 +20,6 @@ var requestFrom = flag.String("from", "", "the server name that the request shou
var requestKey = flag.String("key", "matrix_key.pem", "the private key to use when signing the request") var requestKey = flag.String("key", "matrix_key.pem", "the private key to use when signing the request")
var requestPost = flag.Bool("post", false, "send a POST request instead of GET (pipe input into stdin or type followed by Ctrl-D)") var requestPost = flag.Bool("post", false, "send a POST request instead of GET (pipe input into stdin or type followed by Ctrl-D)")
// nolint:gocyclo
func main() { func main() {
flag.Parse() flag.Parse()

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"golang.org/x/crypto/bcrypt"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
@ -61,12 +62,14 @@ func main() {
} }
if *defaultsForCI { if *defaultsForCI {
cfg.AppServiceAPI.DisableTLSValidation = true
cfg.ClientAPI.RateLimiting.Enabled = false cfg.ClientAPI.RateLimiting.Enabled = false
cfg.FederationSender.DisableTLSValidation = true cfg.FederationSender.DisableTLSValidation = true
cfg.MSCs.MSCs = []string{"msc2836", "msc2946", "msc2444", "msc2753"} cfg.MSCs.MSCs = []string{"msc2836", "msc2946", "msc2444", "msc2753"}
cfg.Logging[0].Level = "trace" cfg.Logging[0].Level = "trace"
// don't hit matrix.org when running tests!!! // don't hit matrix.org when running tests!!!
cfg.SigningKeyServer.KeyPerspectives = config.KeyPerspectives{} cfg.SigningKeyServer.KeyPerspectives = config.KeyPerspectives{}
cfg.UserAPI.BCryptCost = bcrypt.MinCost
} }
j, err := yaml.Marshal(cfg) j, err := yaml.Marshal(cfg)

View file

@ -24,7 +24,6 @@ import (
var roomVersion = flag.String("roomversion", "5", "the room version to parse events as") var roomVersion = flag.String("roomversion", "5", "the room version to parse events as")
// nolint:gocyclo
func main() { func main() {
ctx := context.Background() ctx := context.Background()
cfg := setup.ParseFlags(true) cfg := setup.ParseFlags(true)

View file

@ -125,6 +125,11 @@ app_service_api:
max_idle_conns: 2 max_idle_conns: 2
conn_max_lifetime: -1 conn_max_lifetime: -1
# Disable the validation of TLS certificates of appservices. This is
# not recommended in production since it may allow appservice traffic
# to be sent to an unverified endpoint.
disable_tls_validation: false
# Appservice configuration files to load into this homeserver. # Appservice configuration files to load into this homeserver.
config_files: [] config_files: []
@ -235,7 +240,7 @@ media_api:
listen: http://[::]:8074 listen: http://[::]:8074
database: database:
connection_string: file:mediaapi.db connection_string: file:mediaapi.db
max_open_conns: 10 max_open_conns: 5
max_idle_conns: 2 max_idle_conns: 2
conn_max_lifetime: -1 conn_max_lifetime: -1
@ -273,7 +278,7 @@ mscs:
mscs: [] mscs: []
database: database:
connection_string: file:mscs.db connection_string: file:mscs.db
max_open_conns: 10 max_open_conns: 5
max_idle_conns: 2 max_idle_conns: 2
conn_max_lifetime: -1 conn_max_lifetime: -1
@ -335,6 +340,13 @@ sync_api:
# Configuration for the User API. # Configuration for the User API.
user_api: user_api:
# The cost when hashing passwords on registration/login. Default: 10. Min: 4, Max: 31
# See https://pkg.go.dev/golang.org/x/crypto/bcrypt for more information.
# Setting this lower makes registration/login consume less CPU resources at the cost of security
# should the database be compromised. Setting this higher makes registration/login consume more
# CPU resources but makes it harder to brute force password hashes.
# This value can be low if performing tests or on embedded Dendrite instances (e.g WASM builds)
# bcrypt_cost: 10
internal_api: internal_api:
listen: http://localhost:7781 listen: http://localhost:7781
connect: http://localhost:7781 connect: http://localhost:7781
@ -371,4 +383,4 @@ logging:
- type: file - type: file
level: info level: info
params: params:
path: /var/log/dendrite path: ./logs

View file

@ -29,7 +29,6 @@ import (
) )
// MakeJoin implements the /make_join API // MakeJoin implements the /make_join API
// nolint:gocyclo
func MakeJoin( func MakeJoin(
httpReq *http.Request, httpReq *http.Request,
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,
@ -161,7 +160,6 @@ func MakeJoin(
// SendJoin implements the /send_join API // SendJoin implements the /send_join API
// The make-join send-join dance makes much more sense as a single // The make-join send-join dance makes much more sense as a single
// flow so the cyclomatic complexity is high: // flow so the cyclomatic complexity is high:
// nolint:gocyclo
func SendJoin( func SendJoin(
httpReq *http.Request, httpReq *http.Request,
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,

View file

@ -25,7 +25,6 @@ import (
) )
// MakeLeave implements the /make_leave API // MakeLeave implements the /make_leave API
// nolint:gocyclo
func MakeLeave( func MakeLeave(
httpReq *http.Request, httpReq *http.Request,
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,
@ -118,7 +117,6 @@ func MakeLeave(
} }
// SendLeave implements the /send_leave API // SendLeave implements the /send_leave API
// nolint:gocyclo
func SendLeave( func SendLeave(
httpReq *http.Request, httpReq *http.Request,
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,

View file

@ -111,7 +111,6 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
} }
// due to lots of switches // due to lots of switches
// nolint:gocyclo
func fillInRooms(ctx context.Context, roomIDs []string, rsAPI roomserverAPI.RoomserverInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { func fillInRooms(ctx context.Context, roomIDs []string, rsAPI roomserverAPI.RoomserverInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}

View file

@ -53,9 +53,12 @@ func RoomAliasToID(
var resp gomatrixserverlib.RespDirectory var resp gomatrixserverlib.RespDirectory
if domain == cfg.Matrix.ServerName { if domain == cfg.Matrix.ServerName {
queryReq := roomserverAPI.GetRoomIDForAliasRequest{Alias: roomAlias} queryReq := &roomserverAPI.GetRoomIDForAliasRequest{
var queryRes roomserverAPI.GetRoomIDForAliasResponse Alias: roomAlias,
if err = rsAPI.GetRoomIDForAlias(httpReq.Context(), &queryReq, &queryRes); err != nil { IncludeAppservices: true,
}
queryRes := &roomserverAPI.GetRoomIDForAliasResponse{}
if err = rsAPI.GetRoomIDForAlias(httpReq.Context(), queryReq, queryRes); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("aliasAPI.GetRoomIDForAlias failed") util.GetLogger(httpReq.Context()).WithError(err).Error("aliasAPI.GetRoomIDForAlias failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -92,12 +93,13 @@ func Setup(
v2keysmux.Handle("/query", notaryKeys).Methods(http.MethodPost) v2keysmux.Handle("/query", notaryKeys).Methods(http.MethodPost)
v2keysmux.Handle("/query/{serverName}/{keyID}", notaryKeys).Methods(http.MethodGet) v2keysmux.Handle("/query/{serverName}/{keyID}", notaryKeys).Methods(http.MethodGet)
mu := internal.NewMutexByRoom()
v1fedmux.Handle("/send/{txnID}", httputil.MakeFedAPI( v1fedmux.Handle("/send/{txnID}", httputil.MakeFedAPI(
"federation_send", cfg.Matrix.ServerName, keys, wakeup, "federation_send", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send( return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, rsAPI, eduAPI, keyAPI, keys, federation, cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu,
) )
}, },
)).Methods(http.MethodPut, http.MethodOptions) )).Methods(http.MethodPut, http.MethodOptions)

View file

@ -23,16 +23,71 @@ import (
"sync" "sync"
"time" "time"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
const (
// Event was passed to the roomserver
MetricsOutcomeOK = "ok"
// Event failed to be processed
MetricsOutcomeFail = "fail"
// Event failed auth checks
MetricsOutcomeRejected = "rejected"
// Terminated the transaction
MetricsOutcomeFatal = "fatal"
// The event has missing auth_events we need to fetch
MetricsWorkMissingAuthEvents = "missing_auth_events"
// No work had to be done as we had all prev/auth events
MetricsWorkDirect = "direct"
// The event has missing prev_events we need to call /g_m_e for
MetricsWorkMissingPrevEvents = "missing_prev_events"
)
var (
pduCountTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dendrite",
Subsystem: "federationapi",
Name: "recv_pdus",
Help: "Number of incoming PDUs from remote servers with labels for success",
},
[]string{"status"}, // 'success' or 'total'
)
eduCountTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "dendrite",
Subsystem: "federationapi",
Name: "recv_edus",
Help: "Number of incoming EDUs from remote servers",
},
)
processEventSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "dendrite",
Subsystem: "federationapi",
Name: "process_event",
Help: "How long it takes to process an incoming event and what work had to be done for it",
},
[]string{"work", "outcome"},
)
)
func init() {
prometheus.MustRegister(
pduCountTotal, eduCountTotal, processEventSummary,
)
}
// Send implements /_matrix/federation/v1/send/{txnID} // Send implements /_matrix/federation/v1/send/{txnID}
func Send( func Send(
httpReq *http.Request, httpReq *http.Request,
@ -44,6 +99,7 @@ func Send(
keyAPI keyapi.KeyInternalAPI, keyAPI keyapi.KeyInternalAPI,
keys gomatrixserverlib.JSONVerifier, keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
mu *internal.MutexByRoom,
) util.JSONResponse { ) util.JSONResponse {
t := txnReq{ t := txnReq{
rsAPI: rsAPI, rsAPI: rsAPI,
@ -53,6 +109,7 @@ func Send(
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
newEvents: make(map[string]bool), newEvents: make(map[string]bool),
keyAPI: keyAPI, keyAPI: keyAPI,
roomsMu: mu,
} }
var txnEvents struct { var txnEvents struct {
@ -109,12 +166,14 @@ type txnReq struct {
federation txnFederationClient federation txnFederationClient
servers []gomatrixserverlib.ServerName servers []gomatrixserverlib.ServerName
serversMutex sync.RWMutex serversMutex sync.RWMutex
roomsMu *internal.MutexByRoom
// local cache of events for auth checks, etc - this may include events // local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of. // which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent haveEvents map[string]*gomatrixserverlib.HeaderedEvent
// new events which the roomserver does not know about // new events which the roomserver does not know about
newEvents map[string]bool newEvents map[string]bool
newEventsMutex sync.RWMutex newEventsMutex sync.RWMutex
work string // metrics
} }
// A subset of FederationClient functionality that txn requires. Useful for testing. // A subset of FederationClient functionality that txn requires. Useful for testing.
@ -133,6 +192,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
pdus := []*gomatrixserverlib.HeaderedEvent{} pdus := []*gomatrixserverlib.HeaderedEvent{}
for _, pdu := range t.PDUs { for _, pdu := range t.PDUs {
pduCountTotal.WithLabelValues("total").Inc()
var header struct { var header struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
} }
@ -186,6 +246,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
// Process the events. // Process the events.
for _, e := range pdus { for _, e := range pdus {
evStart := time.Now()
if err := t.processEvent(ctx, e.Unwrap()); err != nil { if err := t.processEvent(ctx, e.Unwrap()); err != nil {
// If the error is due to the event itself being bad then we skip // If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the // it and move onto the next event. We report an error so that the
@ -203,27 +264,40 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
// If we bail and stop processing then we risk wedging incoming // If we bail and stop processing then we risk wedging incoming
// transactions from that server forever. // transactions from that server forever.
if isProcessingErrorFatal(err) { if isProcessingErrorFatal(err) {
sentry.CaptureException(err)
// Any other error should be the result of a temporary error in // Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely. // our server so we should bail processing the transaction entirely.
util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
jsonErr := util.ErrorResponse(err) jsonErr := util.ErrorResponse(err)
processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
return nil, &jsonErr return nil, &jsonErr
} else { } else {
// Auth errors mean the event is 'rejected' which have to be silent to appease sytest // Auth errors mean the event is 'rejected' which have to be silent to appease sytest
errMsg := "" errMsg := ""
outcome := MetricsOutcomeRejected
_, rejected := err.(*gomatrixserverlib.NotAllowed) _, rejected := err.(*gomatrixserverlib.NotAllowed)
if !rejected { if !rejected {
errMsg = err.Error() errMsg = err.Error()
outcome = MetricsOutcomeFail
} }
util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
"Failed to process incoming federation event, skipping", "Failed to process incoming federation event, skipping",
) )
processEventSummary.WithLabelValues(t.work, outcome).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
results[e.EventID()] = gomatrixserverlib.PDUResult{ results[e.EventID()] = gomatrixserverlib.PDUResult{
Error: errMsg, Error: errMsg,
} }
} }
} else { } else {
results[e.EventID()] = gomatrixserverlib.PDUResult{} results[e.EventID()] = gomatrixserverlib.PDUResult{}
pduCountTotal.WithLabelValues("success").Inc()
processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
} }
} }
@ -279,9 +353,9 @@ func (t *txnReq) haveEventIDs() map[string]bool {
return result return result
} }
// nolint:gocyclo
func (t *txnReq) processEDUs(ctx context.Context) { func (t *txnReq) processEDUs(ctx context.Context) {
for _, e := range t.EDUs { for _, e := range t.EDUs {
eduCountTotal.Inc()
switch e.Type { switch e.Type {
case gomatrixserverlib.MTyping: case gomatrixserverlib.MTyping:
// https://matrix.org/docs/spec/server_server/latest#typing-notifications // https://matrix.org/docs/spec/server_server/latest#typing-notifications
@ -424,7 +498,10 @@ func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserver
} }
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error { func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
t.roomsMu.Lock(e.RoomID())
defer t.roomsMu.Unlock(e.RoomID())
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
t.work = "" // reset from previous event
// Work out if the roomserver knows everything it needs to know to auth // Work out if the roomserver knows everything it needs to know to auth
// the event. This includes the prev_events and auth_events. // the event. This includes the prev_events and auth_events.
@ -453,6 +530,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
} }
if len(stateResp.MissingAuthEventIDs) > 0 { if len(stateResp.MissingAuthEventIDs) > 0 {
t.work = MetricsWorkMissingAuthEvents
logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs)) logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs))
if err := t.retrieveMissingAuthEvents(ctx, e, &stateResp); err != nil { if err := t.retrieveMissingAuthEvents(ctx, e, &stateResp); err != nil {
return fmt.Errorf("t.retrieveMissingAuthEvents: %w", err) return fmt.Errorf("t.retrieveMissingAuthEvents: %w", err)
@ -460,9 +538,11 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
} }
if len(stateResp.MissingPrevEventIDs) > 0 { if len(stateResp.MissingPrevEventIDs) > 0 {
t.work = MetricsWorkMissingPrevEvents
logger.Infof("Event refers to %d unknown prev_events", len(stateResp.MissingPrevEventIDs)) logger.Infof("Event refers to %d unknown prev_events", len(stateResp.MissingPrevEventIDs))
return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion) return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion)
} }
t.work = MetricsWorkDirect
// pass the event to the roomserver which will do auth checks // pass the event to the roomserver which will do auth checks
// If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
@ -540,7 +620,6 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
return gomatrixserverlib.Allowed(e, &authUsingState) return gomatrixserverlib.Allowed(e, &authUsingState)
} }
// nolint:gocyclo
func (t *txnReq) processEventWithMissingState(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error { func (t *txnReq) processEventWithMissingState(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
// Do this with a fresh context, so that we keep working even if the // Do this with a fresh context, so that we keep working even if the
// original request times out. With any luck, by the time the remote // original request times out. With any luck, by the time the remote
@ -758,7 +837,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
queryReq := api.QueryEventsByIDRequest{ queryReq := api.QueryEventsByIDRequest{
EventIDs: missingEventList, EventIDs: missingEventList,
} }
util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList) util.GetLogger(ctx).WithField("count", len(missingEventList)).Infof("Fetching missing auth events")
var queryRes api.QueryEventsByIDResponse var queryRes api.QueryEventsByIDResponse
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil { if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
return nil return nil
@ -828,11 +907,6 @@ retryAllowedState:
}, nil }, nil
} }
// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should
// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
// This means that we may recursively call this function, as we spider back up prev_events.
// nolint:gocyclo
func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, err error) { func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, err error) {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e}) needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e})
@ -935,7 +1009,6 @@ func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID
return &state, nil return &state, nil
} }
// nolint:gocyclo
func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
*gomatrixserverlib.RespState, error) { *gomatrixserverlib.RespState, error) {
util.GetLogger(ctx).Infof("lookupMissingStateViaStateIDs %s", eventID) util.GetLogger(ctx).Infof("lookupMissingStateViaStateIDs %s", eventID)

View file

@ -9,6 +9,7 @@ import (
"time" "time"
eduAPI "github.com/matrix-org/dendrite/eduserver/api" eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/test" "github.com/matrix-org/dendrite/internal/test"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -370,6 +371,7 @@ func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederat
federation: fedClient, federation: fedClient,
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
newEvents: make(map[string]bool), newEvents: make(map[string]bool),
roomsMu: internal.NewMutexByRoom(),
} }
t.PDUs = pdus t.PDUs = pdus
t.Origin = testOrigin t.Origin = testOrigin

View file

@ -212,8 +212,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
if receiptServerName != t.ServerName { if receiptServerName != t.ServerName {
log.WithField("other_server", receiptServerName).Info("Suppressing receipt notif: originated elsewhere") return nil // don't log, very spammy as it logs for each remote receipt
return nil
} }
joined, err := t.db.GetJoinedHosts(context.TODO(), receipt.RoomID) joined, err := t.db.GetJoinedHosts(context.TODO(), receipt.RoomID)

View file

@ -173,7 +173,6 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
// getPendingFromDatabase will look at the database and see if // getPendingFromDatabase will look at the database and see if
// there are any persisted events that haven't been sent to this // there are any persisted events that haven't been sent to this
// destination yet. If so, they will be queued up. // destination yet. If so, they will be queued up.
// nolint:gocyclo
func (oq *destinationQueue) getPendingFromDatabase() { func (oq *destinationQueue) getPendingFromDatabase() {
// Check to see if there's anything to do for this server // Check to see if there's anything to do for this server
// in the database. // in the database.
@ -238,7 +237,6 @@ func (oq *destinationQueue) getPendingFromDatabase() {
} }
// backgroundSend is the worker goroutine for sending events. // backgroundSend is the worker goroutine for sending events.
// nolint:gocyclo
func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) backgroundSend() {
// Check if a worker is already running, and if it isn't, then // Check if a worker is already running, and if it isn't, then
// mark it as started. // mark it as started.
@ -353,7 +351,6 @@ func (oq *destinationQueue) backgroundSend() {
// nextTransaction creates a new transaction from the pending event // nextTransaction creates a new transaction from the pending event
// queue and sends it. Returns true if a transaction was sent or // queue and sends it. Returns true if a transaction was sent or
// false otherwise. // false otherwise.
// nolint:gocyclo
func (oq *destinationQueue) nextTransaction( func (oq *destinationQueue) nextTransaction(
pdus []*queuedPDU, pdus []*queuedPDU,
edus []*queuedEDU, edus []*queuedEDU,

35
go.mod
View file

@ -2,12 +2,13 @@ module github.com/matrix-org/dendrite
require ( require (
github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Shopify/sarama v1.27.0 github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/Shopify/sarama v1.28.0
github.com/getsentry/sentry-go v0.10.0
github.com/gologme/log v1.2.0 github.com/gologme/log v1.2.0
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4
github.com/lib/pq v1.8.0 github.com/lib/pq v1.9.0
github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-circuit v0.4.0 github.com/libp2p/go-libp2p-circuit v0.4.0
github.com/libp2p/go-libp2p-core v0.8.3 github.com/libp2p/go-libp2p-core v0.8.3
@ -21,27 +22,27 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
github.com/matrix-org/gomatrixserverlib v0.0.0-20210216163908-bab1f2be20d0 github.com/matrix-org/gomatrixserverlib v0.0.0-20210302161955-6142fe3f8c2c
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.2 github.com/mattn/go-sqlite3 v1.14.6
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6 github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6
github.com/opentracing/opentracing-go v1.2.0 github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/pressly/goose v2.7.0-rc5+incompatible github.com/pressly/goose v2.7.0+incompatible
github.com/prometheus/client_golang v1.7.1 github.com/prometheus/client_golang v1.9.0
github.com/sirupsen/logrus v1.7.0 github.com/sirupsen/logrus v1.8.0
github.com/tidwall/gjson v1.6.7 github.com/tidwall/gjson v1.6.8
github.com/tidwall/sjson v1.1.4 github.com/tidwall/sjson v1.1.5
github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20210218094457-e77ca8019daa github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20210218094457-e77ca8019daa
go.uber.org/atomic v1.6.0 go.uber.org/atomic v1.7.0
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
gopkg.in/h2non/bimg.v1 v1.1.4 gopkg.in/h2non/bimg.v1 v1.1.5
gopkg.in/yaml.v2 v2.3.0 gopkg.in/yaml.v2 v2.4.0
) )
go 1.13 go 1.13

538
go.sum

File diff suppressed because it is too large Load diff

View file

@ -16,6 +16,7 @@ package httputil
import ( import (
"context" "context"
"fmt"
"io" "io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -25,6 +26,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/getsentry/sentry-go"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth"
federationsenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationsenderAPI "github.com/matrix-org/dendrite/federationsender/api"
@ -59,8 +61,29 @@ func MakeAuthAPI(
logger := util.GetLogger((req.Context())) logger := util.GetLogger((req.Context()))
logger = logger.WithField("user_id", device.UserID) logger = logger.WithField("user_id", device.UserID)
req = req.WithContext(util.ContextWithLogger(req.Context(), logger)) req = req.WithContext(util.ContextWithLogger(req.Context(), logger))
// add the user to Sentry, if enabled
hub := sentry.GetHubFromContext(req.Context())
if hub != nil {
hub.Scope().SetTag("user_id", device.UserID)
hub.Scope().SetTag("device_id", device.ID)
}
defer func() {
if r := recover(); r != nil {
if hub != nil {
hub.CaptureException(fmt.Errorf("%s panicked", req.URL.Path))
}
// re-panic to return the 500
panic(r)
}
}()
return f(req, device) jsonRes := f(req, device)
// do not log 4xx as errors as they are client fails, not server fails
if hub != nil && jsonRes.Code >= 500 {
hub.Scope().SetExtra("response", jsonRes)
hub.CaptureException(fmt.Errorf("%s returned HTTP %d", req.URL.Path, jsonRes.Code))
}
return jsonRes
} }
return MakeExternalAPI(metricsName, h) return MakeExternalAPI(metricsName, h)
} }
@ -195,13 +218,34 @@ func MakeFedAPI(
if fedReq == nil { if fedReq == nil {
return errResp return errResp
} }
// add the user to Sentry, if enabled
hub := sentry.GetHubFromContext(req.Context())
if hub != nil {
hub.Scope().SetTag("origin", string(fedReq.Origin()))
hub.Scope().SetTag("uri", fedReq.RequestURI())
}
defer func() {
if r := recover(); r != nil {
if hub != nil {
hub.CaptureException(fmt.Errorf("%s panicked", req.URL.Path))
}
// re-panic to return the 500
panic(r)
}
}()
go wakeup.Wakeup(req.Context(), fedReq.Origin()) go wakeup.Wakeup(req.Context(), fedReq.Origin())
vars, err := URLDecodeMapValues(mux.Vars(req)) vars, err := URLDecodeMapValues(mux.Vars(req))
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.MatrixErrorResponse(400, "M_UNRECOGNISED", "badly encoded query params")
} }
return f(req, fedReq, vars) jsonRes := f(req, fedReq, vars)
// do not log 4xx as errors as they are client fails, not server fails
if hub != nil && jsonRes.Code >= 500 {
hub.Scope().SetExtra("response", jsonRes)
hub.CaptureException(fmt.Errorf("%s returned HTTP %d", req.URL.Path, jsonRes.Code))
}
return jsonRes
} }
return MakeExternalAPI(metricsName, h) return MakeExternalAPI(metricsName, h)
} }

38
internal/mutex.go Normal file
View file

@ -0,0 +1,38 @@
package internal
import "sync"
type MutexByRoom struct {
mu *sync.Mutex // protects the map
roomToMu map[string]*sync.Mutex
}
func NewMutexByRoom() *MutexByRoom {
return &MutexByRoom{
mu: &sync.Mutex{},
roomToMu: make(map[string]*sync.Mutex),
}
}
func (m *MutexByRoom) Lock(roomID string) {
m.mu.Lock()
roomMu := m.roomToMu[roomID]
if roomMu == nil {
roomMu = &sync.Mutex{}
}
m.roomToMu[roomID] = roomMu
m.mu.Unlock()
// don't lock inside m.mu else we can deadlock
roomMu.Lock()
}
func (m *MutexByRoom) Unlock(roomID string) {
m.mu.Lock()
roomMu := m.roomToMu[roomID]
if roomMu == nil {
panic("MutexByRoom: Unlock before Lock")
}
m.mu.Unlock()
roomMu.Unlock()
}

View file

@ -17,7 +17,7 @@ var build string
const ( const (
VersionMajor = 0 VersionMajor = 0
VersionMinor = 3 VersionMinor = 3
VersionPatch = 10 VersionPatch = 11
VersionTag = "" // example: "rc1" VersionTag = "" // example: "rc1"
) )

View file

@ -108,6 +108,8 @@ type OneTimeKeysCount struct {
// PerformUploadKeysRequest is the request to PerformUploadKeys // PerformUploadKeysRequest is the request to PerformUploadKeys
type PerformUploadKeysRequest struct { type PerformUploadKeysRequest struct {
UserID string // Required - User performing the request
DeviceID string // Optional - Device performing the request, for fetching OTK count
DeviceKeys []DeviceKeys DeviceKeys []DeviceKeys
OneTimeKeys []OneTimeKeys OneTimeKeys []OneTimeKeys
// OnlyDisplayNameUpdates should be `true` if ALL the DeviceKeys are present to update // OnlyDisplayNameUpdates should be `true` if ALL the DeviceKeys are present to update

View file

@ -26,9 +26,28 @@ import (
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
var (
deviceListUpdateCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dendrite",
Subsystem: "keyserver",
Name: "device_list_update",
Help: "Number of times we have attempted to update device lists from this server",
},
[]string{"server"},
)
)
func init() {
prometheus.MustRegister(
deviceListUpdateCount,
)
}
// DeviceListUpdater handles device list updates from remote servers. // DeviceListUpdater handles device list updates from remote servers.
// //
// In the case where we have the prev_id for an update, the updater just stores the update (after acquiring a per-user lock). // In the case where we have the prev_id for an update, the updater just stores the update (after acquiring a per-user lock).
@ -319,6 +338,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
} }
func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) { func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) {
deviceListUpdateCount.WithLabelValues(string(serverName)).Inc()
requestTimeout := time.Second * 30 // max amount of time we want to spend on each request requestTimeout := time.Second * 30 // max amount of time we want to spend on each request
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel() defer cancel()
@ -330,16 +350,16 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
logger.WithError(err).Error("failed to load stale device lists") logger.WithError(err).Error("failed to load stale device lists")
return waitTime, true return waitTime, true
} }
hasFailures := false failCount := 0
for _, userID := range userIDs { for _, userID := range userIDs {
if ctx.Err() != nil { if ctx.Err() != nil {
// we've timed out, give up and go to the back of the queue to let another server be processed. // we've timed out, give up and go to the back of the queue to let another server be processed.
hasFailures = true failCount += 1
break break
} }
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
if err != nil { if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user") failCount += 1
fcerr, ok := err.(*fedsenderapi.FederationClientError) fcerr, ok := err.(*fedsenderapi.FederationClientError)
if ok { if ok {
if fcerr.RetryAfter > 0 { if fcerr.RetryAfter > 0 {
@ -347,21 +367,26 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
} else if fcerr.Blacklisted { } else if fcerr.Blacklisted {
waitTime = time.Hour * 8 waitTime = time.Hour * 8
} }
} else {
waitTime = time.Hour
logger.WithError(err).Warn("GetUserDevices returned unknown error type")
} }
hasFailures = true
continue continue
} }
err = u.updateDeviceList(&res) err = u.updateDeviceList(&res)
if err != nil { if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it") logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
hasFailures = true failCount += 1
} }
} }
if failCount > 0 {
logger.WithField("total", len(userIDs)).WithField("failed", failCount).Error("failed to query device keys for some users")
}
for _, userID := range userIDs { for _, userID := range userIDs {
// always clear the channel to unblock Update calls regardless of success/failure // always clear the channel to unblock Update calls regardless of success/failure
u.clearChannel(userID) u.clearChannel(userID)
} }
return waitTime, hasFailures return waitTime, failCount > 0
} }
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error { func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {

View file

@ -513,6 +513,23 @@ func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.Per
} }
func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) { func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
if req.UserID == "" {
res.Error = &api.KeyError{
Err: "user ID missing",
}
}
if req.DeviceID != "" && len(req.OneTimeKeys) == 0 {
counts, err := a.DB.OneTimeKeysCount(ctx, req.UserID, req.DeviceID)
if err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("a.DB.OneTimeKeysCount: %s", err),
}
}
if counts != nil {
res.OneTimeKeyCounts = append(res.OneTimeKeyCounts, *counts)
}
return
}
for _, key := range req.OneTimeKeys { for _, key := range req.OneTimeKeys {
// grab existing keys based on (user/device/algorithm/key ID) // grab existing keys based on (user/device/algorithm/key ID)
keyIDsWithAlgorithms := make([]string, len(key.KeyJSON)) keyIDsWithAlgorithms := make([]string, len(key.KeyJSON))
@ -521,9 +538,9 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
keyIDsWithAlgorithms[i] = keyIDWithAlgo keyIDsWithAlgorithms[i] = keyIDWithAlgo
i++ i++
} }
existingKeys, err := a.DB.ExistingOneTimeKeys(ctx, key.UserID, key.DeviceID, keyIDsWithAlgorithms) existingKeys, err := a.DB.ExistingOneTimeKeys(ctx, req.UserID, req.DeviceID, keyIDsWithAlgorithms)
if err != nil { if err != nil {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{ res.KeyError(req.UserID, req.DeviceID, &api.KeyError{
Err: "failed to query existing one-time keys: " + err.Error(), Err: "failed to query existing one-time keys: " + err.Error(),
}) })
continue continue
@ -531,8 +548,8 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
for keyIDWithAlgo := range existingKeys { for keyIDWithAlgo := range existingKeys {
// if keys exist and the JSON doesn't match, error out as the key already exists // if keys exist and the JSON doesn't match, error out as the key already exists
if !bytes.Equal(existingKeys[keyIDWithAlgo], key.KeyJSON[keyIDWithAlgo]) { if !bytes.Equal(existingKeys[keyIDWithAlgo], key.KeyJSON[keyIDWithAlgo]) {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{ res.KeyError(req.UserID, req.DeviceID, &api.KeyError{
Err: fmt.Sprintf("%s device %s: algorithm / key ID %s one-time key already exists", key.UserID, key.DeviceID, keyIDWithAlgo), Err: fmt.Sprintf("%s device %s: algorithm / key ID %s one-time key already exists", req.UserID, req.DeviceID, keyIDWithAlgo),
}) })
continue continue
} }
@ -540,8 +557,8 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
// store one-time keys // store one-time keys
counts, err := a.DB.StoreOneTimeKeys(ctx, key) counts, err := a.DB.StoreOneTimeKeys(ctx, key)
if err != nil { if err != nil {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{ res.KeyError(req.UserID, req.DeviceID, &api.KeyError{
Err: fmt.Sprintf("%s device %s : failed to store one-time keys: %s", key.UserID, key.DeviceID, err.Error()), Err: fmt.Sprintf("%s device %s : failed to store one-time keys: %s", req.UserID, req.DeviceID, err.Error()),
}) })
continue continue
} }

View file

@ -34,6 +34,9 @@ type SetRoomAliasResponse struct {
type GetRoomIDForAliasRequest struct { type GetRoomIDForAliasRequest struct {
// Alias we want to lookup // Alias we want to lookup
Alias string `json:"alias"` Alias string `json:"alias"`
// Should we ask appservices for their aliases as a part of
// the request?
IncludeAppservices bool `json:"include_appservices"`
} }
// GetRoomIDForAliasResponse is a response to GetRoomIDForAlias // GetRoomIDForAliasResponse is a response to GetRoomIDForAlias

View file

@ -151,7 +151,9 @@ type QueryMembershipsForRoomRequest struct {
JoinedOnly bool `json:"joined_only"` JoinedOnly bool `json:"joined_only"`
// ID of the room to fetch memberships from // ID of the room to fetch memberships from
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
// ID of the user sending the request // Optional - ID of the user sending the request, for checking if the
// user is allowed to see the memberships. If not specified then all
// room memberships will be returned.
Sender string `json:"sender"` Sender string `json:"sender"`
} }

View file

@ -172,7 +172,6 @@ func IsServerBannedFromRoom(ctx context.Context, rsAPI RoomserverInternalAPI, ro
// PopulatePublicRooms extracts PublicRoom information for all the provided room IDs. The IDs are not checked to see if they are visible in the // PopulatePublicRooms extracts PublicRoom information for all the provided room IDs. The IDs are not checked to see if they are visible in the
// published room directory. // published room directory.
// due to lots of switches // due to lots of switches
// nolint:gocyclo
func PopulatePublicRooms(ctx context.Context, roomIDs []string, rsAPI RoomserverInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { func PopulatePublicRooms(ctx context.Context, roomIDs []string, rsAPI RoomserverInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}

View file

@ -88,31 +88,35 @@ func (r *RoomserverInternalAPI) GetRoomIDForAlias(
) error { ) error {
// Look up the room ID in the database // Look up the room ID in the database
roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias) roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias)
if err != nil { if err == nil && roomID != "" {
return err response.RoomID = roomID
return nil
} }
if r.asAPI != nil { // appservice component is wired in // Check appservice on err, but only if the appservice API is
if roomID == "" { // wired in and no room ID was found.
if r.asAPI != nil && request.IncludeAppservices && roomID == "" {
// No room found locally, try our application services by making a call to // No room found locally, try our application services by making a call to
// the appservice component // the appservice component
aliasReq := asAPI.RoomAliasExistsRequest{Alias: request.Alias} aliasReq := &asAPI.RoomAliasExistsRequest{
var aliasResp asAPI.RoomAliasExistsResponse Alias: request.Alias,
if err = r.asAPI.RoomAliasExists(ctx, &aliasReq, &aliasResp); err != nil { }
aliasRes := &asAPI.RoomAliasExistsResponse{}
if err = r.asAPI.RoomAliasExists(ctx, aliasReq, aliasRes); err != nil {
return err return err
} }
if aliasResp.AliasExists { if aliasRes.AliasExists {
roomID, err = r.DB.GetRoomIDForAlias(ctx, request.Alias) roomID, err = r.DB.GetRoomIDForAlias(ctx, request.Alias)
if err != nil { if err != nil {
return err return err
} }
} response.RoomID = roomID
return nil
} }
} }
response.RoomID = roomID return err
return nil
} }
// GetAliasesForRoomID implements alias.RoomserverInternalAPI // GetAliasesForRoomID implements alias.RoomserverInternalAPI

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
asAPI "github.com/matrix-org/dendrite/appservice/api" asAPI "github.com/matrix-org/dendrite/appservice/api"
fsAPI "github.com/matrix-org/dendrite/federationsender/api" fsAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
@ -89,6 +90,7 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
Cfg: r.Cfg, Cfg: r.Cfg,
DB: r.DB, DB: r.DB,
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
RSAPI: r,
Inputer: r.Inputer, Inputer: r.Inputer,
} }
r.Peeker = &perform.Peeker{ r.Peeker = &perform.Peeker{
@ -144,6 +146,7 @@ func (r *RoomserverInternalAPI) PerformInvite(
) error { ) error {
outputEvents, err := r.Inviter.PerformInvite(ctx, req, res) outputEvents, err := r.Inviter.PerformInvite(ctx, req, res)
if err != nil { if err != nil {
sentry.CaptureException(err)
return err return err
} }
if len(outputEvents) == 0 { if len(outputEvents) == 0 {
@ -159,6 +162,7 @@ func (r *RoomserverInternalAPI) PerformLeave(
) error { ) error {
outputEvents, err := r.Leaver.PerformLeave(ctx, req, res) outputEvents, err := r.Leaver.PerformLeave(ctx, req, res)
if err != nil { if err != nil {
sentry.CaptureException(err)
return err return err
} }
if len(outputEvents) == 0 { if len(outputEvents) == 0 {

View file

@ -270,7 +270,6 @@ func CheckServerAllowedToSeeEvent(
} }
// TODO: Remove this when we have tests to assert correctness of this function // TODO: Remove this when we have tests to assert correctness of this function
// nolint:gocyclo
func ScanEventTree( func ScanEventTree(
ctx context.Context, db storage.Database, info types.RoomInfo, front []string, visited map[string]bool, limit int, ctx context.Context, db storage.Database, info types.RoomInfo, front []string, visited map[string]bool, limit int,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,

View file

@ -22,6 +22,7 @@ import (
"time" "time"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/internal/hooks"
"github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -64,6 +65,8 @@ func (w *inputWorker) start() {
_, task.err = w.r.processRoomEvent(task.ctx, task.event) _, task.err = w.r.processRoomEvent(task.ctx, task.event)
if task.err == nil { if task.err == nil {
hooks.Run(hooks.KindNewEventPersisted, task.event.Event) hooks.Run(hooks.KindNewEventPersisted, task.event.Event)
} else {
sentry.CaptureException(task.err)
} }
task.wg.Done() task.wg.Done()
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):

View file

@ -381,7 +381,6 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr
// It returns a list of servers which can be queried for backfill requests. These servers // It returns a list of servers which can be queried for backfill requests. These servers
// will be servers that are in the room already. The entries at the beginning are preferred servers // will be servers that are in the room already. The entries at the beginning are preferred servers
// and will be tried first. An empty list will fail the request. // and will be tried first. An empty list will fail the request.
// nolint:gocyclo
func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) []gomatrixserverlib.ServerName { func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) []gomatrixserverlib.ServerName {
// eventID will be a prev_event ID of a backwards extremity, meaning we will not have a database entry for it. Instead, use // eventID will be a prev_event ID of a backwards extremity, meaning we will not have a database entry for it. Instead, use
// its successor, so look it up. // its successor, so look it up.

View file

@ -37,7 +37,6 @@ type Inviter struct {
Inputer *input.Inputer Inputer *input.Inputer
} }
// nolint:gocyclo
func (r *Inviter) PerformInvite( func (r *Inviter) PerformInvite(
ctx context.Context, ctx context.Context,
req *api.PerformInviteRequest, req *api.PerformInviteRequest,

View file

@ -21,9 +21,11 @@ import (
"strings" "strings"
"time" "time"
"github.com/getsentry/sentry-go"
fsAPI "github.com/matrix-org/dendrite/federationsender/api" fsAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
rsAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
@ -36,6 +38,7 @@ type Joiner struct {
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
Cfg *config.RoomServer Cfg *config.RoomServer
FSAPI fsAPI.FederationSenderInternalAPI FSAPI fsAPI.FederationSenderInternalAPI
RSAPI rsAPI.RoomserverInternalAPI
DB storage.Database DB storage.Database
Inputer *input.Inputer Inputer *input.Inputer
@ -49,6 +52,7 @@ func (r *Joiner) PerformJoin(
) { ) {
roomID, joinedVia, err := r.performJoin(ctx, req) roomID, joinedVia, err := r.performJoin(ctx, req)
if err != nil { if err != nil {
sentry.CaptureException(err)
perr, ok := err.(*api.PerformError) perr, ok := err.(*api.PerformError)
if ok { if ok {
res.Error = perr res.Error = perr
@ -121,11 +125,17 @@ func (r *Joiner) performJoinRoomByAlias(
roomID = dirRes.RoomID roomID = dirRes.RoomID
req.ServerNames = append(req.ServerNames, dirRes.ServerNames...) req.ServerNames = append(req.ServerNames, dirRes.ServerNames...)
} else { } else {
var getRoomReq = rsAPI.GetRoomIDForAliasRequest{
Alias: req.RoomIDOrAlias,
IncludeAppservices: true,
}
var getRoomRes = rsAPI.GetRoomIDForAliasResponse{}
// Otherwise, look up if we know this room alias locally. // Otherwise, look up if we know this room alias locally.
roomID, err = r.DB.GetRoomIDForAlias(ctx, req.RoomIDOrAlias) err = r.RSAPI.GetRoomIDForAlias(ctx, &getRoomReq, &getRoomRes)
if err != nil { if err != nil {
return "", "", fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err) return "", "", fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err)
} }
roomID = getRoomRes.RoomID
} }
// If the room ID is empty then we failed to look up the alias. // If the room ID is empty then we failed to look up the alias.
@ -139,11 +149,20 @@ func (r *Joiner) performJoinRoomByAlias(
} }
// TODO: Break this function up a bit // TODO: Break this function up a bit
// nolint:gocyclo
func (r *Joiner) performJoinRoomByID( func (r *Joiner) performJoinRoomByID(
ctx context.Context, ctx context.Context,
req *api.PerformJoinRequest, req *api.PerformJoinRequest,
) (string, gomatrixserverlib.ServerName, error) { ) (string, gomatrixserverlib.ServerName, error) {
// The original client request ?server_name=... may include this HS so filter that out so we
// don't attempt to make_join with ourselves
for i := 0; i < len(req.ServerNames); i++ {
if req.ServerNames[i] == r.Cfg.Matrix.ServerName {
// delete this entry
req.ServerNames = append(req.ServerNames[:i], req.ServerNames[i+1:]...)
i--
}
}
// Get the domain part of the room ID. // Get the domain part of the room ID.
_, domain, err := gomatrixserverlib.SplitID('!', req.RoomIDOrAlias) _, domain, err := gomatrixserverlib.SplitID('!', req.RoomIDOrAlias)
if err != nil { if err != nil {

View file

@ -49,7 +49,6 @@ func (r *Queryer) QueryLatestEventsAndState(
} }
// QueryStateAfterEvents implements api.RoomserverInternalAPI // QueryStateAfterEvents implements api.RoomserverInternalAPI
// nolint:gocyclo
func (r *Queryer) QueryStateAfterEvents( func (r *Queryer) QueryStateAfterEvents(
ctx context.Context, ctx context.Context,
request *api.QueryStateAfterEventsRequest, request *api.QueryStateAfterEventsRequest,
@ -242,6 +241,30 @@ func (r *Queryer) QueryMembershipsForRoom(
if err != nil { if err != nil {
return err return err
} }
if info == nil {
return nil
}
// If no sender is specified then we will just return the entire
// set of memberships for the room, regardless of whether a specific
// user is allowed to see them or not.
if request.Sender == "" {
var events []types.Event
var eventNIDs []types.EventNID
eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, request.JoinedOnly, false)
if err != nil {
return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err)
}
events, err = r.DB.Events(ctx, eventNIDs)
if err != nil {
return fmt.Errorf("r.DB.Events: %w", err)
}
for _, event := range events {
clientEvent := gomatrixserverlib.ToClientEvent(event.Event, gomatrixserverlib.FormatAll)
response.JoinEvents = append(response.JoinEvents, clientEvent)
}
return nil
}
membershipEventNID, stillInRoom, isRoomforgotten, err := r.DB.GetMembership(ctx, info.RoomNID, request.Sender) membershipEventNID, stillInRoom, isRoomforgotten, err := r.DB.GetMembership(ctx, info.RoomNID, request.Sender)
if err != nil { if err != nil {
@ -372,7 +395,6 @@ func (r *Queryer) QueryServerAllowedToSeeEvent(
} }
// QueryMissingEvents implements api.RoomserverInternalAPI // QueryMissingEvents implements api.RoomserverInternalAPI
// nolint:gocyclo
func (r *Queryer) QueryMissingEvents( func (r *Queryer) QueryMissingEvents(
ctx context.Context, ctx context.Context,
request *api.QueryMissingEventsRequest, request *api.QueryMissingEventsRequest,

View file

@ -770,7 +770,6 @@ func (v *StateResolution) resolveConflictsV1(
// Returns a list that combines the entries without conflicts with the result of state resolution for the entries with conflicts. // Returns a list that combines the entries without conflicts with the result of state resolution for the entries with conflicts.
// The returned list is sorted by state key tuple. // The returned list is sorted by state key tuple.
// Returns an error if there was a problem talking to the database. // Returns an error if there was a problem talking to the database.
// nolint:gocyclo
func (v *StateResolution) resolveConflictsV2( func (v *StateResolution) resolveConflictsV2(
ctx context.Context, ctx context.Context,
notConflicted, conflicted []types.StateEntry, notConflicted, conflicted []types.StateEntry,

View file

@ -412,7 +412,6 @@ func (d *Database) GetLatestEventsForUpdate(
return updater, err return updater, err
} }
// nolint:gocyclo
func (d *Database) StoreEvent( func (d *Database) StoreEvent(
ctx context.Context, event *gomatrixserverlib.Event, ctx context.Context, event *gomatrixserverlib.Event,
txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, isRejected bool, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, isRejected bool,
@ -672,7 +671,6 @@ func extractRoomVersionFromCreateEvent(event *gomatrixserverlib.Event) (
// to cross-reference with other tables when loading. // to cross-reference with other tables when loading.
// //
// Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction. // Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction.
// nolint:gocyclo
func (d *Database) handleRedactions( func (d *Database) handleRedactions(
ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event *gomatrixserverlib.Event, ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event *gomatrixserverlib.Event,
) (*gomatrixserverlib.Event, string, error) { ) (*gomatrixserverlib.Event, string, error) {
@ -802,7 +800,6 @@ func (d *Database) loadEvent(ctx context.Context, eventID string) *types.Event {
// GetStateEvent returns the current state event of a given type for a given room with a given state key // GetStateEvent returns the current state event of a given type for a given room with a given state key
// If no event could be found, returns nil // If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error // If there was an issue during the retrieval, returns an error
// nolint:gocyclo
func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) { func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) {
roomInfo, err := d.RoomInfo(ctx, roomID) roomInfo, err := d.RoomInfo(ctx, roomID)
if err != nil { if err != nil {
@ -893,7 +890,6 @@ func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership
// GetBulkStateContent returns all state events which match a given room ID and a given state key tuple. Both must be satisfied for a match. // GetBulkStateContent returns all state events which match a given room ID and a given state key tuple. Both must be satisfied for a match.
// If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned. // If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned.
// nolint:gocyclo
func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) { func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) {
eventTypes := make([]string, 0, len(tuples)) eventTypes := make([]string, 0, len(tuples))
for _, tuple := range tuples { for _, tuple := range tuples {

View file

@ -27,6 +27,8 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -114,6 +116,21 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
logrus.WithError(err).Panicf("failed to start opentracing") logrus.WithError(err).Panicf("failed to start opentracing")
} }
if cfg.Global.Sentry.Enabled {
logrus.Info("Setting up Sentry for debugging...")
err = sentry.Init(sentry.ClientOptions{
Dsn: cfg.Global.Sentry.DSN,
Environment: cfg.Global.Sentry.Environment,
Debug: true,
ServerName: string(cfg.Global.ServerName),
Release: "dendrite@" + internal.VersionString(),
AttachStacktrace: true,
})
if err != nil {
logrus.WithError(err).Panic("failed to start Sentry")
}
}
cache, err := caching.NewInMemoryLRUCache(true) cache, err := caching.NewInMemoryLRUCache(true)
if err != nil { if err != nil {
logrus.WithError(err).Warnf("Failed to create cache") logrus.WithError(err).Warnf("Failed to create cache")
@ -263,7 +280,7 @@ func (b *BaseDendrite) KeyServerHTTPClient() keyserverAPI.KeyInternalAPI {
// CreateAccountsDB creates a new instance of the accounts database. Should only // CreateAccountsDB creates a new instance of the accounts database. Should only
// be called once per component. // be called once per component.
func (b *BaseDendrite) CreateAccountsDB() accounts.Database { func (b *BaseDendrite) CreateAccountsDB() accounts.Database {
db, err := accounts.NewDatabase(&b.Cfg.UserAPI.AccountDatabase, b.Cfg.Global.ServerName) db, err := accounts.NewDatabase(&b.Cfg.UserAPI.AccountDatabase, b.Cfg.Global.ServerName, b.Cfg.UserAPI.BCryptCost)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to accounts db") logrus.WithError(err).Panicf("failed to connect to accounts db")
} }
@ -316,7 +333,6 @@ func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationCli
// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on // SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on
// ApiMux under /api/ and adds a prometheus handler under /metrics. // ApiMux under /api/ and adds a prometheus handler under /metrics.
// nolint:gocyclo
func (b *BaseDendrite) SetupAndServeHTTP( func (b *BaseDendrite) SetupAndServeHTTP(
internalHTTPAddr, externalHTTPAddr config.HTTPAddress, internalHTTPAddr, externalHTTPAddr config.HTTPAddress,
certFile, keyFile *string, certFile, keyFile *string,
@ -354,10 +370,26 @@ func (b *BaseDendrite) SetupAndServeHTTP(
internalRouter.Handle("/metrics", httputil.WrapHandlerInBasicAuth(promhttp.Handler(), b.Cfg.Global.Metrics.BasicAuth)) internalRouter.Handle("/metrics", httputil.WrapHandlerInBasicAuth(promhttp.Handler(), b.Cfg.Global.Metrics.BasicAuth))
} }
externalRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(b.PublicClientAPIMux) var clientHandler http.Handler
clientHandler = b.PublicClientAPIMux
if b.Cfg.Global.Sentry.Enabled {
sentryHandler := sentryhttp.New(sentryhttp.Options{
Repanic: true,
})
clientHandler = sentryHandler.Handle(b.PublicClientAPIMux)
}
var federationHandler http.Handler
federationHandler = b.PublicFederationAPIMux
if b.Cfg.Global.Sentry.Enabled {
sentryHandler := sentryhttp.New(sentryhttp.Options{
Repanic: true,
})
federationHandler = sentryHandler.Handle(b.PublicFederationAPIMux)
}
externalRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(clientHandler)
if !b.Cfg.Global.DisableFederation { if !b.Cfg.Global.DisableFederation {
externalRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(b.PublicKeyAPIMux) externalRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(b.PublicKeyAPIMux)
externalRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(b.PublicFederationAPIMux) externalRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(federationHandler)
} }
externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux) externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux)
@ -437,6 +469,11 @@ func (b *BaseDendrite) WaitForShutdown() {
b.ProcessContext.ShutdownDendrite() b.ProcessContext.ShutdownDendrite()
b.ProcessContext.WaitForComponentsToFinish() b.ProcessContext.WaitForComponentsToFinish()
if b.Cfg.Global.Sentry.Enabled {
if !sentry.Flush(time.Second * 5) {
logrus.Warnf("failed to flush all Sentry events!")
}
}
logrus.Warnf("Dendrite is exiting now") logrus.Warnf("Dendrite is exiting now")
} }

View file

@ -33,13 +33,17 @@ type AppServiceAPI struct {
Database DatabaseOptions `yaml:"database"` Database DatabaseOptions `yaml:"database"`
// DisableTLSValidation disables the validation of X.509 TLS certs
// on appservice endpoints. This is not recommended in production!
DisableTLSValidation bool `yaml:"disable_tls_validation"`
ConfigFiles []string `yaml:"config_files"` ConfigFiles []string `yaml:"config_files"`
} }
func (c *AppServiceAPI) Defaults() { func (c *AppServiceAPI) Defaults() {
c.InternalAPI.Listen = "http://localhost:7777" c.InternalAPI.Listen = "http://localhost:7777"
c.InternalAPI.Connect = "http://localhost:7777" c.InternalAPI.Connect = "http://localhost:7777"
c.Database.Defaults() c.Database.Defaults(5)
c.Database.ConnectionString = "file:appservice.db" c.Database.ConnectionString = "file:appservice.db"
} }
@ -193,7 +197,7 @@ func loadAppServices(config *AppServiceAPI, derived *Derived) error {
// setupRegexps will create regex objects for exclusive and non-exclusive // setupRegexps will create regex objects for exclusive and non-exclusive
// usernames, aliases and rooms of all application services, so that other // usernames, aliases and rooms of all application services, so that other
// methods can quickly check if a particular string matches any of them. // methods can quickly check if a particular string matches any of them.
func setupRegexps(_ *AppServiceAPI, derived *Derived) (err error) { func setupRegexps(asAPI *AppServiceAPI, derived *Derived) (err error) {
// Combine all exclusive namespaces for later string checking // Combine all exclusive namespaces for later string checking
var exclusiveUsernameStrings, exclusiveAliasStrings []string var exclusiveUsernameStrings, exclusiveAliasStrings []string
@ -201,6 +205,16 @@ func setupRegexps(_ *AppServiceAPI, derived *Derived) (err error) {
// its contents to the overall exlusive regex string. Room regex // its contents to the overall exlusive regex string. Room regex
// not necessary as we aren't denying exclusive room ID creation // not necessary as we aren't denying exclusive room ID creation
for _, appservice := range derived.ApplicationServices { for _, appservice := range derived.ApplicationServices {
// The sender_localpart can be considered an exclusive regex for a single user, so let's do that
// to simplify the code
var senderUserIDSlice = []string{fmt.Sprintf("@%s:%s", appservice.SenderLocalpart, asAPI.Matrix.ServerName)}
usersSlice, found := appservice.NamespaceMap["users"]
if !found {
usersSlice = []ApplicationServiceNamespace{}
appservice.NamespaceMap["users"] = usersSlice
}
appendExclusiveNamespaceRegexs(&senderUserIDSlice, usersSlice)
for key, namespaceSlice := range appservice.NamespaceMap { for key, namespaceSlice := range appservice.NamespaceMap {
switch key { switch key {
case "users": case "users":

View file

@ -25,7 +25,7 @@ type FederationSender struct {
func (c *FederationSender) Defaults() { func (c *FederationSender) Defaults() {
c.InternalAPI.Listen = "http://localhost:7775" c.InternalAPI.Listen = "http://localhost:7775"
c.InternalAPI.Connect = "http://localhost:7775" c.InternalAPI.Connect = "http://localhost:7775"
c.Database.Defaults() c.Database.Defaults(10)
c.Database.ConnectionString = "file:federationsender.db" c.Database.ConnectionString = "file:federationsender.db"
c.FederationMaxRetries = 16 c.FederationMaxRetries = 16

View file

@ -49,6 +49,9 @@ type Global struct {
// Metrics configuration // Metrics configuration
Metrics Metrics `yaml:"metrics"` Metrics Metrics `yaml:"metrics"`
// Sentry configuration
Sentry Sentry `yaml:"sentry"`
// DNS caching options for all outbound HTTP requests // DNS caching options for all outbound HTTP requests
DNSCache DNSCacheOptions `yaml:"dns_cache"` DNSCache DNSCacheOptions `yaml:"dns_cache"`
} }
@ -63,6 +66,7 @@ func (c *Global) Defaults() {
c.Kafka.Defaults() c.Kafka.Defaults()
c.Metrics.Defaults() c.Metrics.Defaults()
c.DNSCache.Defaults() c.DNSCache.Defaults()
c.Sentry.Defaults()
} }
func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) { func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) {
@ -71,6 +75,7 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) {
c.Kafka.Verify(configErrs, isMonolith) c.Kafka.Verify(configErrs, isMonolith)
c.Metrics.Verify(configErrs, isMonolith) c.Metrics.Verify(configErrs, isMonolith)
c.Sentry.Verify(configErrs, isMonolith)
c.DNSCache.Verify(configErrs, isMonolith) c.DNSCache.Verify(configErrs, isMonolith)
} }
@ -111,6 +116,24 @@ func (c *Metrics) Defaults() {
func (c *Metrics) Verify(configErrs *ConfigErrors, isMonolith bool) { func (c *Metrics) Verify(configErrs *ConfigErrors, isMonolith bool) {
} }
// The configuration to use for Sentry error reporting
type Sentry struct {
Enabled bool `yaml:"enabled"`
// The DSN to connect to e.g "https://examplePublicKey@o0.ingest.sentry.io/0"
// See https://docs.sentry.io/platforms/go/configuration/options/
DSN string `yaml:"dsn"`
// The environment e.g "production"
// See https://docs.sentry.io/platforms/go/configuration/environments/
Environment string `yaml:"environment"`
}
func (c *Sentry) Defaults() {
c.Enabled = false
}
func (c *Sentry) Verify(configErrs *ConfigErrors, isMonolith bool) {
}
type DatabaseOptions struct { type DatabaseOptions struct {
// The connection string, file:filename.db or postgres://server.... // The connection string, file:filename.db or postgres://server....
ConnectionString DataSource `yaml:"connection_string"` ConnectionString DataSource `yaml:"connection_string"`
@ -122,8 +145,8 @@ type DatabaseOptions struct {
ConnMaxLifetimeSeconds int `yaml:"conn_max_lifetime"` ConnMaxLifetimeSeconds int `yaml:"conn_max_lifetime"`
} }
func (c *DatabaseOptions) Defaults() { func (c *DatabaseOptions) Defaults(conns int) {
c.MaxOpenConnections = 100 c.MaxOpenConnections = conns
c.MaxIdleConnections = 2 c.MaxIdleConnections = 2
c.ConnMaxLifetimeSeconds = -1 c.ConnMaxLifetimeSeconds = -1
} }

View file

@ -36,7 +36,7 @@ func (k *Kafka) TopicFor(name string) string {
func (c *Kafka) Defaults() { func (c *Kafka) Defaults() {
c.UseNaffka = true c.UseNaffka = true
c.Database.Defaults() c.Database.Defaults(10)
c.Addresses = []string{"localhost:2181"} c.Addresses = []string{"localhost:2181"}
c.Database.ConnectionString = DataSource("file:naffka.db") c.Database.ConnectionString = DataSource("file:naffka.db")
c.TopicPrefix = "Dendrite" c.TopicPrefix = "Dendrite"

View file

@ -11,7 +11,7 @@ type KeyServer struct {
func (c *KeyServer) Defaults() { func (c *KeyServer) Defaults() {
c.InternalAPI.Listen = "http://localhost:7779" c.InternalAPI.Listen = "http://localhost:7779"
c.InternalAPI.Connect = "http://localhost:7779" c.InternalAPI.Connect = "http://localhost:7779"
c.Database.Defaults() c.Database.Defaults(10)
c.Database.ConnectionString = "file:keyserver.db" c.Database.ConnectionString = "file:keyserver.db"
} }

View file

@ -39,7 +39,7 @@ func (c *MediaAPI) Defaults() {
c.InternalAPI.Listen = "http://localhost:7774" c.InternalAPI.Listen = "http://localhost:7774"
c.InternalAPI.Connect = "http://localhost:7774" c.InternalAPI.Connect = "http://localhost:7774"
c.ExternalAPI.Listen = "http://[::]:8074" c.ExternalAPI.Listen = "http://[::]:8074"
c.Database.Defaults() c.Database.Defaults(5)
c.Database.ConnectionString = "file:mediaapi.db" c.Database.ConnectionString = "file:mediaapi.db"
defaultMaxFileSizeBytes := FileSizeBytes(10485760) defaultMaxFileSizeBytes := FileSizeBytes(10485760)

View file

@ -14,7 +14,7 @@ type MSCs struct {
} }
func (c *MSCs) Defaults() { func (c *MSCs) Defaults() {
c.Database.Defaults() c.Database.Defaults(5)
c.Database.ConnectionString = "file:mscs.db" c.Database.ConnectionString = "file:mscs.db"
} }

View file

@ -11,7 +11,7 @@ type RoomServer struct {
func (c *RoomServer) Defaults() { func (c *RoomServer) Defaults() {
c.InternalAPI.Listen = "http://localhost:7770" c.InternalAPI.Listen = "http://localhost:7770"
c.InternalAPI.Connect = "http://localhost:7770" c.InternalAPI.Connect = "http://localhost:7770"
c.Database.Defaults() c.Database.Defaults(10)
c.Database.ConnectionString = "file:roomserver.db" c.Database.ConnectionString = "file:roomserver.db"
} }

View file

@ -22,7 +22,7 @@ type SigningKeyServer struct {
func (c *SigningKeyServer) Defaults() { func (c *SigningKeyServer) Defaults() {
c.InternalAPI.Listen = "http://localhost:7780" c.InternalAPI.Listen = "http://localhost:7780"
c.InternalAPI.Connect = "http://localhost:7780" c.InternalAPI.Connect = "http://localhost:7780"
c.Database.Defaults() c.Database.Defaults(10)
c.Database.ConnectionString = "file:signingkeyserver.db" c.Database.ConnectionString = "file:signingkeyserver.db"
} }

View file

@ -15,7 +15,7 @@ func (c *SyncAPI) Defaults() {
c.InternalAPI.Listen = "http://localhost:7773" c.InternalAPI.Listen = "http://localhost:7773"
c.InternalAPI.Connect = "http://localhost:7773" c.InternalAPI.Connect = "http://localhost:7773"
c.ExternalAPI.Listen = "http://localhost:8073" c.ExternalAPI.Listen = "http://localhost:8073"
c.Database.Defaults() c.Database.Defaults(10)
c.Database.ConnectionString = "file:syncapi.db" c.Database.ConnectionString = "file:syncapi.db"
} }

View file

@ -1,10 +1,15 @@
package config package config
import "golang.org/x/crypto/bcrypt"
type UserAPI struct { type UserAPI struct {
Matrix *Global `yaml:"-"` Matrix *Global `yaml:"-"`
InternalAPI InternalAPIOptions `yaml:"internal_api"` InternalAPI InternalAPIOptions `yaml:"internal_api"`
// The cost when hashing passwords.
BCryptCost int `yaml:"bcrypt_cost"`
// The Account database stores the login details and account information // The Account database stores the login details and account information
// for local users. It is accessed by the UserAPI. // for local users. It is accessed by the UserAPI.
AccountDatabase DatabaseOptions `yaml:"account_database"` AccountDatabase DatabaseOptions `yaml:"account_database"`
@ -16,10 +21,11 @@ type UserAPI struct {
func (c *UserAPI) Defaults() { func (c *UserAPI) Defaults() {
c.InternalAPI.Listen = "http://localhost:7781" c.InternalAPI.Listen = "http://localhost:7781"
c.InternalAPI.Connect = "http://localhost:7781" c.InternalAPI.Connect = "http://localhost:7781"
c.AccountDatabase.Defaults() c.AccountDatabase.Defaults(10)
c.DeviceDatabase.Defaults() c.DeviceDatabase.Defaults(10)
c.AccountDatabase.ConnectionString = "file:userapi_accounts.db" c.AccountDatabase.ConnectionString = "file:userapi_accounts.db"
c.DeviceDatabase.ConnectionString = "file:userapi_devices.db" c.DeviceDatabase.ConnectionString = "file:userapi_devices.db"
c.BCryptCost = bcrypt.DefaultCost
} }
func (c *UserAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { func (c *UserAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {

View file

@ -238,7 +238,6 @@ func federatedEventRelationship(
} }
} }
// nolint:gocyclo
func (rc *reqCtx) process() (*gomatrixserverlib.MSC2836EventRelationshipsResponse, *util.JSONResponse) { func (rc *reqCtx) process() (*gomatrixserverlib.MSC2836EventRelationshipsResponse, *util.JSONResponse) {
var res gomatrixserverlib.MSC2836EventRelationshipsResponse var res gomatrixserverlib.MSC2836EventRelationshipsResponse
var returnEvents []*gomatrixserverlib.HeaderedEvent var returnEvents []*gomatrixserverlib.HeaderedEvent

View file

@ -70,11 +70,11 @@ func Enable(
} }
}) })
base.PublicClientAPIMux.Handle("/unstable/rooms/{roomID}/spaces", base.PublicClientAPIMux.Handle("/unstable/org.matrix.msc2946/rooms/{roomID}/spaces",
httputil.MakeAuthAPI("spaces", userAPI, spacesHandler(db, rsAPI, fsAPI, base.Cfg.Global.ServerName)), httputil.MakeAuthAPI("spaces", userAPI, spacesHandler(db, rsAPI, fsAPI, base.Cfg.Global.ServerName)),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
base.PublicFederationAPIMux.Handle("/unstable/spaces/{roomID}", httputil.MakeExternalAPI( base.PublicFederationAPIMux.Handle("/unstable/org.matrix.msc2946/spaces/{roomID}", httputil.MakeExternalAPI(
"msc2946_fed_spaces", func(req *http.Request) util.JSONResponse { "msc2946_fed_spaces", func(req *http.Request) util.JSONResponse {
fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest( fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest(
req, time.Now(), base.Cfg.Global.ServerName, keyRing, req, time.Now(), base.Cfg.Global.ServerName, keyRing,
@ -217,7 +217,6 @@ func (w *walker) markSent(id string) {
w.inMemoryBatchCache[w.callerID()] = m w.inMemoryBatchCache[w.callerID()] = m
} }
// nolint:gocyclo
func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse { func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse {
var res gomatrixserverlib.MSC2946SpacesResponse var res gomatrixserverlib.MSC2946SpacesResponse
// Begin walking the graph starting with the room ID in the request in a queue of unvisited rooms // Begin walking the graph starting with the room ID in the request in a queue of unvisited rooms

View file

@ -309,7 +309,7 @@ func postSpaces(t *testing.T, expectCode int, accessToken, roomID string, req *g
t.Fatalf("failed to marshal request: %s", err) t.Fatalf("failed to marshal request: %s", err)
} }
httpReq, err := http.NewRequest( httpReq, err := http.NewRequest(
"POST", "http://localhost:8010/_matrix/client/unstable/rooms/"+url.PathEscape(roomID)+"/spaces", "POST", "http://localhost:8010/_matrix/client/unstable/org.matrix.msc2946/rooms/"+url.PathEscape(roomID)+"/spaces",
bytes.NewBuffer(data), bytes.NewBuffer(data),
) )
httpReq.Header.Set("Authorization", "Bearer "+accessToken) httpReq.Header.Set("Authorization", "Bearer "+accessToken)

View file

@ -19,6 +19,7 @@ import (
"encoding/json" "encoding/json"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -78,6 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("client API server output log: message parse failure") log.WithError(err).Errorf("client API server output log: message parse failure")
sentry.CaptureException(err)
return nil return nil
} }
@ -90,6 +92,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
context.TODO(), string(msg.Key), output.RoomID, output.Type, context.TODO(), string(msg.Key), output.RoomID, output.Type,
) )
if err != nil { if err != nil {
sentry.CaptureException(err)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": output.Type, "type": output.Type,
"room_id": output.RoomID, "room_id": output.RoomID,

View file

@ -19,6 +19,7 @@ import (
"encoding/json" "encoding/json"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -78,6 +79,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return nil return nil
} }
@ -90,6 +92,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
output.Timestamp, output.Timestamp,
) )
if err != nil { if err != nil {
sentry.CaptureException(err)
return err return err
} }

View file

@ -19,6 +19,7 @@ import (
"encoding/json" "encoding/json"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -82,11 +83,13 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return err return err
} }
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID) _, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil { if err != nil {
sentry.CaptureException(err)
return err return err
} }
if domain != s.serverName { if domain != s.serverName {
@ -104,6 +107,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent, context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent,
) )
if err != nil { if err != nil {
sentry.CaptureException(err)
log.WithError(err).Errorf("failed to store send-to-device message") log.WithError(err).Errorf("failed to store send-to-device message")
return err return err
} }

View file

@ -18,6 +18,7 @@ import (
"encoding/json" "encoding/json"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -83,6 +84,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return nil return nil
} }

View file

@ -20,6 +20,7 @@ import (
"sync" "sync"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -107,6 +108,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Error("syncapi: failed to unmarshal key change event from key server") log.WithError(err).Error("syncapi: failed to unmarshal key change event from key server")
sentry.CaptureException(err)
return err return err
} }
// work out who we need to notify about the new key // work out who we need to notify about the new key
@ -116,6 +118,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}, &queryRes) }, &queryRes)
if err != nil { if err != nil {
log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
return err return err
} }
// make sure we get our own key updates too! // make sure we get our own key updates too!

View file

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -148,18 +149,21 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
ev, err := s.updateStateEvent(ev) ev, err := s.updateStateEvent(ev)
if err != nil { if err != nil {
sentry.CaptureException(err)
return err return err
} }
for i := range addsStateEvents { for i := range addsStateEvents {
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i]) addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
if err != nil { if err != nil {
sentry.CaptureException(err)
return err return err
} }
} }
if msg.RewritesState { if msg.RewritesState {
if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil { if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
sentry.CaptureException(err)
return fmt.Errorf("s.db.PurgeRoom: %w", err) return fmt.Errorf("s.db.PurgeRoom: %w", err)
} }
} }
@ -187,6 +191,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
sentry.CaptureException(err)
return err return err
} }
@ -279,6 +284,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
} }
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil { if err != nil {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": msg.Event.EventID(), "event_id": msg.Event.EventID(),
@ -300,6 +306,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
) error { ) error {
pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID) pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID)
if err != nil { if err != nil {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": msg.EventID, "event_id": msg.EventID,
@ -320,6 +327,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
) error { ) error {
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
if err != nil { if err != nil {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
log.ErrorKey: err, log.ErrorKey: err,

View file

@ -46,7 +46,6 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response // DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information. // be already filled in with join/leave information.
// nolint:gocyclo
func DeviceListCatchup( func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
userID string, res *types.Response, from, to types.LogPosition, userID string, res *types.Response, from, to types.LogPosition,
@ -137,7 +136,6 @@ func DeviceListCatchup(
} }
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
// nolint:gocyclo
func TrackChangedUsers( func TrackChangedUsers(
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) { ) (changed, left []string, err error) {

View file

@ -61,7 +61,6 @@ const defaultMessagesLimit = 10
// OnIncomingMessagesRequest implements the /messages endpoint from the // OnIncomingMessagesRequest implements the /messages endpoint from the
// client-server API. // client-server API.
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages // See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
// nolint:gocyclo
func OnIncomingMessagesRequest( func OnIncomingMessagesRequest(
req *http.Request, db storage.Database, roomID string, device *userapi.Device, req *http.Request, db storage.Database, roomID string, device *userapi.Device,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
@ -306,7 +305,6 @@ func (r *messagesReq) retrieveEvents() (
return clientEvents, start, end, err return clientEvents, start, end, err
} }
// nolint:gocyclo
func (r *messagesReq) filterHistoryVisible(events []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent { func (r *messagesReq) filterHistoryVisible(events []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
// user shouldn't see, we check the recent events and remove any prior to the join event of the user // user shouldn't see, we check the recent events and remove any prior to the join event of the user

View file

@ -36,7 +36,6 @@ type SyncServerDatasource struct {
} }
// NewDatabase creates a new sync server database // NewDatabase creates a new sync server database
// nolint:gocyclo
func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource var d SyncServerDatasource
var err error var err error

View file

@ -661,7 +661,6 @@ func (d *Database) fetchMissingStateEvents(
// exclusive of oldPos, inclusive of newPos, for the rooms in which // exclusive of oldPos, inclusive of newPos, for the rooms in which
// the user has new membership events. // the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it. // A list of joined room IDs is also returned in case the caller needs it.
// nolint:gocyclo
func (d *Database) GetStateDeltas( func (d *Database) GetStateDeltas(
ctx context.Context, device *userapi.Device, ctx context.Context, device *userapi.Device,
r types.Range, userID string, r types.Range, userID string,
@ -773,7 +772,6 @@ func (d *Database) GetStateDeltas(
// requests with full_state=true. // requests with full_state=true.
// Fetches full state for all joined rooms and uses selectStateInRange to get // Fetches full state for all joined rooms and uses selectStateInRange to get
// updates for other rooms. // updates for other rooms.
// nolint:gocyclo
func (d *Database) GetStateDeltasForFullStateSync( func (d *Database) GetStateDeltasForFullStateSync(
ctx context.Context, device *userapi.Device, ctx context.Context, device *userapi.Device,
r types.Range, userID string, r types.Range, userID string,

View file

@ -23,7 +23,6 @@ const (
// fields might come from either a StateFilter or an EventFilter, // fields might come from either a StateFilter or an EventFilter,
// and it's easier just to have the caller extract the relevant // and it's easier just to have the caller extract the relevant
// parts. // parts.
// nolint:gocyclo
func prepareWithFilters( func prepareWithFilters(
db *sql.DB, txn *sql.Tx, query string, params []interface{}, db *sql.DB, txn *sql.Tx, query string, params []interface{},
senders, notsenders, types, nottypes []string, excludeEventIDs []string, senders, notsenders, types, nottypes []string, excludeEventIDs []string,

View file

@ -52,7 +52,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
return &d, nil return &d, nil
} }
// nolint:gocyclo
func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) { func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) {
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
return err return err

View file

@ -137,7 +137,6 @@ func (p *PDUStreamProvider) CompleteSync(
return to return to
} }
// nolint:gocyclo
func (p *PDUStreamProvider) IncrementalSync( func (p *PDUStreamProvider) IncrementalSync(
ctx context.Context, ctx context.Context,
req *types.SyncRequest, req *types.SyncRequest,
@ -254,7 +253,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return nil return nil
} }
// nolint:gocyclo
func (p *PDUStreamProvider) getJoinResponseForCompleteSync( func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context, ctx context.Context,
roomID string, roomID string,

View file

@ -67,3 +67,9 @@ Forgotten room messages cannot be paginated
# Blacklisted due to flakiness # Blacklisted due to flakiness
Can re-join room if re-invited Can re-join room if re-invited
# Blacklisted due to flakiness after #1774
Local device key changes get to remote servers with correct prev_id
# Flakey
Local device key changes appear in /keys/changes

View file

@ -143,7 +143,6 @@ Local new device changes appear in v2 /sync
Local update device changes appear in v2 /sync Local update device changes appear in v2 /sync
Get left notifs for other users in sync and /keys/changes when user leaves Get left notifs for other users in sync and /keys/changes when user leaves
Local device key changes get to remote servers Local device key changes get to remote servers
Local device key changes get to remote servers with correct prev_id
Server correctly handles incoming m.device_list_update Server correctly handles incoming m.device_list_update
If remote user leaves room, changes device and rejoins we see update in sync If remote user leaves room, changes device and rejoins we see update in sync
If remote user leaves room, changes device and rejoins we see update in /keys/changes If remote user leaves room, changes device and rejoins we see update in /keys/changes
@ -511,3 +510,10 @@ Can pass a JSON filter as a query parameter
Local room members can get room messages Local room members can get room messages
Remote room members can get room messages Remote room members can get room messages
Guest users can send messages to guest_access rooms if joined Guest users can send messages to guest_access rooms if joined
AS can create a user
AS can create a user with an underscore
AS can create a user with inhibit_login
AS can set avatar for ghosted users
AS can set displayname for ghosted users
Ghost user must register before joining room
Inviting an AS-hosted user asks the AS server

View file

@ -241,6 +241,9 @@ type Device struct {
LastSeenTS int64 LastSeenTS int64
LastSeenIP string LastSeenIP string
UserAgent string UserAgent string
// If the device is for an appservice user,
// this is the appservice ID.
AppserviceID string
} }
// Account represents a Matrix account on this home server. // Account represents a Matrix account on this home server.

View file

@ -161,6 +161,7 @@ func (a *UserInternalAPI) deviceListUpdate(userID string, deviceIDs []string) er
var uploadRes keyapi.PerformUploadKeysResponse var uploadRes keyapi.PerformUploadKeysResponse
a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{ a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{
UserID: userID,
DeviceKeys: deviceKeys, DeviceKeys: deviceKeys,
}, &uploadRes) }, &uploadRes)
if uploadRes.Error != nil { if uploadRes.Error != nil {
@ -217,6 +218,7 @@ func (a *UserInternalAPI) PerformDeviceUpdate(ctx context.Context, req *api.Perf
// display name has changed: update the device key // display name has changed: update the device key
var uploadRes keyapi.PerformUploadKeysResponse var uploadRes keyapi.PerformUploadKeysResponse
a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{ a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{
UserID: req.RequestingUserID,
DeviceKeys: []keyapi.DeviceKeys{ DeviceKeys: []keyapi.DeviceKeys{
{ {
DeviceID: dev.ID, DeviceID: dev.ID,
@ -380,6 +382,7 @@ func (a *UserInternalAPI) queryAppServiceToken(ctx context.Context, token, appSe
ID: types.AppServiceDeviceID, ID: types.AppServiceDeviceID,
// AS dummy device has AS's token. // AS dummy device has AS's token.
AccessToken: token, AccessToken: token,
AppserviceID: appService.ID,
} }
localpart, err := userutil.ParseUsernameParam(appServiceUserID, &a.ServerName) localpart, err := userutil.ParseUsernameParam(appServiceUserID, &a.ServerName)

View file

@ -44,10 +44,11 @@ type Database struct {
accountDatas accountDataStatements accountDatas accountDataStatements
threepids threepidStatements threepids threepidStatements
serverName gomatrixserverlib.ServerName serverName gomatrixserverlib.ServerName
bcryptCost int
} }
// NewDatabase creates a new accounts and profiles database // NewDatabase creates a new accounts and profiles database
func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName) (*Database, error) { func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int) (*Database, error) {
db, err := sqlutil.Open(dbProperties) db, err := sqlutil.Open(dbProperties)
if err != nil { if err != nil {
return nil, err return nil, err
@ -56,6 +57,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
serverName: serverName, serverName: serverName,
db: db, db: db,
writer: sqlutil.NewDummyWriter(), writer: sqlutil.NewDummyWriter(),
bcryptCost: bcryptCost,
} }
// Create tables before executing migrations so we don't fail if the table is missing, // Create tables before executing migrations so we don't fail if the table is missing,
@ -131,7 +133,7 @@ func (d *Database) SetDisplayName(
func (d *Database) SetPassword( func (d *Database) SetPassword(
ctx context.Context, localpart, plaintextPassword string, ctx context.Context, localpart, plaintextPassword string,
) error { ) error {
hash, err := hashPassword(plaintextPassword) hash, err := d.hashPassword(plaintextPassword)
if err != nil { if err != nil {
return err return err
} }
@ -170,24 +172,26 @@ func (d *Database) CreateAccount(
func (d *Database) createAccount( func (d *Database) createAccount(
ctx context.Context, txn *sql.Tx, localpart, plaintextPassword, appserviceID string, ctx context.Context, txn *sql.Tx, localpart, plaintextPassword, appserviceID string,
) (*api.Account, error) { ) (*api.Account, error) {
var account *api.Account
var err error var err error
// Generate a password hash if this is not a password-less user // Generate a password hash if this is not a password-less user
hash := "" hash := ""
if plaintextPassword != "" { if plaintextPassword != "" {
hash, err = hashPassword(plaintextPassword) hash, err = d.hashPassword(plaintextPassword)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
if err := d.profiles.insertProfile(ctx, txn, localpart); err != nil { if account, err = d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID); err != nil {
if sqlutil.IsUniqueConstraintViolationErr(err) { if sqlutil.IsUniqueConstraintViolationErr(err) {
return nil, sqlutil.ErrUserExists return nil, sqlutil.ErrUserExists
} }
return nil, err return nil, err
} }
if err = d.profiles.insertProfile(ctx, txn, localpart); err != nil {
if err := d.accountDatas.insertAccountData(ctx, txn, localpart, "", "m.push_rules", json.RawMessage(`{ return nil, err
}
if err = d.accountDatas.insertAccountData(ctx, txn, localpart, "", "m.push_rules", json.RawMessage(`{
"global": { "global": {
"content": [], "content": [],
"override": [], "override": [],
@ -198,7 +202,7 @@ func (d *Database) createAccount(
}`)); err != nil { }`)); err != nil {
return nil, err return nil, err
} }
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID) return account, nil
} }
// SaveAccountData saves new account data for a given user and a given room. // SaveAccountData saves new account data for a given user and a given room.
@ -244,8 +248,8 @@ func (d *Database) GetNewNumericLocalpart(
return d.accounts.selectNewNumericLocalpart(ctx, nil) return d.accounts.selectNewNumericLocalpart(ctx, nil)
} }
func hashPassword(plaintext string) (hash string, err error) { func (d *Database) hashPassword(plaintext string) (hash string, err error) {
hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), bcrypt.DefaultCost) hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), d.bcryptCost)
return string(hashBytes), err return string(hashBytes), err
} }

View file

@ -1,27 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !wasm
package sqlite3
import (
"errors"
"github.com/mattn/go-sqlite3"
)
func isConstraintError(err error) bool {
return errors.Is(err, sqlite3.ErrConstraint)
}

View file

@ -42,6 +42,7 @@ type Database struct {
accountDatas accountDataStatements accountDatas accountDataStatements
threepids threepidStatements threepids threepidStatements
serverName gomatrixserverlib.ServerName serverName gomatrixserverlib.ServerName
bcryptCost int
accountsMu sync.Mutex accountsMu sync.Mutex
profilesMu sync.Mutex profilesMu sync.Mutex
@ -50,7 +51,7 @@ type Database struct {
} }
// NewDatabase creates a new accounts and profiles database // NewDatabase creates a new accounts and profiles database
func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName) (*Database, error) { func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int) (*Database, error) {
db, err := sqlutil.Open(dbProperties) db, err := sqlutil.Open(dbProperties)
if err != nil { if err != nil {
return nil, err return nil, err
@ -59,6 +60,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
serverName: serverName, serverName: serverName,
db: db, db: db,
writer: sqlutil.NewExclusiveWriter(), writer: sqlutil.NewExclusiveWriter(),
bcryptCost: bcryptCost,
} }
// Create tables before executing migrations so we don't fail if the table is missing, // Create tables before executing migrations so we don't fail if the table is missing,
@ -143,7 +145,7 @@ func (d *Database) SetDisplayName(
func (d *Database) SetPassword( func (d *Database) SetPassword(
ctx context.Context, localpart, plaintextPassword string, ctx context.Context, localpart, plaintextPassword string,
) error { ) error {
hash, err := hashPassword(plaintextPassword) hash, err := d.hashPassword(plaintextPassword)
if err != nil { if err != nil {
return err return err
} }
@ -204,22 +206,22 @@ func (d *Database) createAccount(
ctx context.Context, txn *sql.Tx, localpart, plaintextPassword, appserviceID string, ctx context.Context, txn *sql.Tx, localpart, plaintextPassword, appserviceID string,
) (*api.Account, error) { ) (*api.Account, error) {
var err error var err error
var account *api.Account
// Generate a password hash if this is not a password-less user // Generate a password hash if this is not a password-less user
hash := "" hash := ""
if plaintextPassword != "" { if plaintextPassword != "" {
hash, err = hashPassword(plaintextPassword) hash, err = d.hashPassword(plaintextPassword)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
if err := d.profiles.insertProfile(ctx, txn, localpart); err != nil { if account, err = d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID); err != nil {
if isConstraintError(err) {
return nil, sqlutil.ErrUserExists return nil, sqlutil.ErrUserExists
} }
if err = d.profiles.insertProfile(ctx, txn, localpart); err != nil {
return nil, err return nil, err
} }
if err = d.accountDatas.insertAccountData(ctx, txn, localpart, "", "m.push_rules", json.RawMessage(`{
if err := d.accountDatas.insertAccountData(ctx, txn, localpart, "", "m.push_rules", json.RawMessage(`{
"global": { "global": {
"content": [], "content": [],
"override": [], "override": [],
@ -230,7 +232,7 @@ func (d *Database) createAccount(
}`)); err != nil { }`)); err != nil {
return nil, err return nil, err
} }
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID) return account, nil
} }
// SaveAccountData saves new account data for a given user and a given room. // SaveAccountData saves new account data for a given user and a given room.
@ -278,8 +280,8 @@ func (d *Database) GetNewNumericLocalpart(
return d.accounts.selectNewNumericLocalpart(ctx, nil) return d.accounts.selectNewNumericLocalpart(ctx, nil)
} }
func hashPassword(plaintext string) (hash string, err error) { func (d *Database) hashPassword(plaintext string) (hash string, err error) {
hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), bcrypt.DefaultCost) hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), d.bcryptCost)
return string(hashBytes), err return string(hashBytes), err
} }

View file

@ -27,12 +27,12 @@ import (
// NewDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme) // NewDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme)
// and sets postgres connection parameters // and sets postgres connection parameters
func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName) (Database, error) { func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int) (Database, error) {
switch { switch {
case dbProperties.ConnectionString.IsSQLite(): case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties, serverName) return sqlite3.NewDatabase(dbProperties, serverName, bcryptCost)
case dbProperties.ConnectionString.IsPostgres(): case dbProperties.ConnectionString.IsPostgres():
return postgres.NewDatabase(dbProperties, serverName) return postgres.NewDatabase(dbProperties, serverName, bcryptCost)
default: default:
return nil, fmt.Errorf("unexpected database type") return nil, fmt.Errorf("unexpected database type")
} }

View file

@ -25,10 +25,11 @@ import (
func NewDatabase( func NewDatabase(
dbProperties *config.DatabaseOptions, dbProperties *config.DatabaseOptions,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
bcryptCost int,
) (Database, error) { ) (Database, error) {
switch { switch {
case dbProperties.ConnectionString.IsSQLite(): case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties, serverName) return sqlite3.NewDatabase(dbProperties, serverName, bcryptCost)
case dbProperties.ConnectionString.IsPostgres(): case dbProperties.ConnectionString.IsPostgres():
return nil, fmt.Errorf("can't use Postgres implementation") return nil, fmt.Errorf("can't use Postgres implementation")
default: default:

View file

@ -37,7 +37,6 @@ func AddInternalRoutes(router *mux.Router, intAPI api.UserInternalAPI) {
func NewInternalAPI( func NewInternalAPI(
accountDB accounts.Database, cfg *config.UserAPI, appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI, accountDB accounts.Database, cfg *config.UserAPI, appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI,
) api.UserInternalAPI { ) api.UserInternalAPI {
deviceDB, err := devices.NewDatabase(&cfg.DeviceDatabase, cfg.Matrix.ServerName) deviceDB, err := devices.NewDatabase(&cfg.DeviceDatabase, cfg.Matrix.ServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to device db") logrus.WithError(err).Panicf("failed to connect to device db")

View file

@ -16,6 +16,7 @@ import (
"github.com/matrix-org/dendrite/userapi/inthttp" "github.com/matrix-org/dendrite/userapi/inthttp"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/bcrypt"
) )
const ( const (
@ -25,7 +26,7 @@ const (
func MustMakeInternalAPI(t *testing.T) (api.UserInternalAPI, accounts.Database) { func MustMakeInternalAPI(t *testing.T) (api.UserInternalAPI, accounts.Database) {
accountDB, err := accounts.NewDatabase(&config.DatabaseOptions{ accountDB, err := accounts.NewDatabase(&config.DatabaseOptions{
ConnectionString: "file::memory:", ConnectionString: "file::memory:",
}, serverName) }, serverName, bcrypt.MinCost)
if err != nil { if err != nil {
t.Fatalf("failed to create account DB: %s", err) t.Fatalf("failed to create account DB: %s", err)
} }