Merge branch 'main' into s7evink/idempotentregister

This commit is contained in:
Neil Alexander 2022-06-09 11:01:53 +01:00 committed by GitHub
commit 242051aedd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
39 changed files with 715 additions and 1059 deletions

View file

@ -1,5 +1,36 @@
# Changelog
## Dendrite 0.8.7 (2022-06-01)
### Features
* Support added for room version 10
### Fixes
* A number of state handling bugs have been fixed, which previously resulted in missing state events, unexpected state deletions, reverted memberships and unexpectedly rejected/soft-failed events in some specific cases
* Fixed destination queue performance issues as a result of missing indexes, which speeds up outbound federation considerably
* A bug which could cause the `/register` endpoint to return HTTP 500 has been fixed
## Dendrite 0.8.6 (2022-05-26)
### Features
* Room versions 8 and 9 are now marked as stable
* Dendrite can now assist remote users to join restricted rooms via `/make_join` and `/send_join`
### Fixes
* The sync API no longer returns immediately on `/sync` requests unnecessarily if it can be avoided
* A race condition has been fixed in the sync API when updating presence via `/sync`
* A race condition has been fixed sending E2EE keys to remote servers over federation when joining rooms
* The `trusted_private_chat` preset should now grant power level 100 to all participant users, which should improve the user experience of direct messages
* Invited users are now authed correctly in restricted rooms
* The `join_authorised_by_users_server` key is now correctly stripped in restricted rooms when updating the membership event
* Appservices should now receive invite events correctly
* Device list updates should no longer contain optional fields with `null` values
* The `/deactivate` endpoint has been fixed to no longer confuse Element with incorrect completed flows
## Dendrite 0.8.5 (2022-05-13)
### Features

View file

