Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/fts2

This commit is contained in:
Till Faelligen 2022-06-13 20:05:27 +02:00
commit 7b5baa6da8
32 changed files with 920 additions and 191 deletions

View file

@ -1,5 +1,34 @@
# Changelog
## Dendrite 0.8.8 (2022-06-09)
### Features
* The performance of state resolution has been increased significantly for larger rooms
* A number of changes have been made to rate limiting:
* Logged in users will now be rate-limited on a per-session basis rather than by remote IP
* Rate limiting no longer applies to admin or appservice users
* It is now possible to configure additional users that are exempt from rate limiting using the `exempt_user_ids` option in the `rate_limiting` section of the Dendrite config
* Setting state is now idempotent via the client API state endpoints
### Fixes
* Room upgrades now properly propagate tombstone events to remote servers
* Room upgrades will no longer send tombstone events if creating the upgraded room fails
* A crash has been fixed when evaluating restricted room joins
## Dendrite 0.8.7 (2022-06-01)
### Features
* Support added for room version 10
### Fixes
* A number of state handling bugs have been fixed, which previously resulted in missing state events, unexpected state deletions, reverted memberships and unexpectedly rejected/soft-failed events in some specific cases
* Fixed destination queue performance issues as a result of missing indexes, which speeds up outbound federation considerably
* A bug which could cause the `/register` endpoint to return HTTP 500 has been fixed
## Dendrite 0.8.6 (2022-05-26)
### Features

View file