@ -96,10 +96,9 @@ than features that massive deployments may be interested in (User Directory, Ope
This means Dendrite supports amongst others:
- Core room functionality (creating rooms, invites, auth rules)
- Full support for room versions 1 to 7
- Experimental support for room versions 8 to 9
- Room versions 1 to 10 supported
- Backfilling locally and via federation
- Accounts, Profiles and Devices
- Accounts, profiles and devices
- Published room lists
- Typing
- Media APIs

View file

@ -261,7 +261,7 @@ func (m *DendriteMonolith) Start() {
cfg.MSCs.MSCs = []string{"msc2836", "msc2946"}
cfg.ClientAPI.RegistrationDisabled = false
cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true
if err := cfg.Derive(); err != nil {
if err = cfg.Derive(); err != nil {
panic(err)
}
@ -342,11 +342,23 @@ func (m *DendriteMonolith) Start() {
go func() {
m.logger.Info("Listening on ", cfg.Global.ServerName)
m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")))
switch m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")) {
case net.ErrClosed, http.ErrServerClosed:
m.logger.Info("Stopped listening on ", cfg.Global.ServerName)
default:
m.logger.Fatal(err)
}
}()
go func() {
logrus.Info("Listening on ", m.listener.Addr())
logrus.Fatal(http.Serve(m.listener, httpRouter))
switch http.Serve(m.listener, httpRouter) {
case net.ErrClosed, http.ErrServerClosed:
m.logger.Info("Stopped listening on ", cfg.Global.ServerName)
default:
m.logger.Fatal(err)
}
}()
}

View file

@ -170,11 +170,11 @@ func (m *DendriteMonolith) Start() {
go func() {
m.logger.Info("Listening on ", ygg.DerivedServerName())
m.logger.Fatal(m.httpServer.Serve(ygg))
m.logger.Error(m.httpServer.Serve(ygg))
}()
go func() {
logrus.Info("Listening on ", m.listener.Addr())
logrus.Fatal(http.Serve(m.listener, httpRouter))
logrus.Error(http.Serve(m.listener, httpRouter))
}()
go func() {
logrus.Info("Sending wake-up message to known nodes")

View file

@ -89,6 +89,9 @@ func Setup(
"r0.4.0",
"r0.5.0",
"r0.6.1",
"v1.0",
"v1.1",
"v1.2",
}, UnstableFeatures: unstableFeatures},
}
}),
@ -137,7 +140,7 @@ func Setup(
synapseAdminRouter.Handle("/admin/v1/send_server_notice/{txnID}",
httputil.MakeAuthAPI("send_server_notice", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
// not specced, but ensure we're rate limiting requests to this endpoint
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -157,7 +160,7 @@ func Setup(
synapseAdminRouter.Handle("/admin/v1/send_server_notice",
httputil.MakeAuthAPI("send_server_notice", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
// not specced, but ensure we're rate limiting requests to this endpoint
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return SendServerNotice(
@ -187,7 +190,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/join/{roomIDOrAlias}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -203,7 +206,7 @@ func Setup(
if mscCfg.Enabled("msc2753") {
v3mux.Handle("/peek/{roomIDOrAlias}",
httputil.MakeAuthAPI(gomatrixserverlib.Peek, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -223,7 +226,7 @@ func Setup(
).Methods(http.MethodGet, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/join",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -237,7 +240,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/leave",
httputil.MakeAuthAPI("membership", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -271,7 +274,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/invite",
httputil.MakeAuthAPI("membership", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -389,14 +392,14 @@ func Setup(
).Methods(http.MethodPut, http.MethodOptions)
v3mux.Handle("/register", httputil.MakeExternalAPI("register", func(req *http.Request) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, nil); r != nil {
return *r
}
return Register(req, userAPI, cfg)
})).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/register/available", httputil.MakeExternalAPI("registerAvailable", func(req *http.Request) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, nil); r != nil {
return *r
}
return RegisterAvailable(req, cfg, userAPI)
@ -470,7 +473,7 @@ func Setup(
v3mux.Handle("/rooms/{roomID}/typing/{userID}",
httputil.MakeAuthAPI("rooms_typing", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -527,7 +530,7 @@ func Setup(
v3mux.Handle("/account/whoami",
httputil.MakeAuthAPI("whoami", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return Whoami(req, device)
@ -536,7 +539,7 @@ func Setup(
v3mux.Handle("/account/password",
httputil.MakeAuthAPI("password", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return Password(req, userAPI, device, cfg)
@ -545,7 +548,7 @@ func Setup(
v3mux.Handle("/account/deactivate",
httputil.MakeAuthAPI("deactivate", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return Deactivate(req, userInteractiveAuth, userAPI, device)
@ -556,7 +559,7 @@ func Setup(
v3mux.Handle("/login",
httputil.MakeExternalAPI("login", func(req *http.Request) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, nil); r != nil {
return *r
}
return Login(req, userAPI, cfg)
@ -664,7 +667,7 @@ func Setup(
v3mux.Handle("/pushrules/{scope}/{kind}/{ruleID}",
httputil.MakeAuthAPI("push_rules", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -730,7 +733,7 @@ func Setup(
v3mux.Handle("/profile/{userID}/avatar_url",
httputil.MakeAuthAPI("profile_avatar_url", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -755,7 +758,7 @@ func Setup(
v3mux.Handle("/profile/{userID}/displayname",
httputil.MakeAuthAPI("profile_displayname", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -794,7 +797,7 @@ func Setup(
v3mux.Handle("/voip/turnServer",
httputil.MakeAuthAPI("turn_server", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return RequestTurnServer(req, device, cfg)
@ -873,7 +876,7 @@ func Setup(
v3mux.Handle("/user/{userID}/openid/request_token",
httputil.MakeAuthAPI("openid_request_token", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -886,7 +889,7 @@ func Setup(
v3mux.Handle("/user_directory/search",
httputil.MakeAuthAPI("userdirectory_search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
postContent := struct {
@ -932,7 +935,7 @@ func Setup(
v3mux.Handle("/rooms/{roomID}/read_markers",
httputil.MakeAuthAPI("rooms_read_markers", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -945,7 +948,7 @@ func Setup(
v3mux.Handle("/rooms/{roomID}/forget",
httputil.MakeAuthAPI("rooms_forget", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -1022,7 +1025,7 @@ func Setup(
v3mux.Handle("/pushers/set",
httputil.MakeAuthAPI("set_pushers", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return SetPusher(req, device, userAPI)
@ -1080,7 +1083,7 @@ func Setup(
v3mux.Handle("/capabilities",
httputil.MakeAuthAPI("capabilities", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
return GetCapabilities(req, rsAPI)
@ -1296,7 +1299,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/rooms/{roomId}/receipt/{receiptType}/{eventId}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))

View file

@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"reflect"
"sync"
"time"
@ -96,14 +97,21 @@ func SendEvent(
mutex.(*sync.Mutex).Lock()
defer mutex.(*sync.Mutex).Unlock()
startedGeneratingEvent := time.Now()
var r map[string]interface{} // must be a JSON object
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
return *resErr
}
if stateKey != nil {
// If the existing/new state content are equal, return the existing event_id, making the request idempotent.
if resp := stateEqual(req.Context(), rsAPI, eventType, *stateKey, roomID, r); resp != nil {
return *resp
}
}
startedGeneratingEvent := time.Now()
// If we're sending a membership update, make sure to strip the authorised
// via key if it is present, otherwise other servers won't be able to auth
// the event if the room is set to the "restricted" join rule.
@ -208,6 +216,37 @@ func SendEvent(
return res
}
// stateEqual compares the new and the existing state event content. If they are equal, returns a *util.JSONResponse
// with the existing event_id, making this an idempotent request.
func stateEqual(ctx context.Context, rsAPI api.ClientRoomserverAPI, eventType, stateKey, roomID string, newContent map[string]interface{}) *util.JSONResponse {
stateRes := api.QueryCurrentStateResponse{}
tuple := gomatrixserverlib.StateKeyTuple{
EventType: eventType,
StateKey: stateKey,
}
err := rsAPI.QueryCurrentState(ctx, &api.QueryCurrentStateRequest{
RoomID: roomID,
StateTuples: []gomatrixserverlib.StateKeyTuple{tuple},
}, &stateRes)
if err != nil {
return nil
}
if existingEvent, ok := stateRes.StateEvents[tuple]; ok {
var existingContent map[string]interface{}
if err = json.Unmarshal(existingEvent.Content(), &existingContent); err != nil {
return nil
}
if reflect.DeepEqual(existingContent, newContent) {
return &util.JSONResponse{
Code: http.StatusOK,
JSON: sendEventResponse{existingEvent.EventID()},
}
}
}
return nil
}
func generateSendEvent(
ctx context.Context,
r map[string]interface{},

View file

@ -4,13 +4,17 @@ import (
"context"
"flag"
"fmt"
"os"
"sort"
"strconv"
"strings"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
)
@ -23,11 +27,17 @@ import (
// e.g. ./resolve-state --roomversion=5 1254 1235 1282
var roomVersion = flag.String("roomversion", "5", "the room version to parse events as")
var filterType = flag.String("filtertype", "", "the event types to filter on")
func main() {
ctx := context.Background()
cfg := setup.ParseFlags(true)
args := os.Args[1:]
cfg.Logging = append(cfg.Logging[:0], config.LogrusHook{
Type: "std",
Level: "error",
})
base := base.NewBaseDendrite(cfg, "ResolveState", base.DisableMetrics)
args := flag.Args()
fmt.Println("Room version", *roomVersion)
@ -45,30 +55,28 @@ func main() {
panic(err)
}
roomserverDB, err := storage.Open(nil, &cfg.RoomServer.Database, cache)
roomserverDB, err := storage.Open(base, &cfg.RoomServer.Database, cache)
if err != nil {
panic(err)
}
blockNIDs, err := roomserverDB.StateBlockNIDs(ctx, snapshotNIDs)
if err != nil {
panic(err)
}
stateres := state.NewStateResolution(roomserverDB, &types.RoomInfo{
RoomVersion: gomatrixserverlib.RoomVersion(*roomVersion),
})
var stateEntries []types.StateEntryList
for _, list := range blockNIDs {
entries, err2 := roomserverDB.StateEntries(ctx, list.StateBlockNIDs)
if err2 != nil {
panic(err2)
var stateEntries []types.StateEntry
for _, snapshotNID := range snapshotNIDs {
var entries []types.StateEntry
entries, err = stateres.LoadStateAtSnapshot(ctx, snapshotNID)
if err != nil {
panic(err)
}
stateEntries = append(stateEntries, entries...)
}
var eventNIDs []types.EventNID
for _, entry := range stateEntries {
for _, e := range entry.StateEntries {
eventNIDs = append(eventNIDs, e.EventNID)
}
eventNIDs = append(eventNIDs, entry.EventNID)
}
fmt.Println("Fetching", len(eventNIDs), "state events")
@ -103,7 +111,8 @@ func main() {
}
fmt.Println("Resolving state")
resolved, err := gomatrixserverlib.ResolveConflicts(
var resolved Events
resolved, err = gomatrixserverlib.ResolveConflicts(
gomatrixserverlib.RoomVersion(*roomVersion),
events,
authEvents,
@ -113,9 +122,41 @@ func main() {
}
fmt.Println("Resolved state contains", len(resolved), "events")
sort.Sort(resolved)
filteringEventType := *filterType
count := 0
for _, event := range resolved {
if filteringEventType != "" && event.Type() != filteringEventType {
continue
}
count++
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
fmt.Println()
fmt.Println("Returned", count, "state events after filtering")
}
type Events []*gomatrixserverlib.Event
func (e Events) Len() int {
return len(e)
}
func (e Events) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
func (e Events) Less(i, j int) bool {
typeDelta := strings.Compare(e[i].Type(), e[j].Type())
if typeDelta < 0 {
return true
}
if typeDelta > 0 {
return false
}
stateKeyDelta := strings.Compare(*e[i].StateKey(), *e[j].StateKey())
return stateKeyDelta < 0
}

View file

@ -160,11 +160,14 @@ client_api:
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot"
# will be released after the cooloff time in milliseconds.
# will be released after the cooloff time in milliseconds. Server administrators
# and appservice users are exempt from rate limiting by default.
rate_limiting:
enabled: true
threshold: 5
cooloff_ms: 500
exempt_user_ids:
# - "@user:domain.com"
# Configuration for the Federation API.
federation_api:

View file

@ -163,11 +163,14 @@ client_api:
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot"
# will be released after the cooloff time in milliseconds.
# will be released after the cooloff time in milliseconds. Server administrators
# and appservice users are exempt from rate limiting by default.
rate_limiting:
enabled: true
threshold: 5
cooloff_ms: 500
exempt_user_ids:
# - "@user:domain.com"
# Configuration for the Federation API.
federation_api:

View file

@ -9,21 +9,19 @@ permalink: /installation/planning
## Modes
Dendrite can be run in one of two configurations:
Dendrite consists of several components, each responsible for a different aspect of the Matrix protocol.
Users can run Dendrite in one of two modes which dictate how these components are executed and communicate.
* **Monolith mode**: All components run in the same process. In this mode,
it is possible to run an in-process NATS Server instead of running a standalone deployment.
This will usually be the preferred model for low-to-mid volume deployments, providing the best
balance between performance and resource usage.
* **Monolith mode** runs all components in a single process. Components communicate through an internal NATS
server with generally low overhead. This mode dramatically simplifies deployment complexity and offers the
best balance between performance and resource usage for low-to-mid volume deployments.
* **Polylith mode**: A cluster of individual components running in their own processes, dealing
with different aspects of the Matrix protocol. Components communicate with each other using
internal HTTP APIs and NATS Server. This will almost certainly be the preferred model for very
large deployments but scalability comes with a cost. API calls are expensive and therefore a
polylith deployment may end up using disproportionately more resources for a smaller number of
users compared to a monolith deployment.
* **Polylith mode** runs all components in isolated processes. Components communicate through an external NATS
server and HTTP APIs, which incur considerable overhead. While this mode allows for more granular control of
resources dedicated toward individual processes, given the additional communications overhead, it is only
necessary for very large deployments.
At present, we **recommend monolith mode deployments** in all cases.
Given our current state of development, **we recommend monolith mode** for all deployments.
## Databases

View file

@ -213,10 +213,14 @@ func (r *FederationInternalAPI) performJoinUsingServer(
// If the remote server returned an event in the "event" key of
// the send_join request then we should use that instead. It may
// contain signatures that we don't know about.
if respSendJoin.Event != nil && isWellFormedMembershipEvent(
respSendJoin.Event, roomID, userID, r.cfg.Matrix.ServerName,
) {
event = respSendJoin.Event
if len(respSendJoin.Event) > 0 {
var remoteEvent *gomatrixserverlib.Event
remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion)
if err == nil && isWellFormedMembershipEvent(
remoteEvent, roomID, userID, r.cfg.Matrix.ServerName,
) {
event = remoteEvent
}
}
// Sanity-check the join response to ensure that it has a create

View file

@ -403,7 +403,7 @@ func SendJoin(
StateEvents: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(stateAndAuthChainResponse.StateEvents),
AuthEvents: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(stateAndAuthChainResponse.AuthChainEvents),
Origin: cfg.Matrix.ServerName,
Event: &signed,
Event: signed.JSON(),
},
}
}
@ -425,7 +425,7 @@ func checkRestrictedJoin(
roomVersion gomatrixserverlib.RoomVersion,
roomID, userID string,
) (*util.JSONResponse, string, error) {
if allowRestricted, err := roomVersion.AllowRestrictedJoinsInEventAuth(); err != nil {
if allowRestricted, err := roomVersion.MayAllowRestrictedJoinsInEventAuth(); err != nil {
return nil, "", err
} else if !allowRestricted {
return nil, "", nil

View file

@ -36,6 +36,10 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_edus (
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx
ON federationsender_queue_edus (json_nid, server_name);
CREATE INDEX IF NOT EXISTS federationsender_queue_edus_nid_idx
ON federationsender_queue_edus (json_nid);
CREATE INDEX IF NOT EXISTS federationsender_queue_edus_server_name_idx
ON federationsender_queue_edus (server_name);
`
const insertQueueEDUSQL = "" +

View file

@ -33,6 +33,9 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_json (
-- The JSON body. Text so that we preserve UTF-8.
json_body TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_json_json_nid_idx
ON federationsender_queue_json (json_nid);
`
const insertJSONSQL = "" +

View file

@ -36,6 +36,10 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_pdus (
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_pdus_json_nid_idx
ON federationsender_queue_pdus (json_nid, server_name);
CREATE INDEX IF NOT EXISTS federationsender_queue_pdus_json_nid_idx
ON federationsender_queue_pdus (json_nid);
CREATE INDEX IF NOT EXISTS federationsender_queue_pdus_server_name_idx
ON federationsender_queue_pdus (server_name);
`
const insertQueuePDUSQL = "" +

View file

@ -37,6 +37,10 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_edus (
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx
ON federationsender_queue_edus (json_nid, server_name);
CREATE INDEX IF NOT EXISTS federationsender_queue_edus_nid_idx
ON federationsender_queue_edus (json_nid);
CREATE INDEX IF NOT EXISTS federationsender_queue_edus_server_name_idx
ON federationsender_queue_edus (server_name);
`
const insertQueueEDUSQL = "" +

View file

@ -35,6 +35,9 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_json (
-- The JSON body. Text so that we preserve UTF-8.
json_body TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_json_json_nid_idx
ON federationsender_queue_json (json_nid);
`
const insertJSONSQL = "" +

View file

@ -38,6 +38,10 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_pdus (
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_pdus_json_nid_idx
ON federationsender_queue_pdus (json_nid, server_name);
CREATE INDEX IF NOT EXISTS federationsender_queue_pdus_json_nid_idx
ON federationsender_queue_pdus (json_nid);
CREATE INDEX IF NOT EXISTS federationsender_queue_pdus_server_name_idx
ON federationsender_queue_pdus (server_name);
`
const insertQueuePDUSQL = "" +

47
go.mod
View file

@ -5,20 +5,24 @@ replace github.com/nats-io/nats-server/v2 => github.com/neilalexander/nats-serve
replace github.com/nats-io/nats.go => github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e
require (
github.com/Arceliar/ironwood v0.0.0-20211125050254-8951369625d0
github.com/Arceliar/ironwood v0.0.0-20220306165321-319147a02d98
github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/MFAshby/stdemuxerhook v1.0.0
github.com/Masterminds/semver/v3 v3.1.1
github.com/Microsoft/go-winio v0.5.1 // indirect
github.com/codeclysm/extract v2.2.0+incompatible
github.com/containerd/containerd v1.6.2 // indirect
github.com/docker/docker v20.10.14+incompatible
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v20.10.16+incompatible
github.com/docker/go-connections v0.4.0
github.com/docker/go-units v0.4.0 // indirect
github.com/frankban/quicktest v1.14.3 // indirect
github.com/getsentry/sentry-go v0.13.0
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gologme/log v1.3.0
github.com/google/go-cmp v0.5.7
github.com/google/go-cmp v0.5.8
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
@ -30,38 +34,47 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220524100759-f98e737f8f9c
github.com/matrix-org/gomatrixserverlib v0.0.0-20220607143425-e55d796fd0b3
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.10
github.com/miekg/dns v1.1.31 // indirect
github.com/mattn/go-sqlite3 v1.14.13
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.49 // indirect
github.com/moby/term v0.0.0-20210610120745-9d4ed1856297 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/nats-io/nats-server/v2 v2.7.4-0.20220309205833-773636c1c5bb
github.com/nats-io/nats.go v1.14.0
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/ngrok/sqlmw v0.0.0-20211220175533-9d16fdc47b31
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79
github.com/onsi/gomega v1.17.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/pressly/goose v2.7.0+incompatible
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_golang v1.12.2
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.1
github.com/tidwall/gjson v1.14.1
github.com/tidwall/sjson v1.2.4
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.4.3
go.uber.org/atomic v1.9.0
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122
golang.org/x/image v0.0.0-20220321031419-a8550c1d254a
golang.org/x/mobile v0.0.0-20220407111146-e579adbbc4a2
golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
golang.org/x/mobile v0.0.0-20220518205345-8578da9835fd
golang.org/x/net v0.0.0-20220524220425-1d687d428aca
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/h2non/bimg.v1 v1.1.9
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0 // indirect
gotest.tools/v3 v3.0.3 // indirect
nhooyr.io/websocket v1.8.7
)

874
go.sum

File diff suppressed because it is too large Load diff

View file

@ -7,6 +7,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
@ -17,6 +18,7 @@ type RateLimits struct {
enabled bool
requestThreshold int64
cooloffDuration time.Duration
exemptUserIDs map[string]struct{}
}
func NewRateLimits(cfg *config.RateLimiting) *RateLimits {
@ -25,6 +27,10 @@ func NewRateLimits(cfg *config.RateLimiting) *RateLimits {
enabled: cfg.Enabled,
requestThreshold: cfg.Threshold,
cooloffDuration: time.Duration(cfg.CooloffMS) * time.Millisecond,
exemptUserIDs: map[string]struct{}{},
}
for _, userID := range cfg.ExemptUserIDs {
l.exemptUserIDs[userID] = struct{}{}
}
if l.enabled {
go l.clean()
@ -52,7 +58,7 @@ func (l *RateLimits) clean() {
}
}
func (l *RateLimits) Limit(req *http.Request) *util.JSONResponse {
func (l *RateLimits) Limit(req *http.Request, device *userapi.Device) *util.JSONResponse {
// If rate limiting is disabled then do nothing.
if !l.enabled {
return nil
@ -67,9 +73,26 @@ func (l *RateLimits) Limit(req *http.Request) *util.JSONResponse {
// First of all, work out if X-Forwarded-For was sent to us. If not
// then we'll just use the IP address of the caller.
caller := req.RemoteAddr
if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" {
caller = forwardedFor
var caller string
if device != nil {
switch device.AccountType {
case userapi.AccountTypeAdmin:
return nil // don't rate-limit server administrators
case userapi.AccountTypeAppService:
return nil // don't rate-limit appservice users
default:
if _, ok := l.exemptUserIDs[device.UserID]; ok {
// If the user is exempt from rate limiting then do nothing.
return nil
}
caller = device.UserID + device.ID
}
} else {
if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" {
caller = forwardedFor
} else {
caller = req.RemoteAddr
}
}
// Look up the caller's channel, if they have one.

View file

@ -37,7 +37,7 @@ type traceInterceptor struct {
sqlmw.NullInterceptor
}
func (in *traceInterceptor) StmtQueryContext(ctx context.Context, stmt driver.StmtQueryContext, query string, args []driver.NamedValue) (driver.Rows, error) {
func (in *traceInterceptor) StmtQueryContext(ctx context.Context, stmt driver.StmtQueryContext, query string, args []driver.NamedValue) (context.Context, driver.Rows, error) {
startedAt := time.Now()
rows, err := stmt.QueryContext(ctx, args)
@ -45,7 +45,7 @@ func (in *traceInterceptor) StmtQueryContext(ctx context.Context, stmt driver.St
logrus.WithField("duration", time.Since(startedAt)).WithField(logrus.ErrorKey, err).Debug("executed sql query ", query, " args: ", args)
return rows, err
return ctx, rows, err
}
func (in *traceInterceptor) StmtExecContext(ctx context.Context, stmt driver.StmtExecContext, query string, args []driver.NamedValue) (driver.Result, error) {

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 8
VersionPatch = 5
VersionPatch = 7
VersionTag = "" // example: "rc1"
)

View file

@ -374,7 +374,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
// fetch stale device lists
userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName})
if err != nil {
logger.WithError(err).Error("failed to load stale device lists")
logger.WithError(err).Error("Failed to load stale device lists")
return waitTime, true
}
failCount := 0
@ -399,7 +399,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
}
} else {
waitTime = time.Hour
logger.WithError(err).WithField("user_id", userID).Warn("GetUserDevices returned unknown error type")
logger.WithError(err).WithField("user_id", userID).Debug("GetUserDevices returned unknown error type")
}
continue
}
@ -422,12 +422,12 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
}
err = u.updateDeviceList(&res)
if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
logger.WithError(err).WithField("user_id", userID).Error("Fetched device list but failed to store/emit it")
failCount += 1
}
}
if failCount > 0 {
logger.WithField("total", len(userIDs)).WithField("failed", failCount).WithField("wait", waitTime).Error("failed to query device keys for some users")
logger.WithField("total", len(userIDs)).WithField("failed", failCount).WithField("wait", waitTime).Warn("Failed to query device keys for some users")
}
for _, userID := range userIDs {
// always clear the channel to unblock Update calls regardless of success/failure

View file

@ -62,7 +62,7 @@ func Setup(
uploadHandler := httputil.MakeAuthAPI(
"upload", userAPI,
func(req *http.Request, dev *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, dev); r != nil {
return *r
}
return Upload(req, cfg, dev, db, activeThumbnailGeneration)
@ -70,7 +70,7 @@ func Setup(
)
configHandler := httputil.MakeAuthAPI("config", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
respondSize := &cfg.MaxFileSizeBytes
@ -126,7 +126,7 @@ func makeDownloadAPI(
// Ratelimit requests
// NOTSPEC: The spec says everything at /media/ should be rate limited, but this causes issues with thumbnails (#2243)
if name != "thumbnail" {
if r := rateLimits.Limit(req); r != nil {
if r := rateLimits.Limit(req, nil); r != nil {
if err := json.NewEncoder(w).Encode(r); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return

View file

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
@ -75,6 +76,11 @@ func (r *Inputer) processRoomEvent(
default:
}
span, ctx := opentracing.StartSpanFromContext(ctx, "processRoomEvent")
span.SetTag("room_id", input.Event.RoomID())
span.SetTag("event_id", input.Event.EventID())
defer span.Finish()
// Measure how long it takes to process this event.
started := time.Now()
defer func() {
@ -411,6 +417,9 @@ func (r *Inputer) fetchAuthEvents(
known map[string]*types.Event,
servers []gomatrixserverlib.ServerName,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "fetchAuthEvents")
defer span.Finish()
unknown := map[string]struct{}{}
authEventIDs := event.AuthEventIDs()
if len(authEventIDs) == 0 {
@ -526,6 +535,9 @@ func (r *Inputer) calculateAndSetState(
event *gomatrixserverlib.Event,
isRejected bool,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "calculateAndSetState")
defer span.Finish()
var succeeded bool
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
if err != nil {
@ -535,8 +547,6 @@ func (r *Inputer) calculateAndSetState(
roomState := state.NewStateResolution(updater, roomInfo)
if input.HasState {
stateAtEvent.Overwrite = true
// We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
@ -549,8 +559,6 @@ func (r *Inputer) calculateAndSetState(
return fmt.Errorf("updater.AddState: %w", err)
}
} else {
stateAtEvent.Overwrite = false
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event, isRejected); err != nil {
return fmt.Errorf("roomState.CalculateAndStoreStateBeforeEvent: %w", err)

View file

@ -27,6 +27,8 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
)
// updateLatestEvents updates the list of latest events for this room in the database and writes the
@ -55,6 +57,9 @@ func (r *Inputer) updateLatestEvents(
transactionID *api.TransactionID,
rewritesState bool,
) (err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "updateLatestEvents")
defer span.Finish()
var succeeded bool
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
if err != nil {
@ -101,8 +106,8 @@ type latestEventsUpdater struct {
// The eventID of the event that was processed before this one.
lastEventIDSent string
// The latest events in the room after processing this event.
oldLatest []types.StateAtEventAndReference
latest []types.StateAtEventAndReference
oldLatest types.StateAtEventAndReferences
latest types.StateAtEventAndReferences
// The state entries removed from and added to the current state of the
// room as a result of processing this event. They are sorted lists.
removed []types.StateEntry
@ -125,7 +130,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// state snapshot from somewhere else, e.g. a federated room join,
// then start with an empty set - none of the forward extremities
// that we knew about before matter anymore.
u.oldLatest = []types.StateAtEventAndReference{}
u.oldLatest = types.StateAtEventAndReferences{}
if !u.rewritesState {
u.oldStateNID = u.updater.CurrentStateSnapshotNID()
u.oldLatest = u.updater.LatestEvents()
@ -199,13 +204,16 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
}
func (u *latestEventsUpdater) latestState() error {
span, ctx := opentracing.StartSpanFromContext(u.ctx, "processEventWithMissingState")
defer span.Finish()
var err error
roomState := state.NewStateResolution(u.updater, u.roomInfo)
// Work out if the state at the extremities has actually changed
// or not. If they haven't then we won't bother doing all of the
// hard work.
if u.event.StateKey() == nil {
if !u.stateAtEvent.IsStateEvent() {
stateChanged := false
oldStateNIDs := make([]types.StateSnapshotNID, 0, len(u.oldLatest))
newStateNIDs := make([]types.StateSnapshotNID, 0, len(u.latest))
@ -233,54 +241,51 @@ func (u *latestEventsUpdater) latestState() error {
}
}
// Take the old set of extremities and the new set of extremities and
// mash them together into a list. This may or may not include the new event
// from the input path, depending on whether it became a forward extremity
// or not. We'll then run state resolution across all of them to determine
// the new current state of the room. Including the old extremities here
// ensures that new forward extremities with bad state snapshots (from
// possible malicious actors) can't completely corrupt the room state
// away from what it was before.
combinedExtremities := types.StateAtEventAndReferences(append(u.oldLatest, u.latest...))
combinedExtremities = combinedExtremities[:util.SortAndUnique(combinedExtremities)]
latestStateAtEvents := make([]types.StateAtEvent, len(combinedExtremities))
for i := range combinedExtremities {
latestStateAtEvents[i] = combinedExtremities[i].StateAtEvent
// Get a list of the current latest events. This may or may not
// include the new event from the input path, depending on whether
// it is a forward extremity or not.
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
for i := range u.latest {
latestStateAtEvents[i] = u.latest[i].StateAtEvent
}
// Takes the NIDs of the latest events and creates a state snapshot
// of the state after the events. The snapshot state will be resolved
// using the correct state resolution algorithm for the room.
u.newStateNID, err = roomState.CalculateAndStoreStateAfterEvents(
u.ctx, latestStateAtEvents,
ctx, latestStateAtEvents,
)
if err != nil {
return fmt.Errorf("roomState.CalculateAndStoreStateAfterEvents: %w", err)
}
// If we are overwriting the state then we should make sure that we
// don't send anything out over federation again, it will very likely
// be a repeat.
if u.stateAtEvent.Overwrite {
u.sendAsServer = ""
}
// Now that we have a new state snapshot based on the latest events,
// we can compare that new snapshot to the previous one and see what
// has changed. This gives us one list of removed state events and
// another list of added ones. Replacing a value for a state-key tuple
// will result one removed (the old event) and one added (the new event).
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
u.ctx, u.oldStateNID, u.newStateNID,
ctx, u.oldStateNID, u.newStateNID,
)
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
}
if removed := len(u.removed) - len(u.added); removed > 0 {
logrus.WithFields(logrus.Fields{
"event_id": u.event.EventID(),
"room_id": u.event.RoomID(),
"old_state_nid": u.oldStateNID,
"new_state_nid": u.newStateNID,
"old_latest": u.oldLatest.EventIDs(),
"new_latest": u.latest.EventIDs(),
}).Errorf("Unexpected state deletion (removing %d events)", removed)
}
// Also work out the state before the event removes and the event
// adds.
u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = roomState.DifferenceBetweeenStateSnapshots(
u.ctx, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
ctx, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
)
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweeenStateSnapshots: %w", err)
@ -296,6 +301,9 @@ func (u *latestEventsUpdater) calculateLatest(
newEvent *gomatrixserverlib.Event,
newStateAndRef types.StateAtEventAndReference,
) (bool, error) {
span, _ := opentracing.StartSpanFromContext(u.ctx, "calculateLatest")
defer span.Finish()
// First of all, get a list of all of the events in our current
// set of forward extremities.
existingRefs := make(map[string]*types.StateAtEventAndReference)

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
)
// updateMembership updates the current membership and the invites for each
@ -34,6 +35,9 @@ func (r *Inputer) updateMemberships(
updater *shared.RoomUpdater,
removed, added []types.StateEntry,
) ([]api.OutputEvent, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "updateMemberships")
defer span.Finish()
changes := membershipChanges(removed, added)
var eventNIDs []types.EventNID
for _, change := range changes {

View file

@ -15,6 +15,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
)
@ -59,6 +60,9 @@ type missingStateReq struct {
func (t *missingStateReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) (*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "processEventWithMissingState")
defer span.Finish()
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
@ -235,6 +239,9 @@ func (t *missingStateReq) processEventWithMissingState(
}
func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupResolvedStateBeforeEvent")
defer span.Finish()
type respState struct {
// A snapshot is considered trustworthy if it came from our own roomserver.
// That's because the state will have been through state resolution once
@ -310,6 +317,9 @@ func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix.
func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupStateAfterEvent")
defer span.Finish()
// try doing all this locally before we resort to querying federation
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID)
if respState != nil {
@ -361,6 +371,9 @@ func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.Event) *gomatrixs
}
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *parsedRespState {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupStateAfterEventLocally")
defer span.Finish()
var res parsedRespState
roomInfo, err := t.db.RoomInfo(ctx, roomID)
if err != nil {
@ -435,12 +448,17 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
// the server supports.
func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupStateBeforeEvent")
defer span.Finish()
// Attempt to fetch the missing state using /state_ids and /events
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
}
func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "resolveStatesAndCheck")
defer span.Finish()
var authEventList []*gomatrixserverlib.Event
var stateEventList []*gomatrixserverlib.Event
for _, state := range states {
@ -484,6 +502,9 @@ retryAllowedState:
// get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject,
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "getMissingEvents")
defer span.Finish()
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
latest, _, _, err := t.db.LatestEventIDs(ctx, t.roomInfo.RoomNID)
if err != nil {
@ -608,6 +629,9 @@ func (t *missingStateReq) isPrevStateKnown(ctx context.Context, e *gomatrixserve
func (t *missingStateReq) lookupMissingStateViaState(
ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
) (respState *parsedRespState, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupMissingStateViaState")
defer span.Finish()
state, err := t.federation.LookupState(ctx, t.origin, roomID, eventID, roomVersion)
if err != nil {
return nil, err
@ -637,6 +661,9 @@ func (t *missingStateReq) lookupMissingStateViaState(
func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupMissingStateViaStateIDs")
defer span.Finish()
util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID)
// fetch the state event IDs at the time of the event
stateIDs, err := t.federation.LookupStateIDs(ctx, t.origin, roomID, eventID)
@ -799,6 +826,9 @@ func (t *missingStateReq) createRespStateFromStateIDs(
}
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.Event, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupEvent")
defer span.Finish()
if localFirst {
// fetch from the roomserver
events, err := t.db.EventsFromIDs(ctx, []string{missingEventID})

View file

@ -105,13 +105,13 @@ func (r *Upgrader) performRoomUpgrade(
return "", pErr
}
// 5. Send the tombstone event to the old room (must do this before we set the new canonical_alias)
if pErr = r.sendHeaderedEvent(ctx, tombstoneEvent); pErr != nil {
// Send the setup events to the new room
if pErr = r.sendInitialEvents(ctx, evTime, userID, newRoomID, string(req.RoomVersion), eventsToMake); pErr != nil {
return "", pErr
}
// Send the setup events to the new room
if pErr = r.sendInitialEvents(ctx, evTime, userID, newRoomID, string(req.RoomVersion), eventsToMake); pErr != nil {
// 5. Send the tombstone event to the old room
if pErr = r.sendHeaderedEvent(ctx, tombstoneEvent, string(r.Cfg.Matrix.ServerName)); pErr != nil {
return "", pErr
}
@ -147,7 +147,7 @@ func (r *Upgrader) getRoomPowerLevels(ctx context.Context, roomID string) (*goma
if err != nil {
util.GetLogger(ctx).WithError(err).Error()
return nil, &api.PerformError{
Msg: "powerLevel event was not actually a power level event",
Msg: "Power level event was invalid or malformed",
}
}
return powerLevelContent, nil
@ -182,7 +182,7 @@ func (r *Upgrader) restrictOldRoomPowerLevels(ctx context.Context, evTime time.T
return resErr
}
} else {
if resErr = r.sendHeaderedEvent(ctx, restrictedPowerLevelsHeadered); resErr != nil {
if resErr = r.sendHeaderedEvent(ctx, restrictedPowerLevelsHeadered, api.DoNotSendToOtherServers); resErr != nil {
return resErr
}
}
@ -198,7 +198,7 @@ func moveLocalAliases(ctx context.Context,
aliasRes := api.GetAliasesForRoomIDResponse{}
if err = URSAPI.GetAliasesForRoomID(ctx, &aliasReq, &aliasRes); err != nil {
return &api.PerformError{
Msg: "Could not get aliases for old room",
Msg: fmt.Sprintf("Failed to get old room aliases: %s", err),
}
}
@ -207,7 +207,7 @@ func moveLocalAliases(ctx context.Context,
removeAliasRes := api.RemoveRoomAliasResponse{}
if err = URSAPI.RemoveRoomAlias(ctx, &removeAliasReq, &removeAliasRes); err != nil {
return &api.PerformError{
Msg: "api.RemoveRoomAlias failed",
Msg: fmt.Sprintf("Failed to remove old room alias: %s", err),
}
}
@ -215,7 +215,7 @@ func moveLocalAliases(ctx context.Context,
setAliasRes := api.SetRoomAliasResponse{}
if err = URSAPI.SetRoomAlias(ctx, &setAliasReq, &setAliasRes); err != nil {
return &api.PerformError{
Msg: "api.SetRoomAlias failed",
Msg: fmt.Sprintf("Failed to set new room alias: %s", err),
}
}
}
@ -253,7 +253,7 @@ func (r *Upgrader) clearOldCanonicalAliasEvent(ctx context.Context, oldRoom *api
return resErr
}
} else {
if resErr = r.sendHeaderedEvent(ctx, emptyCanonicalAliasEvent); resErr != nil {
if resErr = r.sendHeaderedEvent(ctx, emptyCanonicalAliasEvent, api.DoNotSendToOtherServers); resErr != nil {
return resErr
}
}
@ -509,7 +509,7 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user
err = builder.SetContent(e.Content)
if err != nil {
return &api.PerformError{
Msg: "builder.SetContent failed",
Msg: fmt.Sprintf("Failed to set content of new %q event: %s", builder.Type, err),
}
}
if i > 0 {
@ -519,13 +519,13 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user
event, err = r.buildEvent(&builder, &authEvents, evTime, gomatrixserverlib.RoomVersion(newVersion))
if err != nil {
return &api.PerformError{
Msg: "buildEvent failed",
Msg: fmt.Sprintf("Failed to build new %q event: %s", builder.Type, err),
}
}
if err = gomatrixserverlib.Allowed(event, &authEvents); err != nil {
return &api.PerformError{
Msg: "gomatrixserverlib.Allowed failed",
Msg: fmt.Sprintf("Failed to auth new %q event: %s", builder.Type, err),
}
}
@ -534,7 +534,7 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user
err = authEvents.AddEvent(event)
if err != nil {
return &api.PerformError{
Msg: "authEvents.AddEvent failed",
Msg: fmt.Sprintf("Failed to add new %q event to auth set: %s", builder.Type, err),
}
}
}
@ -550,7 +550,7 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user
}
if err = api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil {
return &api.PerformError{
Msg: "api.SendInputRoomEvents failed",
Msg: fmt.Sprintf("Failed to send new room %q to roomserver: %s", newRoomID, err),
}
}
return nil
@ -582,7 +582,7 @@ func (r *Upgrader) makeHeaderedEvent(ctx context.Context, evTime time.Time, user
err := builder.SetContent(event.Content)
if err != nil {
return nil, &api.PerformError{
Msg: "builder.SetContent failed",
Msg: fmt.Sprintf("Failed to set new %q event content: %s", builder.Type, err),
}
}
var queryRes api.QueryLatestEventsAndStateResponse
@ -607,7 +607,7 @@ func (r *Upgrader) makeHeaderedEvent(ctx context.Context, evTime time.Time, user
}
} else if err != nil {
return nil, &api.PerformError{
Msg: "eventutil.BuildEvent failed",
Msg: fmt.Sprintf("Failed to build new %q event: %s", builder.Type, err),
}
}
// check to see if this user can perform this operation
@ -619,7 +619,7 @@ func (r *Upgrader) makeHeaderedEvent(ctx context.Context, evTime time.Time, user
if err = gomatrixserverlib.Allowed(headeredEvent.Event, &provider); err != nil {
return nil, &api.PerformError{
Code: api.PerformErrorNotAllowed,
Msg: err.Error(), // TODO: Is this error string comprehensible to the client?
Msg: fmt.Sprintf("Failed to auth new %q event: %s", builder.Type, err), // TODO: Is this error string comprehensible to the client?
}
}
@ -666,17 +666,18 @@ func createTemporaryPowerLevels(powerLevelContent *gomatrixserverlib.PowerLevelC
func (r *Upgrader) sendHeaderedEvent(
ctx context.Context,
headeredEvent *gomatrixserverlib.HeaderedEvent,
sendAsServer string,
) *api.PerformError {
var inputs []api.InputRoomEvent
inputs = append(inputs, api.InputRoomEvent{
Kind: api.KindNew,
Event: headeredEvent,
Origin: r.Cfg.Matrix.ServerName,
SendAsServer: api.DoNotSendToOtherServers,
SendAsServer: sendAsServer,
})
if err := api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil {
return &api.PerformError{
Msg: "api.SendInputRoomEvents failed",
Msg: fmt.Sprintf("Failed to send new %q event to roomserver: %s", headeredEvent.Type(), err),
}
}
@ -703,7 +704,7 @@ func (r *Upgrader) buildEvent(
r.Cfg.Matrix.PrivateKey, roomVersion,
)
if err != nil {
return nil, fmt.Errorf("cannot build event %s : Builder failed to build. %w", builder.Type, err)
return nil, err
}
return event, nil
}

View file

@ -772,17 +772,24 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
}
// If the room version doesn't allow restricted joins then don't
// try to process any further.
allowRestrictedJoins, err := roomInfo.RoomVersion.AllowRestrictedJoinsInEventAuth()
allowRestrictedJoins, err := roomInfo.RoomVersion.MayAllowRestrictedJoinsInEventAuth()
if err != nil {
return fmt.Errorf("roomInfo.RoomVersion.AllowRestrictedJoinsInEventAuth: %w", err)
} else if !allowRestrictedJoins {
return nil
}
// Start off by populating the "resident" flag in the response. If we
// come across any rooms in the request that are missing, we will unset
// the flag.
res.Resident = true
// Get the join rules to work out if the join rule is "restricted".
joinRulesEvent, err := r.DB.GetStateEvent(ctx, req.RoomID, gomatrixserverlib.MRoomJoinRules, "")
if err != nil {
return fmt.Errorf("r.DB.GetStateEvent: %w", err)
}
if joinRulesEvent == nil {
return nil
}
var joinRules gomatrixserverlib.JoinRuleContent
if err = json.Unmarshal(joinRulesEvent.Content(), &joinRules); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
@ -792,10 +799,6 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
if !res.Restricted {
return nil
}
// Start off by populating the "resident" flag in the response. If we
// come across any rooms in the request that are missing, we will unset
// the flag.
res.Resident = true
// If the user is already invited to the room then the join is allowed
// but we don't specify an authorised via user, since the event auth
// will allow the join anyway.

View file

@ -20,9 +20,11 @@ import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/matrix-org/dendrite/roomserver/types"
@ -39,6 +41,7 @@ type StateResolutionStorage interface {
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
}
type StateResolution struct {
@ -61,6 +64,9 @@ func NewStateResolution(db StateResolutionStorage, roomInfo *types.RoomInfo) Sta
func (v *StateResolution) LoadStateAtSnapshot(
ctx context.Context, stateNID types.StateSnapshotNID,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtSnapshot")
defer span.Finish()
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
if err != nil {
return nil, err
@ -99,6 +105,9 @@ func (v *StateResolution) LoadStateAtSnapshot(
func (v *StateResolution) LoadStateAtEvent(
ctx context.Context, eventID string,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtEvent")
defer span.Finish()
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
if err != nil {
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %s", eventID, err)
@ -121,6 +130,9 @@ func (v *StateResolution) LoadStateAtEvent(
func (v *StateResolution) LoadCombinedStateAfterEvents(
ctx context.Context, prevStates []types.StateAtEvent,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadCombinedStateAfterEvents")
defer span.Finish()
stateNIDs := make([]types.StateSnapshotNID, len(prevStates))
for i, state := range prevStates {
stateNIDs[i] = state.BeforeStateSnapshotNID
@ -193,6 +205,9 @@ func (v *StateResolution) LoadCombinedStateAfterEvents(
func (v *StateResolution) DifferenceBetweeenStateSnapshots(
ctx context.Context, oldStateNID, newStateNID types.StateSnapshotNID,
) (removed, added []types.StateEntry, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.DifferenceBetweeenStateSnapshots")
defer span.Finish()
if oldStateNID == newStateNID {
// If the snapshot NIDs are the same then nothing has changed
return nil, nil, nil
@ -254,6 +269,9 @@ func (v *StateResolution) LoadStateAtSnapshotForStringTuples(
stateNID types.StateSnapshotNID,
stateKeyTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtSnapshotForStringTuples")
defer span.Finish()
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
if err != nil {
return nil, err
@ -268,6 +286,9 @@ func (v *StateResolution) stringTuplesToNumericTuples(
ctx context.Context,
stringTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateKeyTuple, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.stringTuplesToNumericTuples")
defer span.Finish()
eventTypes := make([]string, len(stringTuples))
stateKeys := make([]string, len(stringTuples))
for i := range stringTuples {
@ -310,6 +331,9 @@ func (v *StateResolution) loadStateAtSnapshotForNumericTuples(
stateNID types.StateSnapshotNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadStateAtSnapshotForNumericTuples")
defer span.Finish()
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
if err != nil {
return nil, err
@ -358,6 +382,9 @@ func (v *StateResolution) LoadStateAfterEventsForStringTuples(
prevStates []types.StateAtEvent,
stateKeyTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAfterEventsForStringTuples")
defer span.Finish()
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
if err != nil {
return nil, err
@ -370,6 +397,9 @@ func (v *StateResolution) loadStateAfterEventsForNumericTuples(
prevStates []types.StateAtEvent,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadStateAfterEventsForNumericTuples")
defer span.Finish()
if len(prevStates) == 1 {
// Fast path for a single event.
prevState := prevStates[0]
@ -542,6 +572,9 @@ func (v *StateResolution) CalculateAndStoreStateBeforeEvent(
event *gomatrixserverlib.Event,
isRejected bool,
) (types.StateSnapshotNID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.CalculateAndStoreStateBeforeEvent")
defer span.Finish()
// Load the state at the prev events.
prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs())
if err != nil {
@ -558,6 +591,9 @@ func (v *StateResolution) CalculateAndStoreStateAfterEvents(
ctx context.Context,
prevStates []types.StateAtEvent,
) (types.StateSnapshotNID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.CalculateAndStoreStateAfterEvents")
defer span.Finish()
metrics := calculateStateMetrics{startTime: time.Now(), prevEventLength: len(prevStates)}
if len(prevStates) == 0 {
@ -630,6 +666,9 @@ func (v *StateResolution) calculateAndStoreStateAfterManyEvents(
prevStates []types.StateAtEvent,
metrics calculateStateMetrics,
) (types.StateSnapshotNID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.calculateAndStoreStateAfterManyEvents")
defer span.Finish()
state, algorithm, conflictLength, err :=
v.calculateStateAfterManyEvents(ctx, v.roomInfo.RoomVersion, prevStates)
metrics.algorithm = algorithm
@ -648,6 +687,9 @@ func (v *StateResolution) calculateStateAfterManyEvents(
ctx context.Context, roomVersion gomatrixserverlib.RoomVersion,
prevStates []types.StateAtEvent,
) (state []types.StateEntry, algorithm string, conflictLength int, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.calculateStateAfterManyEvents")
defer span.Finish()
var combined []types.StateEntry
// Conflict resolution.
// First stage: load the state after each of the prev events.
@ -659,15 +701,13 @@ func (v *StateResolution) calculateStateAfterManyEvents(
}
// Collect all the entries with the same type and key together.
// We don't care about the order here because the conflict resolution
// algorithm doesn't depend on the order of the prev events.
// Remove duplicate entires.
// This is done so findDuplicateStateKeys can work in groups.
// We remove duplicates (same type, state key and event NID) too.
combined = combined[:util.SortAndUnique(stateEntrySorter(combined))]
// Find the conflicts
conflicts := findDuplicateStateKeys(combined)
if len(conflicts) > 0 {
if conflicts := findDuplicateStateKeys(combined); len(conflicts) > 0 {
conflictMap := stateEntryMap(conflicts)
conflictLength = len(conflicts)
// 5) There are conflicting state events, for each conflict workout
@ -676,7 +716,7 @@ func (v *StateResolution) calculateStateAfterManyEvents(
// Work out which entries aren't conflicted.
var notConflicted []types.StateEntry
for _, entry := range combined {
if _, ok := stateEntryMap(conflicts).lookup(entry.StateKeyTuple); !ok {
if _, ok := conflictMap.lookup(entry.StateKeyTuple); !ok {
notConflicted = append(notConflicted, entry)
}
}
@ -689,7 +729,7 @@ func (v *StateResolution) calculateStateAfterManyEvents(
return
}
algorithm = "full_state_with_conflicts"
state = resolved[:util.SortAndUnique(stateEntrySorter(resolved))]
state = resolved
} else {
algorithm = "full_state_no_conflicts"
// 6) There weren't any conflicts
@ -702,6 +742,9 @@ func (v *StateResolution) resolveConflicts(
ctx context.Context, version gomatrixserverlib.RoomVersion,
notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.resolveConflicts")
defer span.Finish()
stateResAlgo, err := version.StateResAlgorithm()
if err != nil {
return nil, err
@ -726,6 +769,8 @@ func (v *StateResolution) resolveConflictsV1(
ctx context.Context,
notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.resolveConflictsV1")
defer span.Finish()
// Load the conflicted events
conflictedEvents, eventIDMap, err := v.loadStateEvents(ctx, conflicted)
@ -789,6 +834,9 @@ func (v *StateResolution) resolveConflictsV2(
ctx context.Context,
notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.resolveConflictsV2")
defer span.Finish()
estimate := len(conflicted) + len(notConflicted)
eventIDMap := make(map[string]types.StateEntry, estimate)
@ -816,51 +864,47 @@ func (v *StateResolution) resolveConflictsV2(
authEvents := make([]*gomatrixserverlib.Event, 0, estimate*3)
gotAuthEvents := make(map[string]struct{}, estimate*3)
authDifference := make([]*gomatrixserverlib.Event, 0, estimate)
knownAuthEvents := make(map[string]types.Event, estimate*3)
// For each conflicted event, let's try and get the needed auth events.
neededStateKeys := make([]string, 16)
authEntries := make([]types.StateEntry, 16)
for _, conflictedEvent := range conflictedEvents {
// Work out which auth events we need to load.
key := conflictedEvent.EventID()
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{conflictedEvent})
if err = func() error {
span, sctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadAuthEvents")
defer span.Finish()
// Find the numeric IDs for the necessary state keys.
neededStateKeys = neededStateKeys[:0]
neededStateKeys = append(neededStateKeys, needed.Member...)
neededStateKeys = append(neededStateKeys, needed.ThirdPartyInvite...)
stateKeyNIDMap, err := v.db.EventStateKeyNIDs(ctx, neededStateKeys)
if err != nil {
return nil, err
loader := authEventLoader{
v: v,
lookupFromDB: make([]string, 0, len(conflictedEvents)*3),
lookupFromMem: make([]string, 0, len(conflictedEvents)*3),
lookedUpEvents: make([]types.Event, 0, len(conflictedEvents)*3),
eventMap: map[string]types.Event{},
}
for _, conflictedEvent := range conflictedEvents {
// Work out which auth events we need to load.
key := conflictedEvent.EventID()
// Load the necessary auth events.
tuplesNeeded := v.stateKeyTuplesNeeded(stateKeyNIDMap, needed)
authEntries = authEntries[:0]
for _, tuple := range tuplesNeeded {
if eventNID, ok := stateEntryMap(notConflicted).lookup(tuple); ok {
authEntries = append(authEntries, types.StateEntry{
StateKeyTuple: tuple,
EventNID: eventNID,
})
}
}
// Store the newly found auth events in the auth set for this event.
authSets[key], _, err = v.loadStateEvents(ctx, authEntries)
if err != nil {
return nil, err
}
// Only add auth events into the authEvents slice once, otherwise the
// check for the auth difference can become expensive and produce
// duplicate entries, which just waste memory and CPU time.
for _, event := range authSets[key] {
if _, ok := gotAuthEvents[event.EventID()]; !ok {
authEvents = append(authEvents, event)
gotAuthEvents[event.EventID()] = struct{}{}
// Store the newly found auth events in the auth set for this event.
var authEventMap map[string]types.StateEntry
authSets[key], authEventMap, err = loader.loadAuthEvents(sctx, conflictedEvent, knownAuthEvents)
if err != nil {
return err
}
for k, v := range authEventMap {
eventIDMap[k] = v
}
// Only add auth events into the authEvents slice once, otherwise the
// check for the auth difference can become expensive and produce
// duplicate entries, which just waste memory and CPU time.
for _, event := range authSets[key] {
if _, ok := gotAuthEvents[event.EventID()]; !ok {
authEvents = append(authEvents, event)
gotAuthEvents[event.EventID()] = struct{}{}
}
}
}
return nil
}(); err != nil {
return nil, err
}
// Kill the reference to this so that the GC may pick it up, since we no
@ -891,25 +935,35 @@ func (v *StateResolution) resolveConflictsV2(
// Look through all of the auth events that we've been given and work out if
// there are any events which don't appear in all of the auth sets. If they
// don't then we add them to the auth difference.
for _, event := range authEvents {
if !isInAllAuthLists(event) {
authDifference = append(authDifference, event)
func() {
span, _ := opentracing.StartSpanFromContext(ctx, "isInAllAuthLists")
defer span.Finish()
for _, event := range authEvents {
if !isInAllAuthLists(event) {
authDifference = append(authDifference, event)
}
}
}
}()
// Resolve the conflicts.
resolvedEvents := gomatrixserverlib.ResolveStateConflictsV2(
conflictedEvents,
nonConflictedEvents,
authEvents,
authDifference,
)
resolvedEvents := func() []*gomatrixserverlib.Event {
span, _ := opentracing.StartSpanFromContext(ctx, "gomatrixserverlib.ResolveStateConflictsV2")
defer span.Finish()
return gomatrixserverlib.ResolveStateConflictsV2(
conflictedEvents,
nonConflictedEvents,
authEvents,
authDifference,
)
}()
// Map from the full events back to numeric state entries.
for _, resolvedEvent := range resolvedEvents {
entry, ok := eventIDMap[resolvedEvent.EventID()]
if !ok {
panic(fmt.Errorf("missing state entry for event ID %q", resolvedEvent.EventID()))
return nil, fmt.Errorf("missing state entry for event ID %q", resolvedEvent.EventID())
}
notConflicted = append(notConflicted, entry)
}
@ -968,6 +1022,9 @@ func (v *StateResolution) stateKeyTuplesNeeded(stateKeyNIDMap map[string]types.E
func (v *StateResolution) loadStateEvents(
ctx context.Context, entries []types.StateEntry,
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadStateEvents")
defer span.Finish()
result := make([]*gomatrixserverlib.Event, 0, len(entries))
eventEntries := make([]types.StateEntry, 0, len(entries))
eventNIDs := make([]types.EventNID, 0, len(entries))
@ -996,6 +1053,127 @@ func (v *StateResolution) loadStateEvents(
return result, eventIDMap, nil
}
type authEventLoader struct {
sync.Mutex
v *StateResolution
lookupFromDB []string // scratch space
lookupFromMem []string // scratch space
lookedUpEvents []types.Event // scratch space
eventMap map[string]types.Event
}
// loadAuthEvents loads all of the auth events for a given event recursively,
// along with a map that contains state entries for all of the auth events.
func (l *authEventLoader) loadAuthEvents(
ctx context.Context, event *gomatrixserverlib.Event, eventMap map[string]types.Event,
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
l.Lock()
defer l.Unlock()
authEvents := []types.Event{} // our returned list
included := map[string]struct{}{} // dedupes authEvents above
queue := event.AuthEventIDs()
for i := 0; i < len(queue); i++ {
// Reuse the same underlying memory, since it reduces the
// amount of allocations we make the more times we call
// loadAuthEvents.
l.lookupFromDB = l.lookupFromDB[:0]
l.lookupFromMem = l.lookupFromMem[:0]
l.lookedUpEvents = l.lookedUpEvents[:0]
// Separate out the list of events in the queue based on if
// we think we already know the event in memory or not.
for _, authEventID := range queue {
if _, ok := included[authEventID]; ok {
continue
}
if _, ok := eventMap[authEventID]; ok {
l.lookupFromMem = append(l.lookupFromMem, authEventID)
} else {
l.lookupFromDB = append(l.lookupFromDB, authEventID)
}
}
// If there's nothing to do, stop here.
if len(l.lookupFromDB) == 0 && len(l.lookupFromMem) == 0 {
break
}
// If we need to get events from the database, go and fetch
// those now.
if len(l.lookupFromDB) > 0 {
eventsFromDB, err := l.v.db.EventsFromIDs(ctx, l.lookupFromDB)
if err != nil {
return nil, nil, fmt.Errorf("v.db.EventsFromIDs: %w", err)
}
l.lookedUpEvents = append(l.lookedUpEvents, eventsFromDB...)
for _, event := range eventsFromDB {
eventMap[event.EventID()] = event
}
}
// Fill in the gaps with events that we already have in memory.
if len(l.lookupFromMem) > 0 {
for _, eventID := range l.lookupFromMem {
l.lookedUpEvents = append(l.lookedUpEvents, eventMap[eventID])
}
}
// From the events that we've retrieved, work out which auth
// events to look up on the next iteration.
add := map[string]struct{}{}
for _, event := range l.lookedUpEvents {
authEvents = append(authEvents, event)
included[event.EventID()] = struct{}{}
for _, authEventID := range event.AuthEventIDs() {
if _, ok := included[authEventID]; ok {
continue
}
add[authEventID] = struct{}{}
}
}
for authEventID := range add {
queue = append(queue, authEventID)
}
}
authEventTypes := map[string]struct{}{}
authEventStateKeys := map[string]struct{}{}
for _, authEvent := range authEvents {
authEventTypes[authEvent.Type()] = struct{}{}
authEventStateKeys[*authEvent.StateKey()] = struct{}{}
}
lookupAuthEventTypes := make([]string, 0, len(authEventTypes))
lookupAuthEventStateKeys := make([]string, 0, len(authEventStateKeys))
for eventType := range authEventTypes {
lookupAuthEventTypes = append(lookupAuthEventTypes, eventType)
}
for eventStateKey := range authEventStateKeys {
lookupAuthEventStateKeys = append(lookupAuthEventStateKeys, eventStateKey)
}
eventTypes, err := l.v.db.EventTypeNIDs(ctx, lookupAuthEventTypes)
if err != nil {
return nil, nil, fmt.Errorf("v.db.EventTypeNIDs: %w", err)
}
eventStateKeys, err := l.v.db.EventStateKeyNIDs(ctx, lookupAuthEventStateKeys)
if err != nil {
return nil, nil, fmt.Errorf("v.db.EventStateKeyNIDs: %w", err)
}
stateEntryMap := map[string]types.StateEntry{}
for _, authEvent := range authEvents {
stateEntryMap[authEvent.EventID()] = types.StateEntry{
EventNID: authEvent.EventNID,
StateKeyTuple: types.StateKeyTuple{
EventTypeNID: eventTypes[authEvent.Type()],
EventStateKeyNID: eventStateKeys[*authEvent.StateKey()],
},
}
}
nakedEvents := make([]*gomatrixserverlib.Event, 0, len(authEvents))
for _, authEvent := range authEvents {
nakedEvents = append(nakedEvents, authEvent.Event)
}
return nakedEvents, stateEntryMap, nil
}
// findDuplicateStateKeys finds the state entries where the state key tuple appears more than once in a sorted list.
// Returns a sorted list of those state entries.
func findDuplicateStateKeys(a []types.StateEntry) []types.StateEntry {

View file

@ -192,6 +192,10 @@ func (u *RoomUpdater) StateAtEventIDs(
return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs)
}
func (u *RoomUpdater) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, false)
}
func (u *RoomUpdater) UnsentEventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, true)
}

View file

@ -85,7 +85,6 @@ func Test_EventsTable(t *testing.T) {
assert.NoError(t, err)
}
stateAtEvent := types.StateAtEvent{
Overwrite: false,
BeforeStateSnapshotNID: types.StateSnapshotNID(stateSnapshot),
IsRejected: false,
StateEntry: types.StateEntry{

View file

@ -173,10 +173,6 @@ func DeduplicateStateEntries(a []StateEntry) []StateEntry {
// StateAtEvent is the state before and after a matrix event.
type StateAtEvent struct {
// Should this state overwrite the latest events and memberships of the room?
// This might be necessary when rejoining a federated room after a period of
// absence, as our state and latest events will be out of date.
Overwrite bool
// The state before the event.
BeforeStateSnapshotNID StateSnapshotNID
// True if this StateEntry is rejected. State resolution should then treat this
@ -214,6 +210,14 @@ func (s StateAtEventAndReferences) Swap(a, b int) {
s[a], s[b] = s[b], s[a]
}
func (s StateAtEventAndReferences) EventIDs() string {
strs := make([]string, 0, len(s))
for _, r := range s {
strs = append(strs, r.EventID)
}
return "[" + strings.Join(strs, " ") + "]"
}
// An Event is a gomatrixserverlib.Event with the numeric event ID attached.
// It is when performing bulk event lookup in the database.
type Event struct {

View file

@ -134,6 +134,10 @@ type RateLimiting struct {
// The cooloff period in milliseconds after a request before the "slot"
// is freed again
CooloffMS int64 `yaml:"cooloff_ms"`
// A list of users that are exempt from rate limiting, i.e. if you want
// to run Mjolnir or other bots.
ExemptUserIDs []string `yaml:"exempt_user_ids"`
}
func (r *RateLimiting) Verify(configErrs *ConfigErrors) {

View file

@ -717,3 +717,6 @@ Unnamed room comes with a name summary
Named room comes with just joined member count summary
Room summary only has 5 heroes
registration is idempotent, with username specified
Setting state twice is idempotent
Joining room twice is idempotent
Inbound federation can return missing events for shared visibility

View file

@ -65,7 +65,7 @@ const selectPasswordHashSQL = "" +
"SELECT password_hash FROM account_accounts WHERE localpart = $1 AND is_deactivated = FALSE"
const selectNewNumericLocalpartSQL = "" +
"SELECT COALESCE(MAX(localpart::integer), 0) FROM account_accounts WHERE localpart ~ '^[0-9]*$'"
"SELECT COALESCE(MAX(localpart::bigint), 0) FROM account_accounts WHERE localpart ~ '^[0-9]{1,}$'"
type accountsStatements struct {
insertAccountStmt *sql.Stmt

View file

@ -124,6 +124,23 @@ func Test_Accounts(t *testing.T) {
_, err = db.GetAccountByLocalpart(ctx, "unusename")
assert.Error(t, err, "expected an error for non existent localpart")
// create an empty localpart; this should never happen, but is required to test getting a numeric localpart
// if there's already a user without a localpart in the database
_, err = db.CreateAccount(ctx, "", "", "", api.AccountTypeUser)
assert.NoError(t, err)
// test getting a numeric localpart, with an existing user without a localpart
_, err = db.CreateAccount(ctx, "", "", "", api.AccountTypeGuest)
assert.NoError(t, err)
// Create a user with a high numeric localpart, out of range for the Postgres integer (2147483647) type
_, err = db.CreateAccount(ctx, "2147483650", "", "", api.AccountTypeUser)
assert.NoError(t, err)
// Now try to create a new guest user
_, err = db.CreateAccount(ctx, "", "", "", api.AccountTypeGuest)
assert.NoError(t, err)
})
}