@ -261,7 +261,7 @@ func (m *DendriteMonolith) Start() {
cfg.MSCs.MSCs = []string{"msc2836", "msc2946"}
cfg.ClientAPI.RegistrationDisabled = false
cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true
if err := cfg.Derive(); err != nil {
if err = cfg.Derive(); err != nil {
panic(err)
}
@ -342,11 +342,23 @@ func (m *DendriteMonolith) Start() {
go func() {
m.logger.Info("Listening on ", cfg.Global.ServerName)
m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")))
switch m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")) {
case net.ErrClosed, http.ErrServerClosed:
m.logger.Info("Stopped listening on ", cfg.Global.ServerName)
default:
m.logger.Fatal(err)
}
}()
go func() {
logrus.Info("Listening on ", m.listener.Addr())
logrus.Fatal(http.Serve(m.listener, httpRouter))
switch http.Serve(m.listener, httpRouter) {
case net.ErrClosed, http.ErrServerClosed:
m.logger.Info("Stopped listening on ", cfg.Global.ServerName)
default:
m.logger.Fatal(err)
}
}()
}

View file

@ -170,11 +170,11 @@ func (m *DendriteMonolith) Start() {
go func() {
m.logger.Info("Listening on ", ygg.DerivedServerName())
m.logger.Fatal(m.httpServer.Serve(ygg))
m.logger.Error(m.httpServer.Serve(ygg))
}()
go func() {
logrus.Info("Listening on ", m.listener.Addr())
logrus.Fatal(http.Serve(m.listener, httpRouter))
logrus.Error(http.Serve(m.listener, httpRouter))
}()
go func() {
logrus.Info("Sending wake-up message to known nodes")

View file

@ -44,7 +44,7 @@ func GetAliases(
return util.ErrorResponse(fmt.Errorf("rsAPI.QueryCurrentState: %w", err))
}
visibility := "invite"
visibility := gomatrixserverlib.HistoryVisibilityInvited
if historyVisEvent, ok := stateRes.StateEvents[stateTuple]; ok {
var err error
visibility, err = historyVisEvent.HistoryVisibility()

View file

@ -29,9 +29,10 @@ import (
"sync"
"time"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/tidwall/gjson"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/tokens"
@ -68,9 +69,10 @@ const (
// It shouldn't be passed by value because it contains a mutex.
type sessionsDict struct {
sync.RWMutex
sessions map[string][]authtypes.LoginType
params map[string]registerRequest
timer map[string]*time.Timer
sessions map[string][]authtypes.LoginType
sessionCompletedResult map[string]registerResponse
params map[string]registerRequest
timer map[string]*time.Timer
// deleteSessionToDeviceID protects requests to DELETE /devices/{deviceID} from being abused.
// If a UIA session is started by trying to delete device1, and then UIA is completed by deleting device2,
// the delete request will fail for device2 since the UIA was initiated by trying to delete device1.
@ -115,6 +117,7 @@ func (d *sessionsDict) deleteSession(sessionID string) {
delete(d.params, sessionID)
delete(d.sessions, sessionID)
delete(d.deleteSessionToDeviceID, sessionID)
delete(d.sessionCompletedResult, sessionID)
// stop the timer, e.g. because the registration was completed
if t, ok := d.timer[sessionID]; ok {
if !t.Stop() {
@ -130,6 +133,7 @@ func (d *sessionsDict) deleteSession(sessionID string) {
func newSessionsDict() *sessionsDict {
return &sessionsDict{
sessions: make(map[string][]authtypes.LoginType),
sessionCompletedResult: make(map[string]registerResponse),
params: make(map[string]registerRequest),
timer: make(map[string]*time.Timer),
deleteSessionToDeviceID: make(map[string]string),
@ -173,6 +177,19 @@ func (d *sessionsDict) addDeviceToDelete(sessionID, deviceID string) {
d.deleteSessionToDeviceID[sessionID] = deviceID
}
func (d *sessionsDict) addCompletedRegistration(sessionID string, response registerResponse) {
d.Lock()
defer d.Unlock()
d.sessionCompletedResult[sessionID] = response
}
func (d *sessionsDict) getCompletedRegistration(sessionID string) (registerResponse, bool) {
d.RLock()
defer d.RUnlock()
result, ok := d.sessionCompletedResult[sessionID]
return result, ok
}
func (d *sessionsDict) getDeviceToDelete(sessionID string) (string, bool) {
d.RLock()
defer d.RUnlock()
@ -544,6 +561,14 @@ func Register(
r.DeviceID = data.DeviceID
r.InitialDisplayName = data.InitialDisplayName
r.InhibitLogin = data.InhibitLogin
// Check if the user already registered using this session, if so, return that result
if response, ok := sessions.getCompletedRegistration(sessionID); ok {
return util.JSONResponse{
Code: http.StatusOK,
JSON: response,
}
}
}
if resErr := httputil.UnmarshalJSON(reqBody, &r); resErr != nil {
return *resErr
@ -839,13 +864,6 @@ func completeRegistration(
displayName, deviceID *string,
accType userapi.AccountType,
) util.JSONResponse {
var registrationOK bool
defer func() {
if registrationOK {
sessions.deleteSession(sessionID)
}
}()
if username == "" {
return util.JSONResponse{
Code: http.StatusBadRequest,
@ -886,7 +904,6 @@ func completeRegistration(
// Check whether inhibit_login option is set. If so, don't create an access
// token or a device for this user
if inhibitLogin {
registrationOK = true
return util.JSONResponse{
Code: http.StatusOK,
JSON: registerResponse{
@ -920,15 +937,17 @@ func completeRegistration(
}
}
registrationOK = true
result := registerResponse{
UserID: devRes.Device.UserID,
AccessToken: devRes.Device.AccessToken,
HomeServer: accRes.Account.ServerName,
DeviceID: devRes.Device.ID,
}
sessions.addCompletedRegistration(sessionID, result)
return util.JSONResponse{
Code: http.StatusOK,
JSON: registerResponse{
UserID: devRes.Device.UserID,
AccessToken: devRes.Device.AccessToken,
HomeServer: accRes.Account.ServerName,
DeviceID: devRes.Device.ID,
},
JSON: result,
}
}

View file

@ -89,6 +89,9 @@ func Setup(
"r0.4.0",
"r0.5.0",
"r0.6.1",
"v1.0",
"v1.1",
"v1.2",
}, UnstableFeatures: unstableFeatures},
}
}),
@ -137,7 +140,7 @@ func Setup(
synapseAdminRouter.Handle("/admin/v1/send_server_notice/{txnID}",
httputil.MakeAuthAPI("send_server_notice", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
// not specced, but ensure we're rate limiting requests to this endpoint
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -157,7 +160,7 @@ func Setup(
synapseAdminRouter.Handle("/admin/v1/send_server_notice",
httputil.MakeAuthAPI("send_server_notice", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
// not specced, but ensure we're rate limiting requests to this endpoint
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return SendServerNotice(
@ -187,7 +190,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/join/{roomIDOrAlias}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -203,7 +206,7 @@ func Setup(
if mscCfg.Enabled("msc2753") {
v3mux.Handle("/peek/{roomIDOrAlias}",
httputil.MakeAuthAPI(gomatrixserverlib.Peek, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -223,7 +226,7 @@ func Setup(
).Methods(http.MethodGet, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/join",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -237,7 +240,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/leave",
httputil.MakeAuthAPI("membership", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -271,7 +274,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/invite",
httputil.MakeAuthAPI("membership", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -389,14 +392,14 @@ func Setup(
).Methods(http.MethodPut, http.MethodOptions)
v3mux.Handle("/register", httputil.MakeExternalAPI("register", func(req *http.Request) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, nil); r != nil {
return *r
}
return Register(req, userAPI, cfg)
})).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/register/available", httputil.MakeExternalAPI("registerAvailable", func(req *http.Request) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, nil); r != nil {
return *r
}
return RegisterAvailable(req, cfg, userAPI)
@ -470,7 +473,7 @@ func Setup(
v3mux.Handle("/rooms/{roomID}/typing/{userID}",
httputil.MakeAuthAPI("rooms_typing", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -527,7 +530,7 @@ func Setup(
v3mux.Handle("/account/whoami",
httputil.MakeAuthAPI("whoami", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return Whoami(req, device)
@ -536,7 +539,7 @@ func Setup(
v3mux.Handle("/account/password",
httputil.MakeAuthAPI("password", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return Password(req, userAPI, device, cfg)
@ -545,7 +548,7 @@ func Setup(
v3mux.Handle("/account/deactivate",
httputil.MakeAuthAPI("deactivate", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return Deactivate(req, userInteractiveAuth, userAPI, device)
@ -556,7 +559,7 @@ func Setup(
v3mux.Handle("/login",
httputil.MakeExternalAPI("login", func(req *http.Request) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, nil); r != nil {
return *r
}
return Login(req, userAPI, cfg)
@ -664,7 +667,7 @@ func Setup(
v3mux.Handle("/pushrules/{scope}/{kind}/{ruleID}",
httputil.MakeAuthAPI("push_rules", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -730,7 +733,7 @@ func Setup(
v3mux.Handle("/profile/{userID}/avatar_url",
httputil.MakeAuthAPI("profile_avatar_url", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -755,7 +758,7 @@ func Setup(
v3mux.Handle("/profile/{userID}/displayname",
httputil.MakeAuthAPI("profile_displayname", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -794,7 +797,7 @@ func Setup(
v3mux.Handle("/voip/turnServer",
httputil.MakeAuthAPI("turn_server", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return RequestTurnServer(req, device, cfg)
@ -873,7 +876,7 @@ func Setup(
v3mux.Handle("/user/{userID}/openid/request_token",
httputil.MakeAuthAPI("openid_request_token", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -886,7 +889,7 @@ func Setup(
v3mux.Handle("/user_directory/search",
httputil.MakeAuthAPI("userdirectory_search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
postContent := struct {
@ -932,7 +935,7 @@ func Setup(
v3mux.Handle("/rooms/{roomID}/read_markers",
httputil.MakeAuthAPI("rooms_read_markers", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -945,7 +948,7 @@ func Setup(
v3mux.Handle("/rooms/{roomID}/forget",
httputil.MakeAuthAPI("rooms_forget", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -1022,7 +1025,7 @@ func Setup(
v3mux.Handle("/pushers/set",
httputil.MakeAuthAPI("set_pushers", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return SetPusher(req, device, userAPI)
@ -1080,7 +1083,7 @@ func Setup(
v3mux.Handle("/capabilities",
httputil.MakeAuthAPI("capabilities", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return GetCapabilities(req, rsAPI)
@ -1296,7 +1299,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/rooms/{roomId}/receipt/{receiptType}/{eventId}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))

View file

@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"reflect"
"sync"
"time"
@ -96,14 +97,21 @@ func SendEvent(
mutex.(*sync.Mutex).Lock()
defer mutex.(*sync.Mutex).Unlock()
startedGeneratingEvent := time.Now()
var r map[string]interface{} // must be a JSON object
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
return *resErr
}
if stateKey != nil {
// If the existing/new state content are equal, return the existing event_id, making the request idempotent.
if resp := stateEqual(req.Context(), rsAPI, eventType, *stateKey, roomID, r); resp != nil {
return *resp
}
}
startedGeneratingEvent := time.Now()
// If we're sending a membership update, make sure to strip the authorised
// via key if it is present, otherwise other servers won't be able to auth
// the event if the room is set to the "restricted" join rule.
@ -208,6 +216,37 @@ func SendEvent(
return res
}
// stateEqual compares the new and the existing state event content. If they are equal, returns a *util.JSONResponse
// with the existing event_id, making this an idempotent request.
func stateEqual(ctx context.Context, rsAPI api.ClientRoomserverAPI, eventType, stateKey, roomID string, newContent map[string]interface{}) *util.JSONResponse {
stateRes := api.QueryCurrentStateResponse{}
tuple := gomatrixserverlib.StateKeyTuple{
EventType: eventType,
StateKey: stateKey,
}
err := rsAPI.QueryCurrentState(ctx, &api.QueryCurrentStateRequest{
RoomID: roomID,
StateTuples: []gomatrixserverlib.StateKeyTuple{tuple},
}, &stateRes)
if err != nil {
return nil
}
if existingEvent, ok := stateRes.StateEvents[tuple]; ok {
var existingContent map[string]interface{}
if err = json.Unmarshal(existingEvent.Content(), &existingContent); err != nil {
return nil
}
if reflect.DeepEqual(existingContent, newContent) {
return &util.JSONResponse{
Code: http.StatusOK,
JSON: sendEventResponse{existingEvent.EventID()},
}
}
}
return nil
}
func generateSendEvent(
ctx context.Context,
r map[string]interface{},

View file

@ -28,7 +28,9 @@ import (
var roomVersion = flag.String("roomversion", "5", "the room version to parse events as")
var filterType = flag.String("filtertype", "", "the event types to filter on")
var difference = flag.Bool("difference", false, "whether to calculate the difference between snapshots")
// nolint:gocyclo
func main() {
ctx := context.Background()
cfg := setup.ParseFlags(true)
@ -36,6 +38,7 @@ func main() {
Type: "std",
Level: "error",
})
cfg.ClientAPI.RegistrationDisabled = true
base := base.NewBaseDendrite(cfg, "ResolveState", base.DisableMetrics)
args := flag.Args()
@ -64,6 +67,59 @@ func main() {
RoomVersion: gomatrixserverlib.RoomVersion(*roomVersion),
})
if *difference {
if len(snapshotNIDs) != 2 {
panic("need exactly two state snapshot NIDs to calculate difference")
}
var removed, added []types.StateEntry
removed, added, err = stateres.DifferenceBetweeenStateSnapshots(ctx, snapshotNIDs[0], snapshotNIDs[1])
if err != nil {
panic(err)
}
var eventNIDs []types.EventNID
for _, entry := range append(removed, added...) {
eventNIDs = append(eventNIDs, entry.EventNID)
}
var eventEntries []types.Event
eventEntries, err = roomserverDB.Events(ctx, eventNIDs)
if err != nil {
panic(err)
}
events := make(map[types.EventNID]*gomatrixserverlib.Event, len(eventEntries))
for _, entry := range eventEntries {
events[entry.EventNID] = entry.Event
}
if len(removed) > 0 {
fmt.Println("Removed:")
for _, r := range removed {
event := events[r.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
if len(removed) > 0 && len(added) > 0 {
fmt.Println()
}
if len(added) > 0 {
fmt.Println("Added:")
for _, a := range added {
event := events[a.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
return
}
var stateEntries []types.StateEntry
for _, snapshotNID := range snapshotNIDs {
var entries []types.StateEntry

View file

@ -160,11 +160,14 @@ client_api:
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot"
# will be released after the cooloff time in milliseconds.
# will be released after the cooloff time in milliseconds. Server administrators
# and appservice users are exempt from rate limiting by default.
rate_limiting:
enabled: true
threshold: 5
cooloff_ms: 500
exempt_user_ids:
# - "@user:domain.com"
# Configuration for the Federation API.
federation_api:

View file

@ -163,11 +163,14 @@ client_api:
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot"
# will be released after the cooloff time in milliseconds.
# will be released after the cooloff time in milliseconds. Server administrators
# and appservice users are exempt from rate limiting by default.
rate_limiting:
enabled: true
threshold: 5
cooloff_ms: 500
exempt_user_ids:
# - "@user:domain.com"
# Configuration for the Federation API.
federation_api:

View file

@ -0,0 +1,68 @@
{
# debug
admin off
email example@example.com
default_sni example.com
# Debug endpoint
# acme_ca https://acme-staging-v02.api.letsencrypt.org/directory
}
#######################################################################
# Snippets
#______________________________________________________________________
(handle_errors_maintenance) {
handle_errors {
@maintenance expression {http.error.status_code} == 502
rewrite @maintenance maintenance.html
root * "/path/to/service/pages"
file_server
}
}
(matrix-well-known-header) {
# Headers
header Access-Control-Allow-Origin "*"
header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, OPTIONS"
header Access-Control-Allow-Headers "Origin, X-Requested-With, Content-Type, Accept, Authorization"
header Content-Type "application/json"
}
#######################################################################
example.com {
# ...
handle /.well-known/matrix/server {
import matrix-well-known-header
respond `{ "m.server": "matrix.example.com:443" }` 200
}
handle /.well-known/matrix/client {
import matrix-well-known-header
respond `{ "m.homeserver": { "base_url": "https://matrix.example.com" } }` 200
}
import handle_errors_maintenance
}
example.com:8448 {
# server<->server HTTPS traffic
reverse_proxy http://dendrite-host:8008
}
matrix.example.com {
handle /_matrix/* {
# client<->server HTTPS traffic
reverse_proxy http://dendrite-host:8008
}
handle_path /* {
# Client webapp (Element SPA or ...)
file_server {
root /path/to/www/example.com/matrix-web-client/
}
}
}

View file

@ -0,0 +1,71 @@
---
title: Optimise your installation
parent: Installation
has_toc: true
nav_order: 10
permalink: /installation/start/optimisation
---
# Optimise your installation
Now that you have Dendrite running, the following tweaks will improve the reliability
and performance of your installation.
## File descriptor limit
Most platforms have a limit on how many file descriptors a single process can open. All
connections made by Dendrite consume file descriptors — this includes database connections
and network requests to remote homeservers. When participating in large federated rooms
where Dendrite must talk to many remote servers, it is often very easy to exhaust default
limits which are quite low.
We currently recommend setting the file descriptor limit to 65535 to avoid such
issues. Dendrite will log immediately after startup if the file descriptor limit is too low:
```
level=warning msg="IMPORTANT: Process file descriptor limit is currently 1024, it is recommended to raise the limit for Dendrite to at least 65535 to avoid issues"
```
UNIX systems have two limits: a hard limit and a soft limit. You can view the soft limit
by running `ulimit -Sn` and the hard limit with `ulimit -Hn`:
```bash
$ ulimit -Hn
1048576
$ ulimit -Sn
1024
```
Increase the soft limit before starting Dendrite:
```bash
ulimit -Sn 65535
```
The log line at startup should no longer appear if the limit is sufficient.
If you are running under a systemd service, you can instead add `LimitNOFILE=65535` option
to the `[Service]` section of your service unit file.
## DNS caching
Dendrite has a built-in DNS cache which significantly reduces the load that Dendrite will
place on your DNS resolver. This may also speed up outbound federation.
Consider enabling the DNS cache by modifying the `global` section of your configuration file:
```yaml
dns_cache:
enabled: true
cache_size: 4096
cache_lifetime: 600s
```
## Time synchronisation
Matrix relies heavily on TLS which requires the system time to be correct. If the clock
drifts then you may find that federation no works reliably (or at all) and clients may
struggle to connect to your Dendrite server.
Ensure that the time is synchronised on your system by enabling NTP sync.

2
go.mod
View file

@ -35,7 +35,7 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220530084946-3a4b148706bc
github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13

4
go.sum
View file

@ -460,8 +460,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220530084946-3a4b148706bc h1:58tT3VznINdRWimb3yYb8QWmTAHX9AAuyOwzdmrp9q4=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220530084946-3a4b148706bc/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a h1:jOkrb6twViAGTHHadA51sQwdloHT0Vx1MCptk9InTHo=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE=
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -7,6 +7,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
@ -17,6 +18,7 @@ type RateLimits struct {
enabled bool
requestThreshold int64
cooloffDuration time.Duration
exemptUserIDs map[string]struct{}
}
func NewRateLimits(cfg *config.RateLimiting) *RateLimits {
@ -25,6 +27,10 @@ func NewRateLimits(cfg *config.RateLimiting) *RateLimits {
enabled: cfg.Enabled,
requestThreshold: cfg.Threshold,
cooloffDuration: time.Duration(cfg.CooloffMS) * time.Millisecond,
exemptUserIDs: map[string]struct{}{},
}
for _, userID := range cfg.ExemptUserIDs {
l.exemptUserIDs[userID] = struct{}{}
}
if l.enabled {
go l.clean()
@ -52,7 +58,7 @@ func (l *RateLimits) clean() {
}
}
func (l *RateLimits) Limit(req *http.Request) *util.JSONResponse {
func (l *RateLimits) Limit(req *http.Request, device *userapi.Device) *util.JSONResponse {
// If rate limiting is disabled then do nothing.
if !l.enabled {
return nil
@ -67,9 +73,26 @@ func (l *RateLimits) Limit(req *http.Request) *util.JSONResponse {
// First of all, work out if X-Forwarded-For was sent to us. If not
// then we'll just use the IP address of the caller.
caller := req.RemoteAddr
if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" {
caller = forwardedFor
var caller string
if device != nil {
switch device.AccountType {
case userapi.AccountTypeAdmin:
return nil // don't rate-limit server administrators
case userapi.AccountTypeAppService:
return nil // don't rate-limit appservice users
default:
if _, ok := l.exemptUserIDs[device.UserID]; ok {
// If the user is exempt from rate limiting then do nothing.
return nil
}
caller = device.UserID + device.ID
}
} else {
if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" {
caller = forwardedFor
} else {
caller = req.RemoteAddr
}
}
// Look up the caller's channel, if they have one.

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 8
VersionPatch = 6
VersionPatch = 8
VersionTag = "" // example: "rc1"
)

View file

@ -374,7 +374,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
// fetch stale device lists
userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName})
if err != nil {
logger.WithError(err).Error("failed to load stale device lists")
logger.WithError(err).Error("Failed to load stale device lists")
return waitTime, true
}
failCount := 0
@ -399,7 +399,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
}
} else {
waitTime = time.Hour
logger.WithError(err).WithField("user_id", userID).Warn("GetUserDevices returned unknown error type")
logger.WithError(err).WithField("user_id", userID).Debug("GetUserDevices returned unknown error type")
}
continue
}
@ -422,12 +422,12 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
}
err = u.updateDeviceList(&res)
if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
logger.WithError(err).WithField("user_id", userID).Error("Fetched device list but failed to store/emit it")
failCount += 1
}
}
if failCount > 0 {
logger.WithField("total", len(userIDs)).WithField("failed", failCount).WithField("wait", waitTime).Error("failed to query device keys for some users")
logger.WithField("total", len(userIDs)).WithField("failed", failCount).WithField("wait", waitTime).Warn("Failed to query device keys for some users")
}
for _, userID := range userIDs {
// always clear the channel to unblock Update calls regardless of success/failure

View file

@ -62,7 +62,7 @@ func Setup(
uploadHandler := httputil.MakeAuthAPI(
"upload", userAPI,
func(req *http.Request, dev *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, dev); r != nil {
return *r
}
return Upload(req, cfg, dev, db, activeThumbnailGeneration)
@ -70,7 +70,7 @@ func Setup(
)
configHandler := httputil.MakeAuthAPI("config", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
respondSize := &cfg.MaxFileSizeBytes
@ -126,7 +126,7 @@ func makeDownloadAPI(
// Ratelimit requests
// NOTSPEC: The spec says everything at /media/ should be rate limited, but this causes issues with thumbnails (#2243)
if name != "thumbnail" {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, nil); r != nil {
if err := json.NewEncoder(w).Encode(r); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return

View file

@ -161,6 +161,8 @@ type OutputNewRoomEvent struct {
// The transaction ID of the send request if sent by a local user and one
// was specified
TransactionID *TransactionID `json:"transaction_id,omitempty"`
// The history visibility of the event.
HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"`
}
func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) {
@ -187,7 +189,8 @@ func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.Headere
// should build their current room state up from OutputNewRoomEvents only.
type OutputOldRoomEvent struct {
// The Event.
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"`
}
// An OutputNewInviteEvent is written whenever an invite becomes active.

View file

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
@ -75,6 +76,11 @@ func (r *Inputer) processRoomEvent(
default:
}
span, ctx := opentracing.StartSpanFromContext(ctx, "processRoomEvent")
span.SetTag("room_id", input.Event.RoomID())
span.SetTag("event_id", input.Event.EventID())
defer span.Finish()
// Measure how long it takes to process this event.
started := time.Now()
defer func() {
@ -289,6 +295,22 @@ func (r *Inputer) processRoomEvent(
}
}
// Get the state before the event so that we can work out if the event was
// allowed at the time, and also to get the history visibility. We won't
// bother doing this if the event was already rejected as it just ends up
// burning CPU time.
historyVisibility := gomatrixserverlib.HistoryVisibilityJoined // Default to restrictive.
if rejectionErr == nil && !isRejected && !softfail {
var err error
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev)
if err != nil {
return fmt.Errorf("r.processStateBefore: %w", err)
}
if rejectionErr != nil {
isRejected = true
}
}
// Store the event.
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected || softfail)
if err != nil {
@ -354,6 +376,7 @@ func (r *Inputer) processRoomEvent(
input.SendAsServer, // send as server
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
historyVisibility, // the history visibility before the event
); err != nil {
return fmt.Errorf("r.updateLatestEvents: %w", err)
}
@ -362,7 +385,8 @@ func (r *Inputer) processRoomEvent(
{
Type: api.OutputTypeOldRoomEvent,
OldRoomEvent: &api.OutputOldRoomEvent{
Event: headered,
Event: headered,
HistoryVisibility: historyVisibility,
},
},
})
@ -396,6 +420,100 @@ func (r *Inputer) processRoomEvent(
return nil
}
// processStateBefore works out what the state is before the event and
// then checks the event auths against the state at the time. It also
// tries to determine what the history visibility was of the event at
// the time, so that it can be sent in the output event to downstream
// components.
// nolint:nakedret
func (r *Inputer) processStateBefore(
ctx context.Context,
input *api.InputRoomEvent,
missingPrev bool,
) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) {
historyVisibility = gomatrixserverlib.HistoryVisibilityJoined // Default to restrictive.
event := input.Event.Unwrap()
isCreateEvent := event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("")
var stateBeforeEvent []*gomatrixserverlib.Event
switch {
case isCreateEvent:
// There's no state before a create event so there is nothing
// else to do.
return
case input.HasState:
// If we're overriding the state then we need to go and retrieve
// them from the database. It's a hard error if they are missing.
stateEvents, err := r.DB.EventsFromIDs(ctx, input.StateEventIDs)
if err != nil {
return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err)
}
stateBeforeEvent = make([]*gomatrixserverlib.Event, 0, len(stateEvents))
for _, entry := range stateEvents {
stateBeforeEvent = append(stateBeforeEvent, entry.Event)
}
case missingPrev:
// We don't know all of the prev events, so we can't work out
// the state before the event. Reject it in that case.
rejectionErr = fmt.Errorf("event %q has missing prev events", event.EventID())
return
case len(event.PrevEventIDs()) == 0:
// There should be prev events since it's not a create event.
// A non-create event that claims to have no prev events is
// invalid, so reject it.
rejectionErr = fmt.Errorf("event %q must have prev events", event.EventID())
return
default:
// For all non-create events, there must be prev events, so we'll
// ask the query API for the relevant tuples needed for auth. We
// will include the history visibility here even though we don't
// actually need it for auth, because we want to send it in the
// output events.
tuplesNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event}).Tuples()
tuplesNeeded = append(tuplesNeeded, gomatrixserverlib.StateKeyTuple{
EventType: gomatrixserverlib.MRoomHistoryVisibility,
StateKey: "",
})
stateBeforeReq := &api.QueryStateAfterEventsRequest{
RoomID: event.RoomID(),
PrevEventIDs: event.PrevEventIDs(),
StateToFetch: tuplesNeeded,
}
stateBeforeRes := &api.QueryStateAfterEventsResponse{}
if err := r.Queryer.QueryStateAfterEvents(ctx, stateBeforeReq, stateBeforeRes); err != nil {
return "", nil, fmt.Errorf("r.Queryer.QueryStateAfterEvents: %w", err)
}
switch {
case !stateBeforeRes.RoomExists:
rejectionErr = fmt.Errorf("room %q does not exist", event.RoomID())
return
case !stateBeforeRes.PrevEventsExist:
rejectionErr = fmt.Errorf("prev events of %q are not known", event.EventID())
return
default:
stateBeforeEvent = gomatrixserverlib.UnwrapEventHeaders(stateBeforeRes.StateEvents)
}
}
// At this point, stateBeforeEvent should be populated either by
// the supplied state in the input request, or from the prev events.
// Check whether the event is allowed or not.
stateBeforeAuth := gomatrixserverlib.NewAuthEvents(stateBeforeEvent)
if rejectionErr = gomatrixserverlib.Allowed(event, &stateBeforeAuth); rejectionErr != nil {
return
}
// Work out what the history visibility was at the time of the
// event.
for _, event := range stateBeforeEvent {
if event.Type() != gomatrixserverlib.MRoomHistoryVisibility || !event.StateKeyEquals("") {
continue
}
if hisVis, err := event.HistoryVisibility(); err == nil {
historyVisibility = hisVis
break
}
}
return
}
// fetchAuthEvents will check to see if any of the
// auth events specified by the given event are unknown. If they are
// then we will go off and request them from the federation and then
@ -411,6 +529,9 @@ func (r *Inputer) fetchAuthEvents(
known map[string]*types.Event,
servers []gomatrixserverlib.ServerName,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "fetchAuthEvents")
defer span.Finish()
unknown := map[string]struct{}{}
authEventIDs := event.AuthEventIDs()
if len(authEventIDs) == 0 {
@ -526,6 +647,9 @@ func (r *Inputer) calculateAndSetState(
event *gomatrixserverlib.Event,
isRejected bool,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "calculateAndSetState")
defer span.Finish()
var succeeded bool
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
if err != nil {

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
)
@ -55,7 +56,11 @@ func (r *Inputer) updateLatestEvents(
sendAsServer string,
transactionID *api.TransactionID,
rewritesState bool,
historyVisibility gomatrixserverlib.HistoryVisibility,
) (err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "updateLatestEvents")
defer span.Finish()
var succeeded bool
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
if err != nil {
@ -65,15 +70,16 @@ func (r *Inputer) updateLatestEvents(
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
u := latestEventsUpdater{
ctx: ctx,
api: r,
updater: updater,
roomInfo: roomInfo,
stateAtEvent: stateAtEvent,
event: event,
sendAsServer: sendAsServer,
transactionID: transactionID,
rewritesState: rewritesState,
ctx: ctx,
api: r,
updater: updater,
roomInfo: roomInfo,
stateAtEvent: stateAtEvent,
event: event,
sendAsServer: sendAsServer,
transactionID: transactionID,
rewritesState: rewritesState,
historyVisibility: historyVisibility,
}
if err = u.doUpdateLatestEvents(); err != nil {
@ -115,6 +121,8 @@ type latestEventsUpdater struct {
// The snapshots of current state before and after processing this event
oldStateNID types.StateSnapshotNID
newStateNID types.StateSnapshotNID
// The history visibility of the event itself (from the state before the event).
historyVisibility gomatrixserverlib.HistoryVisibility
}
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
@ -200,13 +208,16 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
}
func (u *latestEventsUpdater) latestState() error {
span, ctx := opentracing.StartSpanFromContext(u.ctx, "processEventWithMissingState")
defer span.Finish()
var err error
roomState := state.NewStateResolution(u.updater, u.roomInfo)
// Work out if the state at the extremities has actually changed
// or not. If they haven't then we won't bother doing all of the
// hard work.
if u.event.StateKey() == nil {
if !u.stateAtEvent.IsStateEvent() {
stateChanged := false
oldStateNIDs := make([]types.StateSnapshotNID, 0, len(u.oldLatest))
newStateNIDs := make([]types.StateSnapshotNID, 0, len(u.latest))
@ -234,26 +245,19 @@ func (u *latestEventsUpdater) latestState() error {
}
}
// Take the old set of extremities and the new set of extremities and
// mash them together into a list. This may or may not include the new event
// from the input path, depending on whether it became a forward extremity
// or not. We'll then run state resolution across all of them to determine
// the new current state of the room. Including the old extremities here
// ensures that new forward extremities with bad state snapshots (from
// possible malicious actors) can't completely corrupt the room state
// away from what it was before.
combinedExtremities := types.StateAtEventAndReferences(append(u.oldLatest, u.latest...))
combinedExtremities = combinedExtremities[:util.SortAndUnique(combinedExtremities)]
latestStateAtEvents := make([]types.StateAtEvent, len(combinedExtremities))
for i := range combinedExtremities {
latestStateAtEvents[i] = combinedExtremities[i].StateAtEvent
// Get a list of the current latest events. This may or may not
// include the new event from the input path, depending on whether
// it is a forward extremity or not.
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
for i := range u.latest {
latestStateAtEvents[i] = u.latest[i].StateAtEvent
}
// Takes the NIDs of the latest events and creates a state snapshot
// of the state after the events. The snapshot state will be resolved
// using the correct state resolution algorithm for the room.
u.newStateNID, err = roomState.CalculateAndStoreStateAfterEvents(
u.ctx, latestStateAtEvents,
ctx, latestStateAtEvents,
)
if err != nil {
return fmt.Errorf("roomState.CalculateAndStoreStateAfterEvents: %w", err)
@ -265,7 +269,7 @@ func (u *latestEventsUpdater) latestState() error {
// another list of added ones. Replacing a value for a state-key tuple
// will result one removed (the old event) and one added (the new event).
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
u.ctx, u.oldStateNID, u.newStateNID,
ctx, u.oldStateNID, u.newStateNID,
)
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
@ -285,7 +289,7 @@ func (u *latestEventsUpdater) latestState() error {
// Also work out the state before the event removes and the event
// adds.
u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = roomState.DifferenceBetweeenStateSnapshots(
u.ctx, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
ctx, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
)
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweeenStateSnapshots: %w", err)
@ -301,6 +305,9 @@ func (u *latestEventsUpdater) calculateLatest(
newEvent *gomatrixserverlib.Event,
newStateAndRef types.StateAtEventAndReference,
) (bool, error) {
span, _ := opentracing.StartSpanFromContext(u.ctx, "calculateLatest")
defer span.Finish()
// First of all, get a list of all of the events in our current
// set of forward extremities.
existingRefs := make(map[string]*types.StateAtEventAndReference)
@ -362,12 +369,13 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
}
ore := api.OutputNewRoomEvent{
Event: u.event.Headered(u.roomInfo.RoomVersion),
RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
SendAsServer: u.sendAsServer,
Event: u.event.Headered(u.roomInfo.RoomVersion),
RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
SendAsServer: u.sendAsServer,
HistoryVisibility: u.historyVisibility,
}
eventIDMap, err := u.stateEventMap()

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
)
// updateMembership updates the current membership and the invites for each
@ -34,6 +35,9 @@ func (r *Inputer) updateMemberships(
updater *shared.RoomUpdater,
removed, added []types.StateEntry,
) ([]api.OutputEvent, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "updateMemberships")
defer span.Finish()
changes := membershipChanges(removed, added)
var eventNIDs []types.EventNID
for _, change := range changes {

View file

@ -15,6 +15,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
)
@ -59,6 +60,9 @@ type missingStateReq struct {
func (t *missingStateReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) (*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "processEventWithMissingState")
defer span.Finish()
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
@ -235,6 +239,9 @@ func (t *missingStateReq) processEventWithMissingState(
}
func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupResolvedStateBeforeEvent")
defer span.Finish()
type respState struct {
// A snapshot is considered trustworthy if it came from our own roomserver.
// That's because the state will have been through state resolution once
@ -310,6 +317,9 @@ func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix.
func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupStateAfterEvent")
defer span.Finish()
// try doing all this locally before we resort to querying federation
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID)
if respState != nil {
@ -361,6 +371,9 @@ func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.Event) *gomatrixs
}
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *parsedRespState {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupStateAfterEventLocally")
defer span.Finish()
var res parsedRespState
roomInfo, err := t.db.RoomInfo(ctx, roomID)
if err != nil {
@ -435,12 +448,17 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
// the server supports.
func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupStateBeforeEvent")
defer span.Finish()
// Attempt to fetch the missing state using /state_ids and /events
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
}
func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "resolveStatesAndCheck")
defer span.Finish()
var authEventList []*gomatrixserverlib.Event
var stateEventList []*gomatrixserverlib.Event
for _, state := range states {
@ -484,6 +502,9 @@ retryAllowedState:
// get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject,
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "getMissingEvents")
defer span.Finish()
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
latest, _, _, err := t.db.LatestEventIDs(ctx, t.roomInfo.RoomNID)
if err != nil {
@ -608,6 +629,9 @@ func (t *missingStateReq) isPrevStateKnown(ctx context.Context, e *gomatrixserve
func (t *missingStateReq) lookupMissingStateViaState(
ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
) (respState *parsedRespState, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupMissingStateViaState")
defer span.Finish()
state, err := t.federation.LookupState(ctx, t.origin, roomID, eventID, roomVersion)
if err != nil {
return nil, err
@ -637,6 +661,9 @@ func (t *missingStateReq) lookupMissingStateViaState(
func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupMissingStateViaStateIDs")
defer span.Finish()
util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID)
// fetch the state event IDs at the time of the event
stateIDs, err := t.federation.LookupStateIDs(ctx, t.origin, roomID, eventID)
@ -799,6 +826,9 @@ func (t *missingStateReq) createRespStateFromStateIDs(
}
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.Event, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupEvent")
defer span.Finish()
if localFirst {
// fetch from the roomserver
events, err := t.db.EventsFromIDs(ctx, []string{missingEventID})

View file

@ -105,13 +105,13 @@ func (r *Upgrader) performRoomUpgrade(
return "", pErr
}
// 5. Send the tombstone event to the old room (must do this before we set the new canonical_alias)
if pErr = r.sendHeaderedEvent(ctx, tombstoneEvent); pErr != nil {
// Send the setup events to the new room
if pErr = r.sendInitialEvents(ctx, evTime, userID, newRoomID, string(req.RoomVersion), eventsToMake); pErr != nil {
return "", pErr
}
// Send the setup events to the new room
if pErr = r.sendInitialEvents(ctx, evTime, userID, newRoomID, string(req.RoomVersion), eventsToMake); pErr != nil {
// 5. Send the tombstone event to the old room
if pErr = r.sendHeaderedEvent(ctx, tombstoneEvent, string(r.Cfg.Matrix.ServerName)); pErr != nil {
return "", pErr
}
@ -147,7 +147,7 @@ func (r *Upgrader) getRoomPowerLevels(ctx context.Context, roomID string) (*goma
if err != nil {
util.GetLogger(ctx).WithError(err).Error()
return nil, &api.PerformError{
Msg: "powerLevel event was not actually a power level event",
Msg: "Power level event was invalid or malformed",
}
}
return powerLevelContent, nil
@ -182,7 +182,7 @@ func (r *Upgrader) restrictOldRoomPowerLevels(ctx context.Context, evTime time.T
return resErr
}
} else {
if resErr = r.sendHeaderedEvent(ctx, restrictedPowerLevelsHeadered); resErr != nil {
if resErr = r.sendHeaderedEvent(ctx, restrictedPowerLevelsHeadered, api.DoNotSendToOtherServers); resErr != nil {
return resErr
}
}
@ -198,7 +198,7 @@ func moveLocalAliases(ctx context.Context,
aliasRes := api.GetAliasesForRoomIDResponse{}
if err = URSAPI.GetAliasesForRoomID(ctx, &aliasReq, &aliasRes); err != nil {
return &api.PerformError{
Msg: "Could not get aliases for old room",
Msg: fmt.Sprintf("Failed to get old room aliases: %s", err),
}
}
@ -207,7 +207,7 @@ func moveLocalAliases(ctx context.Context,
removeAliasRes := api.RemoveRoomAliasResponse{}
if err = URSAPI.RemoveRoomAlias(ctx, &removeAliasReq, &removeAliasRes); err != nil {
return &api.PerformError{
Msg: "api.RemoveRoomAlias failed",
Msg: fmt.Sprintf("Failed to remove old room alias: %s", err),
}
}
@ -215,7 +215,7 @@ func moveLocalAliases(ctx context.Context,
setAliasRes := api.SetRoomAliasResponse{}
if err = URSAPI.SetRoomAlias(ctx, &setAliasReq, &setAliasRes); err != nil {
return &api.PerformError{
Msg: "api.SetRoomAlias failed",
Msg: fmt.Sprintf("Failed to set new room alias: %s", err),
}
}
}
@ -253,7 +253,7 @@ func (r *Upgrader) clearOldCanonicalAliasEvent(ctx context.Context, oldRoom *api
return resErr
}
} else {
if resErr = r.sendHeaderedEvent(ctx, emptyCanonicalAliasEvent); resErr != nil {
if resErr = r.sendHeaderedEvent(ctx, emptyCanonicalAliasEvent, api.DoNotSendToOtherServers); resErr != nil {
return resErr
}
}
@ -509,7 +509,7 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user
err = builder.SetContent(e.Content)
if err != nil {
return &api.PerformError{
Msg: "builder.SetContent failed",
Msg: fmt.Sprintf("Failed to set content of new %q event: %s", builder.Type, err),
}
}
if i > 0 {
@ -519,13 +519,13 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user
event, err = r.buildEvent(&builder, &authEvents, evTime, gomatrixserverlib.RoomVersion(newVersion))
if err != nil {
return &api.PerformError{
Msg: "buildEvent failed",
Msg: fmt.Sprintf("Failed to build new %q event: %s", builder.Type, err),
}
}
if err = gomatrixserverlib.Allowed(event, &authEvents); err != nil {
return &api.PerformError{
Msg: "gomatrixserverlib.Allowed failed",
Msg: fmt.Sprintf("Failed to auth new %q event: %s", builder.Type, err),
}
}
@ -534,7 +534,7 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user
err = authEvents.AddEvent(event)
if err != nil {
return &api.PerformError{
Msg: "authEvents.AddEvent failed",
Msg: fmt.Sprintf("Failed to add new %q event to auth set: %s", builder.Type, err),
}
}
}
@ -550,7 +550,7 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user
}
if err = api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil {
return &api.PerformError{
Msg: "api.SendInputRoomEvents failed",
Msg: fmt.Sprintf("Failed to send new room %q to roomserver: %s", newRoomID, err),
}
}
return nil
@ -582,7 +582,7 @@ func (r *Upgrader) makeHeaderedEvent(ctx context.Context, evTime time.Time, user
err := builder.SetContent(event.Content)
if err != nil {
return nil, &api.PerformError{
Msg: "builder.SetContent failed",
Msg: fmt.Sprintf("Failed to set new %q event content: %s", builder.Type, err),
}
}
var queryRes api.QueryLatestEventsAndStateResponse
@ -607,7 +607,7 @@ func (r *Upgrader) makeHeaderedEvent(ctx context.Context, evTime time.Time, user
}
} else if err != nil {
return nil, &api.PerformError{
Msg: "eventutil.BuildEvent failed",
Msg: fmt.Sprintf("Failed to build new %q event: %s", builder.Type, err),
}
}
// check to see if this user can perform this operation
@ -619,7 +619,7 @@ func (r *Upgrader) makeHeaderedEvent(ctx context.Context, evTime time.Time, user
if err = gomatrixserverlib.Allowed(headeredEvent.Event, &provider); err != nil {
return nil, &api.PerformError{
Code: api.PerformErrorNotAllowed,
Msg: err.Error(), // TODO: Is this error string comprehensible to the client?
Msg: fmt.Sprintf("Failed to auth new %q event: %s", builder.Type, err), // TODO: Is this error string comprehensible to the client?
}
}
@ -666,17 +666,18 @@ func createTemporaryPowerLevels(powerLevelContent *gomatrixserverlib.PowerLevelC
func (r *Upgrader) sendHeaderedEvent(
ctx context.Context,
headeredEvent *gomatrixserverlib.HeaderedEvent,
sendAsServer string,
) *api.PerformError {
var inputs []api.InputRoomEvent
inputs = append(inputs, api.InputRoomEvent{
Kind: api.KindNew,
Event: headeredEvent,
Origin: r.Cfg.Matrix.ServerName,
SendAsServer: api.DoNotSendToOtherServers,
SendAsServer: sendAsServer,
})
if err := api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil {
return &api.PerformError{
Msg: "api.SendInputRoomEvents failed",
Msg: fmt.Sprintf("Failed to send new %q event to roomserver: %s", headeredEvent.Type(), err),
}
}
@ -703,7 +704,7 @@ func (r *Upgrader) buildEvent(
r.Cfg.Matrix.PrivateKey, roomVersion,
)
if err != nil {
return nil, fmt.Errorf("cannot build event %s : Builder failed to build. %w", builder.Type, err)
return nil, err
}
return event, nil
}

View file

@ -778,11 +778,18 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
} else if !allowRestrictedJoins {
return nil
}
// Start off by populating the "resident" flag in the response. If we
// come across any rooms in the request that are missing, we will unset
// the flag.
res.Resident = true
// Get the join rules to work out if the join rule is "restricted".
joinRulesEvent, err := r.DB.GetStateEvent(ctx, req.RoomID, gomatrixserverlib.MRoomJoinRules, "")
if err != nil {
return fmt.Errorf("r.DB.GetStateEvent: %w", err)
}
if joinRulesEvent == nil {
return nil
}
var joinRules gomatrixserverlib.JoinRuleContent
if err = json.Unmarshal(joinRulesEvent.Content(), &joinRules); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
@ -792,10 +799,6 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
if !res.Restricted {
return nil
}
// Start off by populating the "resident" flag in the response. If we
// come across any rooms in the request that are missing, we will unset
// the flag.
res.Resident = true
// If the user is already invited to the room then the join is allowed
// but we don't specify an authorised via user, since the event auth
// will allow the join anyway.

View file

@ -20,9 +20,11 @@ import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/matrix-org/dendrite/roomserver/types"
@ -39,6 +41,7 @@ type StateResolutionStorage interface {
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
}
type StateResolution struct {
@ -61,6 +64,9 @@ func NewStateResolution(db StateResolutionStorage, roomInfo *types.RoomInfo) Sta
func (v *StateResolution) LoadStateAtSnapshot(
ctx context.Context, stateNID types.StateSnapshotNID,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtSnapshot")
defer span.Finish()
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
if err != nil {
return nil, err
@ -99,6 +105,9 @@ func (v *StateResolution) LoadStateAtSnapshot(
func (v *StateResolution) LoadStateAtEvent(
ctx context.Context, eventID string,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtEvent")
defer span.Finish()
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
if err != nil {
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %s", eventID, err)
@ -121,6 +130,9 @@ func (v *StateResolution) LoadStateAtEvent(
func (v *StateResolution) LoadCombinedStateAfterEvents(
ctx context.Context, prevStates []types.StateAtEvent,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadCombinedStateAfterEvents")
defer span.Finish()
stateNIDs := make([]types.StateSnapshotNID, len(prevStates))
for i, state := range prevStates {
stateNIDs[i] = state.BeforeStateSnapshotNID
@ -193,6 +205,9 @@ func (v *StateResolution) LoadCombinedStateAfterEvents(
func (v *StateResolution) DifferenceBetweeenStateSnapshots(
ctx context.Context, oldStateNID, newStateNID types.StateSnapshotNID,
) (removed, added []types.StateEntry, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.DifferenceBetweeenStateSnapshots")
defer span.Finish()
if oldStateNID == newStateNID {
// If the snapshot NIDs are the same then nothing has changed
return nil, nil, nil
@ -254,6 +269,9 @@ func (v *StateResolution) LoadStateAtSnapshotForStringTuples(
stateNID types.StateSnapshotNID,
stateKeyTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtSnapshotForStringTuples")
defer span.Finish()
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
if err != nil {
return nil, err
@ -268,6 +286,9 @@ func (v *StateResolution) stringTuplesToNumericTuples(
ctx context.Context,
stringTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateKeyTuple, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.stringTuplesToNumericTuples")
defer span.Finish()
eventTypes := make([]string, len(stringTuples))
stateKeys := make([]string, len(stringTuples))
for i := range stringTuples {
@ -310,6 +331,9 @@ func (v *StateResolution) loadStateAtSnapshotForNumericTuples(
stateNID types.StateSnapshotNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadStateAtSnapshotForNumericTuples")
defer span.Finish()
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
if err != nil {
return nil, err
@ -358,6 +382,9 @@ func (v *StateResolution) LoadStateAfterEventsForStringTuples(
prevStates []types.StateAtEvent,
stateKeyTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAfterEventsForStringTuples")
defer span.Finish()
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
if err != nil {
return nil, err
@ -370,6 +397,9 @@ func (v *StateResolution) loadStateAfterEventsForNumericTuples(
prevStates []types.StateAtEvent,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadStateAfterEventsForNumericTuples")
defer span.Finish()
if len(prevStates) == 1 {
// Fast path for a single event.
prevState := prevStates[0]
@ -542,6 +572,9 @@ func (v *StateResolution) CalculateAndStoreStateBeforeEvent(
event *gomatrixserverlib.Event,
isRejected bool,
) (types.StateSnapshotNID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.CalculateAndStoreStateBeforeEvent")
defer span.Finish()
// Load the state at the prev events.
prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs())
if err != nil {
@ -558,6 +591,9 @@ func (v *StateResolution) CalculateAndStoreStateAfterEvents(
ctx context.Context,
prevStates []types.StateAtEvent,
) (types.StateSnapshotNID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.CalculateAndStoreStateAfterEvents")
defer span.Finish()
metrics := calculateStateMetrics{startTime: time.Now(), prevEventLength: len(prevStates)}
if len(prevStates) == 0 {
@ -630,6 +666,9 @@ func (v *StateResolution) calculateAndStoreStateAfterManyEvents(
prevStates []types.StateAtEvent,
metrics calculateStateMetrics,
) (types.StateSnapshotNID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.calculateAndStoreStateAfterManyEvents")
defer span.Finish()
state, algorithm, conflictLength, err :=
v.calculateStateAfterManyEvents(ctx, v.roomInfo.RoomVersion, prevStates)
metrics.algorithm = algorithm
@ -648,6 +687,9 @@ func (v *StateResolution) calculateStateAfterManyEvents(
ctx context.Context, roomVersion gomatrixserverlib.RoomVersion,
prevStates []types.StateAtEvent,
) (state []types.StateEntry, algorithm string, conflictLength int, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.calculateStateAfterManyEvents")
defer span.Finish()
var combined []types.StateEntry
// Conflict resolution.
// First stage: load the state after each of the prev events.
@ -659,15 +701,13 @@ func (v *StateResolution) calculateStateAfterManyEvents(
}
// Collect all the entries with the same type and key together.
// We don't care about the order here because the conflict resolution
// algorithm doesn't depend on the order of the prev events.
// Remove duplicate entires.
// This is done so findDuplicateStateKeys can work in groups.
// We remove duplicates (same type, state key and event NID) too.
combined = combined[:util.SortAndUnique(stateEntrySorter(combined))]
// Find the conflicts
conflicts := findDuplicateStateKeys(combined)
if len(conflicts) > 0 {
if conflicts := findDuplicateStateKeys(combined); len(conflicts) > 0 {
conflictMap := stateEntryMap(conflicts)
conflictLength = len(conflicts)
// 5) There are conflicting state events, for each conflict workout
@ -676,7 +716,7 @@ func (v *StateResolution) calculateStateAfterManyEvents(
// Work out which entries aren't conflicted.
var notConflicted []types.StateEntry
for _, entry := range combined {
if _, ok := stateEntryMap(conflicts).lookup(entry.StateKeyTuple); !ok {
if _, ok := conflictMap.lookup(entry.StateKeyTuple); !ok {
notConflicted = append(notConflicted, entry)
}
}
@ -689,7 +729,7 @@ func (v *StateResolution) calculateStateAfterManyEvents(
return
}
algorithm = "full_state_with_conflicts"
state = resolved[:util.SortAndUnique(stateEntrySorter(resolved))]
state = resolved
} else {
algorithm = "full_state_no_conflicts"
// 6) There weren't any conflicts
@ -702,6 +742,9 @@ func (v *StateResolution) resolveConflicts(
ctx context.Context, version gomatrixserverlib.RoomVersion,
notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.resolveConflicts")
defer span.Finish()
stateResAlgo, err := version.StateResAlgorithm()
if err != nil {
return nil, err
@ -726,6 +769,8 @@ func (v *StateResolution) resolveConflictsV1(
ctx context.Context,
notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.resolveConflictsV1")
defer span.Finish()
// Load the conflicted events
conflictedEvents, eventIDMap, err := v.loadStateEvents(ctx, conflicted)
@ -789,6 +834,9 @@ func (v *StateResolution) resolveConflictsV2(
ctx context.Context,
notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.resolveConflictsV2")
defer span.Finish()
estimate := len(conflicted) + len(notConflicted)
eventIDMap := make(map[string]types.StateEntry, estimate)
@ -816,51 +864,47 @@ func (v *StateResolution) resolveConflictsV2(
authEvents := make([]*gomatrixserverlib.Event, 0, estimate*3)
gotAuthEvents := make(map[string]struct{}, estimate*3)
authDifference := make([]*gomatrixserverlib.Event, 0, estimate)
knownAuthEvents := make(map[string]types.Event, estimate*3)
// For each conflicted event, let's try and get the needed auth events.
neededStateKeys := make([]string, 16)
authEntries := make([]types.StateEntry, 16)
for _, conflictedEvent := range conflictedEvents {
// Work out which auth events we need to load.
key := conflictedEvent.EventID()
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{conflictedEvent})
if err = func() error {
span, sctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadAuthEvents")
defer span.Finish()
// Find the numeric IDs for the necessary state keys.
neededStateKeys = neededStateKeys[:0]
neededStateKeys = append(neededStateKeys, needed.Member...)
neededStateKeys = append(neededStateKeys, needed.ThirdPartyInvite...)
stateKeyNIDMap, err := v.db.EventStateKeyNIDs(ctx, neededStateKeys)
if err != nil {
return nil, err
loader := authEventLoader{
v: v,
lookupFromDB: make([]string, 0, len(conflictedEvents)*3),
lookupFromMem: make([]string, 0, len(conflictedEvents)*3),
lookedUpEvents: make([]types.Event, 0, len(conflictedEvents)*3),
eventMap: map[string]types.Event{},
}
for _, conflictedEvent := range conflictedEvents {
// Work out which auth events we need to load.
key := conflictedEvent.EventID()
// Load the necessary auth events.
tuplesNeeded := v.stateKeyTuplesNeeded(stateKeyNIDMap, needed)
authEntries = authEntries[:0]
for _, tuple := range tuplesNeeded {
if eventNID, ok := stateEntryMap(notConflicted).lookup(tuple); ok {
authEntries = append(authEntries, types.StateEntry{
StateKeyTuple: tuple,
EventNID: eventNID,
})
}
}
// Store the newly found auth events in the auth set for this event.
authSets[key], _, err = v.loadStateEvents(ctx, authEntries)
if err != nil {
return nil, err
}
// Only add auth events into the authEvents slice once, otherwise the
// check for the auth difference can become expensive and produce
// duplicate entries, which just waste memory and CPU time.
for _, event := range authSets[key] {
if _, ok := gotAuthEvents[event.EventID()]; !ok {
authEvents = append(authEvents, event)
gotAuthEvents[event.EventID()] = struct{}{}
// Store the newly found auth events in the auth set for this event.
var authEventMap map[string]types.StateEntry
authSets[key], authEventMap, err = loader.loadAuthEvents(sctx, conflictedEvent, knownAuthEvents)
if err != nil {
return err
}
for k, v := range authEventMap {
eventIDMap[k] = v
}
// Only add auth events into the authEvents slice once, otherwise the
// check for the auth difference can become expensive and produce
// duplicate entries, which just waste memory and CPU time.
for _, event := range authSets[key] {
if _, ok := gotAuthEvents[event.EventID()]; !ok {
authEvents = append(authEvents, event)
gotAuthEvents[event.EventID()] = struct{}{}
}
}
}
return nil
}(); err != nil {
return nil, err
}
// Kill the reference to this so that the GC may pick it up, since we no
@ -891,25 +935,35 @@ func (v *StateResolution) resolveConflictsV2(
// Look through all of the auth events that we've been given and work out if
// there are any events which don't appear in all of the auth sets. If they
// don't then we add them to the auth difference.
for _, event := range authEvents {
if !isInAllAuthLists(event) {
authDifference = append(authDifference, event)
func() {
span, _ := opentracing.StartSpanFromContext(ctx, "isInAllAuthLists")
defer span.Finish()
for _, event := range authEvents {
if !isInAllAuthLists(event) {
authDifference = append(authDifference, event)
}
}
}
}()
// Resolve the conflicts.
resolvedEvents := gomatrixserverlib.ResolveStateConflictsV2(
conflictedEvents,
nonConflictedEvents,
authEvents,
authDifference,
)
resolvedEvents := func() []*gomatrixserverlib.Event {
span, _ := opentracing.StartSpanFromContext(ctx, "gomatrixserverlib.ResolveStateConflictsV2")
defer span.Finish()
return gomatrixserverlib.ResolveStateConflictsV2(
conflictedEvents,
nonConflictedEvents,
authEvents,
authDifference,
)
}()
// Map from the full events back to numeric state entries.
for _, resolvedEvent := range resolvedEvents {
entry, ok := eventIDMap[resolvedEvent.EventID()]
if !ok {
panic(fmt.Errorf("missing state entry for event ID %q", resolvedEvent.EventID()))
return nil, fmt.Errorf("missing state entry for event ID %q", resolvedEvent.EventID())
}
notConflicted = append(notConflicted, entry)
}
@ -968,6 +1022,9 @@ func (v *StateResolution) stateKeyTuplesNeeded(stateKeyNIDMap map[string]types.E
func (v *StateResolution) loadStateEvents(
ctx context.Context, entries []types.StateEntry,
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadStateEvents")
defer span.Finish()
result := make([]*gomatrixserverlib.Event, 0, len(entries))
eventEntries := make([]types.StateEntry, 0, len(entries))
eventNIDs := make([]types.EventNID, 0, len(entries))
@ -996,6 +1053,127 @@ func (v *StateResolution) loadStateEvents(
return result, eventIDMap, nil
}
type authEventLoader struct {
sync.Mutex
v *StateResolution
lookupFromDB []string // scratch space
lookupFromMem []string // scratch space
lookedUpEvents []types.Event // scratch space
eventMap map[string]types.Event
}
// loadAuthEvents loads all of the auth events for a given event recursively,
// along with a map that contains state entries for all of the auth events.
func (l *authEventLoader) loadAuthEvents(
ctx context.Context, event *gomatrixserverlib.Event, eventMap map[string]types.Event,
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
l.Lock()
defer l.Unlock()
authEvents := []types.Event{} // our returned list
included := map[string]struct{}{} // dedupes authEvents above
queue := event.AuthEventIDs()
for i := 0; i < len(queue); i++ {
// Reuse the same underlying memory, since it reduces the
// amount of allocations we make the more times we call
// loadAuthEvents.
l.lookupFromDB = l.lookupFromDB[:0]
l.lookupFromMem = l.lookupFromMem[:0]
l.lookedUpEvents = l.lookedUpEvents[:0]
// Separate out the list of events in the queue based on if
// we think we already know the event in memory or not.
for _, authEventID := range queue {
if _, ok := included[authEventID]; ok {
continue
}
if _, ok := eventMap[authEventID]; ok {
l.lookupFromMem = append(l.lookupFromMem, authEventID)
} else {
l.lookupFromDB = append(l.lookupFromDB, authEventID)
}
}
// If there's nothing to do, stop here.
if len(l.lookupFromDB) == 0 && len(l.lookupFromMem) == 0 {
break
}
// If we need to get events from the database, go and fetch
// those now.
if len(l.lookupFromDB) > 0 {
eventsFromDB, err := l.v.db.EventsFromIDs(ctx, l.lookupFromDB)
if err != nil {
return nil, nil, fmt.Errorf("v.db.EventsFromIDs: %w", err)
}
l.lookedUpEvents = append(l.lookedUpEvents, eventsFromDB...)
for _, event := range eventsFromDB {
eventMap[event.EventID()] = event
}
}
// Fill in the gaps with events that we already have in memory.
if len(l.lookupFromMem) > 0 {
for _, eventID := range l.lookupFromMem {
l.lookedUpEvents = append(l.lookedUpEvents, eventMap[eventID])
}
}
// From the events that we've retrieved, work out which auth
// events to look up on the next iteration.
add := map[string]struct{}{}
for _, event := range l.lookedUpEvents {
authEvents = append(authEvents, event)
included[event.EventID()] = struct{}{}
for _, authEventID := range event.AuthEventIDs() {
if _, ok := included[authEventID]; ok {
continue
}
add[authEventID] = struct{}{}
}
}
for authEventID := range add {
queue = append(queue, authEventID)
}
}
authEventTypes := map[string]struct{}{}
authEventStateKeys := map[string]struct{}{}
for _, authEvent := range authEvents {
authEventTypes[authEvent.Type()] = struct{}{}
authEventStateKeys[*authEvent.StateKey()] = struct{}{}
}
lookupAuthEventTypes := make([]string, 0, len(authEventTypes))
lookupAuthEventStateKeys := make([]string, 0, len(authEventStateKeys))
for eventType := range authEventTypes {
lookupAuthEventTypes = append(lookupAuthEventTypes, eventType)
}
for eventStateKey := range authEventStateKeys {
lookupAuthEventStateKeys = append(lookupAuthEventStateKeys, eventStateKey)
}
eventTypes, err := l.v.db.EventTypeNIDs(ctx, lookupAuthEventTypes)
if err != nil {
return nil, nil, fmt.Errorf("v.db.EventTypeNIDs: %w", err)
}
eventStateKeys, err := l.v.db.EventStateKeyNIDs(ctx, lookupAuthEventStateKeys)
if err != nil {
return nil, nil, fmt.Errorf("v.db.EventStateKeyNIDs: %w", err)
}
stateEntryMap := map[string]types.StateEntry{}
for _, authEvent := range authEvents {
stateEntryMap[authEvent.EventID()] = types.StateEntry{
EventNID: authEvent.EventNID,
StateKeyTuple: types.StateKeyTuple{
EventTypeNID: eventTypes[authEvent.Type()],
EventStateKeyNID: eventStateKeys[*authEvent.StateKey()],
},
}
}
nakedEvents := make([]*gomatrixserverlib.Event, 0, len(authEvents))
for _, authEvent := range authEvents {
nakedEvents = append(nakedEvents, authEvent.Event)
}
return nakedEvents, stateEntryMap, nil
}
// findDuplicateStateKeys finds the state entries where the state key tuple appears more than once in a sorted list.
// Returns a sorted list of those state entries.
func findDuplicateStateKeys(a []types.StateEntry) []types.StateEntry {

View file

@ -192,6 +192,10 @@ func (u *RoomUpdater) StateAtEventIDs(
return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs)
}
func (u *RoomUpdater) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, false)
}
func (u *RoomUpdater) UnsentEventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, true)
}

View file

@ -823,13 +823,39 @@ func (d *Database) handleRedactions(
return nil, "", nil
}
// Get the power level from the database, so we can verify the user is allowed to redact the event
powerLevels, err := d.GetStateEvent(ctx, event.RoomID(), gomatrixserverlib.MRoomPowerLevels, "")
if err != nil {
return nil, "", fmt.Errorf("d.GetStateEvent: %w", err)
}
pl, err := powerLevels.PowerLevels()
if err != nil {
return nil, "", fmt.Errorf("unable to get powerlevels for room: %w", err)
}
redactUser := pl.UserLevel(redactionEvent.Sender())
switch {
case redactUser >= pl.Redact:
// The power level of the redaction events sender is greater than or equal to the redact level.
case redactedEvent.Origin() == redactionEvent.Origin() && redactedEvent.Sender() == redactionEvent.Sender():
// The domain of the redaction events sender matches that of the original events sender.
default:
return nil, "", nil
}
// mark the event as redacted
if redactionsArePermanent {
redactedEvent.Event = redactedEvent.Redact()
}
err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent)
if err != nil {
return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err)
}
if redactionsArePermanent {
redactedEvent.Event = redactedEvent.Redact()
// NOTSPEC: sytest relies on this unspecced field existing :(
err = redactedEvent.SetUnsignedField("redacted_by", redactionEvent.EventID())
if err != nil {
return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err)
}
// overwrite the eventJSON table
err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON())

View file

@ -15,8 +15,21 @@ func platformSanityChecks() {
// If we run out of file descriptors, we might run into problems accessing
// PostgreSQL amongst other things. Complain at startup if we think the
// number of file descriptors is too low.
var rLimit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil && rLimit.Cur < 65535 {
warn := func(rLimit *syscall.Rlimit) {
logrus.Warnf("IMPORTANT: Process file descriptor limit is currently %d, it is recommended to raise the limit for Dendrite to at least 65535 to avoid issues", rLimit.Cur)
}
var rLimit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil && rLimit.Cur < 65535 {
// The file descriptor count is too low. Let's try to raise it.
rLimit.Cur = 65535
if err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
// We failed to raise it, so log an error.
logrus.WithError(err).Warn("IMPORTANT: Failed to raise the file descriptor limit")
warn(&rLimit)
} else if err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil && rLimit.Cur < 65535 {
// We think we successfully raised the limit, but a second call to
// get the limit told us that we didn't succeed. Log an error.
warn(&rLimit)
}
}
}

View file

@ -134,6 +134,10 @@ type RateLimiting struct {
// The cooloff period in milliseconds after a request before the "slot"
// is freed again
CooloffMS int64 `yaml:"cooloff_ms"`
// A list of users that are exempt from rate limiting, i.e. if you want
// to run Mjolnir or other bots.
ExemptUserIDs []string `yaml:"exempt_user_ids"`
}
func (r *RateLimiting) Verify(configErrs *ConfigErrors) {

View file

@ -97,7 +97,7 @@ func Context(
state, _ := syncDB.CurrentState(ctx, roomID, &stateFilter, nil)
// verify the user is allowed to see the context for this room/event
for _, x := range state {
var hisVis string
var hisVis gomatrixserverlib.HistoryVisibility
hisVis, err = x.HistoryVisibility()
if err != nil {
continue

View file

@ -715,4 +715,9 @@ Presence can be set from sync
PUT /rooms/:room_id/redact/:event_id/:txn_id is idempotent
Unnamed room comes with a name summary
Named room comes with just joined member count summary
Room summary only has 5 heroes
Room summary only has 5 heroes
registration is idempotent, with username specified
Setting state twice is idempotent
Joining room twice is idempotent
Inbound federation can return missing events for shared visibility
Inbound federation ignores redactions from invalid servers room > v3