diff --git a/CHANGES.md b/CHANGES.md index a09a80148..0db25f05a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,22 @@ # Changelog +## Dendrite 0.8.8 (2022-06-09) + +### Features + +* The performance of state resolution has been increased significantly for larger rooms +* A number of changes have been made to rate limiting: + * Logged in users will now be rate-limited on a per-session basis rather than by remote IP + * Rate limiting no longer applies to admin or appservice users + * It is now possible to configure additional users that are exempt from rate limiting using the `exempt_user_ids` option in the `rate_limiting` section of the Dendrite config +* Setting state is now idempotent via the client API state endpoints + +### Fixes + +* Room upgrades now properly propagate tombstone events to remote servers +* Room upgrades will no longer send tombstone events if creating the upgraded room fails +* A crash has been fixed when evaluating restricted room joins + ## Dendrite 0.8.7 (2022-06-01) ### Features diff --git a/clientapi/routing/aliases.go b/clientapi/routing/aliases.go index 504d60265..68d0f4195 100644 --- a/clientapi/routing/aliases.go +++ b/clientapi/routing/aliases.go @@ -44,7 +44,7 @@ func GetAliases( return util.ErrorResponse(fmt.Errorf("rsAPI.QueryCurrentState: %w", err)) } - visibility := "invite" + visibility := gomatrixserverlib.HistoryVisibilityInvited if historyVisEvent, ok := stateRes.StateEvents[stateTuple]; ok { var err error visibility, err = historyVisEvent.HistoryVisibility() diff --git a/clientapi/routing/register.go b/clientapi/routing/register.go index eba4920c6..c4ac0f2e7 100644 --- a/clientapi/routing/register.go +++ b/clientapi/routing/register.go @@ -29,9 +29,10 @@ import ( "sync" "time" + "github.com/tidwall/gjson" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/config" - "github.com/tidwall/gjson" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/tokens" @@ -68,9 +69,10 @@ const ( // It shouldn't be passed by value because it contains a mutex. type sessionsDict struct { sync.RWMutex - sessions map[string][]authtypes.LoginType - params map[string]registerRequest - timer map[string]*time.Timer + sessions map[string][]authtypes.LoginType + sessionCompletedResult map[string]registerResponse + params map[string]registerRequest + timer map[string]*time.Timer // deleteSessionToDeviceID protects requests to DELETE /devices/{deviceID} from being abused. // If a UIA session is started by trying to delete device1, and then UIA is completed by deleting device2, // the delete request will fail for device2 since the UIA was initiated by trying to delete device1. @@ -115,6 +117,7 @@ func (d *sessionsDict) deleteSession(sessionID string) { delete(d.params, sessionID) delete(d.sessions, sessionID) delete(d.deleteSessionToDeviceID, sessionID) + delete(d.sessionCompletedResult, sessionID) // stop the timer, e.g. because the registration was completed if t, ok := d.timer[sessionID]; ok { if !t.Stop() { @@ -130,6 +133,7 @@ func (d *sessionsDict) deleteSession(sessionID string) { func newSessionsDict() *sessionsDict { return &sessionsDict{ sessions: make(map[string][]authtypes.LoginType), + sessionCompletedResult: make(map[string]registerResponse), params: make(map[string]registerRequest), timer: make(map[string]*time.Timer), deleteSessionToDeviceID: make(map[string]string), @@ -173,6 +177,19 @@ func (d *sessionsDict) addDeviceToDelete(sessionID, deviceID string) { d.deleteSessionToDeviceID[sessionID] = deviceID } +func (d *sessionsDict) addCompletedRegistration(sessionID string, response registerResponse) { + d.Lock() + defer d.Unlock() + d.sessionCompletedResult[sessionID] = response +} + +func (d *sessionsDict) getCompletedRegistration(sessionID string) (registerResponse, bool) { + d.RLock() + defer d.RUnlock() + result, ok := d.sessionCompletedResult[sessionID] + return result, ok +} + func (d *sessionsDict) getDeviceToDelete(sessionID string) (string, bool) { d.RLock() defer d.RUnlock() @@ -544,6 +561,14 @@ func Register( r.DeviceID = data.DeviceID r.InitialDisplayName = data.InitialDisplayName r.InhibitLogin = data.InhibitLogin + // Check if the user already registered using this session, if so, return that result + if response, ok := sessions.getCompletedRegistration(sessionID); ok { + return util.JSONResponse{ + Code: http.StatusOK, + JSON: response, + } + } + } if resErr := httputil.UnmarshalJSON(reqBody, &r); resErr != nil { return *resErr @@ -839,13 +864,6 @@ func completeRegistration( displayName, deviceID *string, accType userapi.AccountType, ) util.JSONResponse { - var registrationOK bool - defer func() { - if registrationOK { - sessions.deleteSession(sessionID) - } - }() - if username == "" { return util.JSONResponse{ Code: http.StatusBadRequest, @@ -886,7 +904,6 @@ func completeRegistration( // Check whether inhibit_login option is set. If so, don't create an access // token or a device for this user if inhibitLogin { - registrationOK = true return util.JSONResponse{ Code: http.StatusOK, JSON: registerResponse{ @@ -920,15 +937,17 @@ func completeRegistration( } } - registrationOK = true + result := registerResponse{ + UserID: devRes.Device.UserID, + AccessToken: devRes.Device.AccessToken, + HomeServer: accRes.Account.ServerName, + DeviceID: devRes.Device.ID, + } + sessions.addCompletedRegistration(sessionID, result) + return util.JSONResponse{ Code: http.StatusOK, - JSON: registerResponse{ - UserID: devRes.Device.UserID, - AccessToken: devRes.Device.AccessToken, - HomeServer: accRes.Account.ServerName, - DeviceID: devRes.Device.ID, - }, + JSON: result, } } diff --git a/cmd/resolve-state/main.go b/cmd/resolve-state/main.go index 6ed6ebdb8..c02140003 100644 --- a/cmd/resolve-state/main.go +++ b/cmd/resolve-state/main.go @@ -28,7 +28,9 @@ import ( var roomVersion = flag.String("roomversion", "5", "the room version to parse events as") var filterType = flag.String("filtertype", "", "the event types to filter on") +var difference = flag.Bool("difference", false, "whether to calculate the difference between snapshots") +// nolint:gocyclo func main() { ctx := context.Background() cfg := setup.ParseFlags(true) @@ -36,6 +38,7 @@ func main() { Type: "std", Level: "error", }) + cfg.ClientAPI.RegistrationDisabled = true base := base.NewBaseDendrite(cfg, "ResolveState", base.DisableMetrics) args := flag.Args() @@ -64,6 +67,59 @@ func main() { RoomVersion: gomatrixserverlib.RoomVersion(*roomVersion), }) + if *difference { + if len(snapshotNIDs) != 2 { + panic("need exactly two state snapshot NIDs to calculate difference") + } + var removed, added []types.StateEntry + removed, added, err = stateres.DifferenceBetweeenStateSnapshots(ctx, snapshotNIDs[0], snapshotNIDs[1]) + if err != nil { + panic(err) + } + + var eventNIDs []types.EventNID + for _, entry := range append(removed, added...) { + eventNIDs = append(eventNIDs, entry.EventNID) + } + + var eventEntries []types.Event + eventEntries, err = roomserverDB.Events(ctx, eventNIDs) + if err != nil { + panic(err) + } + + events := make(map[types.EventNID]*gomatrixserverlib.Event, len(eventEntries)) + for _, entry := range eventEntries { + events[entry.EventNID] = entry.Event + } + + if len(removed) > 0 { + fmt.Println("Removed:") + for _, r := range removed { + event := events[r.EventNID] + fmt.Println() + fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey()) + fmt.Printf(" %s\n", string(event.Content())) + } + } + + if len(removed) > 0 && len(added) > 0 { + fmt.Println() + } + + if len(added) > 0 { + fmt.Println("Added:") + for _, a := range added { + event := events[a.EventNID] + fmt.Println() + fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey()) + fmt.Printf(" %s\n", string(event.Content())) + } + } + + return + } + var stateEntries []types.StateEntry for _, snapshotNID := range snapshotNIDs { var entries []types.StateEntry diff --git a/docs/caddy/monolith/CaddyFile b/docs/caddy/monolith/CaddyFile new file mode 100644 index 000000000..cd93f9e10 --- /dev/null +++ b/docs/caddy/monolith/CaddyFile @@ -0,0 +1,68 @@ +{ + # debug + admin off + email example@example.com + default_sni example.com + # Debug endpoint + # acme_ca https://acme-staging-v02.api.letsencrypt.org/directory +} + +####################################################################### +# Snippets +#______________________________________________________________________ + +(handle_errors_maintenance) { + handle_errors { + @maintenance expression {http.error.status_code} == 502 + rewrite @maintenance maintenance.html + root * "/path/to/service/pages" + file_server + } +} + +(matrix-well-known-header) { + # Headers + header Access-Control-Allow-Origin "*" + header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, OPTIONS" + header Access-Control-Allow-Headers "Origin, X-Requested-With, Content-Type, Accept, Authorization" + header Content-Type "application/json" +} + +####################################################################### + +example.com { + + # ... + + handle /.well-known/matrix/server { + import matrix-well-known-header + respond `{ "m.server": "matrix.example.com:443" }` 200 + } + + handle /.well-known/matrix/client { + import matrix-well-known-header + respond `{ "m.homeserver": { "base_url": "https://matrix.example.com" } }` 200 + } + + import handle_errors_maintenance +} + +example.com:8448 { + # server<->server HTTPS traffic + reverse_proxy http://dendrite-host:8008 +} + +matrix.example.com { + + handle /_matrix/* { + # client<->server HTTPS traffic + reverse_proxy http://dendrite-host:8008 + } + + handle_path /* { + # Client webapp (Element SPA or ...) + file_server { + root /path/to/www/example.com/matrix-web-client/ + } + } +} diff --git a/docs/installation/10_optimisation.md b/docs/installation/10_optimisation.md new file mode 100644 index 000000000..c19b7a75e --- /dev/null +++ b/docs/installation/10_optimisation.md @@ -0,0 +1,71 @@ +--- +title: Optimise your installation +parent: Installation +has_toc: true +nav_order: 10 +permalink: /installation/start/optimisation +--- + +# Optimise your installation + +Now that you have Dendrite running, the following tweaks will improve the reliability +and performance of your installation. + +## File descriptor limit + +Most platforms have a limit on how many file descriptors a single process can open. All +connections made by Dendrite consume file descriptors — this includes database connections +and network requests to remote homeservers. When participating in large federated rooms +where Dendrite must talk to many remote servers, it is often very easy to exhaust default +limits which are quite low. + +We currently recommend setting the file descriptor limit to 65535 to avoid such +issues. Dendrite will log immediately after startup if the file descriptor limit is too low: + +``` +level=warning msg="IMPORTANT: Process file descriptor limit is currently 1024, it is recommended to raise the limit for Dendrite to at least 65535 to avoid issues" +``` + +UNIX systems have two limits: a hard limit and a soft limit. You can view the soft limit +by running `ulimit -Sn` and the hard limit with `ulimit -Hn`: + +```bash +$ ulimit -Hn +1048576 + +$ ulimit -Sn +1024 +``` + +Increase the soft limit before starting Dendrite: + +```bash +ulimit -Sn 65535 +``` + +The log line at startup should no longer appear if the limit is sufficient. + +If you are running under a systemd service, you can instead add `LimitNOFILE=65535` option +to the `[Service]` section of your service unit file. + +## DNS caching + +Dendrite has a built-in DNS cache which significantly reduces the load that Dendrite will +place on your DNS resolver. This may also speed up outbound federation. + +Consider enabling the DNS cache by modifying the `global` section of your configuration file: + +```yaml + dns_cache: + enabled: true + cache_size: 4096 + cache_lifetime: 600s +``` + +## Time synchronisation + +Matrix relies heavily on TLS which requires the system time to be correct. If the clock +drifts then you may find that federation no works reliably (or at all) and clients may +struggle to connect to your Dendrite server. + +Ensure that the time is synchronised on your system by enabling NTP sync. diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index ff159beea..97bcc12a5 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -63,6 +63,7 @@ func AddPublicRoutes( TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), ServerName: cfg.Matrix.ServerName, UserAPI: userAPI, } diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 494150036..6453d026c 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -17,6 +17,7 @@ package producers import ( "context" "encoding/json" + "fmt" "strconv" "time" @@ -34,6 +35,7 @@ type SyncAPIProducer struct { TopicSendToDeviceEvent string TopicTypingEvent string TopicPresenceEvent string + TopicDeviceListUpdate string JetStream nats.JetStreamContext ServerName gomatrixserverlib.ServerName UserAPI userapi.UserInternalAPI @@ -161,3 +163,18 @@ func (p *SyncAPIProducer) SendPresence( _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } + +func (p *SyncAPIProducer) SendDeviceListUpdate( + ctx context.Context, deviceListUpdate *gomatrixserverlib.DeviceListUpdateEvent, +) (err error) { + m := nats.NewMsg(p.TopicDeviceListUpdate) + m.Header.Set(jetstream.UserID, deviceListUpdate.UserID) + m.Data, err = json.Marshal(deviceListUpdate) + if err != nil { + return fmt.Errorf("json.Marshal: %w", err) + } + + log.Debugf("Sending device list update: %+v", m.Header) + _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/federationapi/routing/devices.go b/federationapi/routing/devices.go index 1a092645f..2f9da1f25 100644 --- a/federationapi/routing/devices.go +++ b/federationapi/routing/devices.go @@ -85,6 +85,9 @@ func GetUserDevices( if targetKey, ok := targetUser[gomatrixserverlib.KeyID(dev.DeviceID)]; ok { for sourceUserID, forSourceUser := range targetKey { for sourceKeyID, sourceKey := range forSourceUser { + if device.Keys.Signatures == nil { + device.Keys.Signatures = map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{} + } if _, ok := device.Keys.Signatures[sourceUserID]; !ok { device.Keys.Signatures[sourceUserID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{} } diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index b505c858c..54e14d014 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -512,11 +512,7 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli } else if serverName != t.Origin { return } - var inputRes keyapi.InputDeviceListUpdateResponse - t.keyAPI.InputDeviceListUpdate(context.Background(), &keyapi.InputDeviceListUpdateRequest{ - Event: payload, - }, &inputRes) - if inputRes.Error != nil { - util.GetLogger(ctx).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate") + if err := t.producer.SendDeviceListUpdate(ctx, &payload); err != nil { + util.GetLogger(ctx).WithError(err).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate") } } diff --git a/go.mod b/go.mod index ea6e8caeb..b2a096751 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220607143425-e55d796fd0b3 + github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 diff --git a/go.sum b/go.sum index e21794f41..3a35c47da 100644 --- a/go.sum +++ b/go.sum @@ -418,8 +418,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220607143425-e55d796fd0b3 h1:2eYcBt8Kg+nW/xIJY5x8Uo2dQLjUF+oxLap00uFC5l8= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220607143425-e55d796fd0b3/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a h1:jOkrb6twViAGTHHadA51sQwdloHT0Vx1MCptk9InTHo= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/internal/version.go b/internal/version.go index 2543ec90c..e29996f36 100644 --- a/internal/version.go +++ b/internal/version.go @@ -17,7 +17,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 8 - VersionPatch = 7 + VersionPatch = 8 VersionTag = "" // example: "rc1" ) diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 140f03569..c0a1eedbb 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -62,8 +62,6 @@ type FederationKeyAPI interface { QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse) QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse) - // InputDeviceListUpdate from a federated server EDU - InputDeviceListUpdate(ctx context.Context, req *InputDeviceListUpdateRequest, res *InputDeviceListUpdateResponse) PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse) PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) } @@ -337,11 +335,3 @@ type QuerySignaturesResponse struct { // The request error, if any Error *KeyError } - -type InputDeviceListUpdateRequest struct { - Event gomatrixserverlib.DeviceListUpdateEvent -} - -type InputDeviceListUpdateResponse struct { - Error *KeyError -} diff --git a/keyserver/consumers/devicelistupdate.go b/keyserver/consumers/devicelistupdate.go new file mode 100644 index 000000000..f4f246280 --- /dev/null +++ b/keyserver/consumers/devicelistupdate.go @@ -0,0 +1,82 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/keyserver/internal" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" +) + +// DeviceListUpdateConsumer consumes device list updates that came in over federation. +type DeviceListUpdateConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + updater *internal.DeviceListUpdater +} + +// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers. +func NewDeviceListUpdateConsumer( + process *process.ProcessContext, + cfg *config.KeyServer, + js nats.JetStreamContext, + updater *internal.DeviceListUpdater, +) *DeviceListUpdateConsumer { + return &DeviceListUpdateConsumer{ + ctx: process.Context(), + jetstream: js, + durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), + updater: updater, + } +} + +// Start consuming from key servers +func (t *DeviceListUpdateConsumer) Start() error { + return jetstream.JetStreamConsumer( + t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) +} + +// onMessage is called in response to a message received on the +// key change events topic from the key server. +func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + var m gomatrixserverlib.DeviceListUpdateEvent + if err := json.Unmarshal(msg.Data, &m); err != nil { + logrus.WithError(err).Errorf("Failed to read from device list update input topic") + return true + } + err := t.updater.Update(ctx, m) + if err != nil { + logrus.WithFields(logrus.Fields{ + "user_id": m.UserID, + "device_id": m.DeviceID, + "stream_id": m.StreamID, + "prev_id": m.PrevID, + }).WithError(err).Errorf("Failed to update device list") + return false + } + return true +} diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index f8d0d69c3..c146b2aa0 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -47,17 +47,6 @@ func (a *KeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) { a.UserAPI = i } -func (a *KeyInternalAPI) InputDeviceListUpdate( - ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, -) { - err := a.Updater.Update(ctx, req.Event) - if err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("failed to update device list: %s", err), - } - } -} - func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset) if err != nil { diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go index abce81582..dac61d1ea 100644 --- a/keyserver/inthttp/client.go +++ b/keyserver/inthttp/client.go @@ -63,20 +63,6 @@ type httpKeyInternalAPI struct { func (h *httpKeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) { // no-op: doesn't need it } -func (h *httpKeyInternalAPI) InputDeviceListUpdate( - ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, -) { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputDeviceListUpdate") - defer span.Finish() - - apiURL := h.apiURL + InputDeviceListUpdatePath - err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) - if err != nil { - res.Error = &api.KeyError{ - Err: err.Error(), - } - } -} func (h *httpKeyInternalAPI) PerformClaimKeys( ctx context.Context, diff --git a/keyserver/inthttp/server.go b/keyserver/inthttp/server.go index 8d557a768..5bf5976a8 100644 --- a/keyserver/inthttp/server.go +++ b/keyserver/inthttp/server.go @@ -25,17 +25,6 @@ import ( ) func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) { - internalAPIMux.Handle(InputDeviceListUpdatePath, - httputil.MakeInternalAPI("inputDeviceListUpdate", func(req *http.Request) util.JSONResponse { - request := api.InputDeviceListUpdateRequest{} - response := api.InputDeviceListUpdateResponse{} - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - s.InputDeviceListUpdate(req.Context(), &request, &response) - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) internalAPIMux.Handle(PerformClaimKeysPath, httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse { request := api.PerformClaimKeysRequest{} diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 3ffd3ba1e..cd506f981 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -18,6 +18,7 @@ import ( "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/consumers" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" @@ -59,10 +60,17 @@ func NewInternalAPI( updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable ap.Updater = updater go func() { - if err := updater.Start(); err != nil { + if err = updater.Start(); err != nil { logrus.WithError(err).Panicf("failed to start device list updater") } }() + dlConsumer := consumers.NewDeviceListUpdateConsumer( + base.ProcessContext, cfg, js, updater, + ) + if err = dlConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start device list consumer") + } + return ap } diff --git a/roomserver/api/output.go b/roomserver/api/output.go index a82bf8701..36d0625c7 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -161,6 +161,8 @@ type OutputNewRoomEvent struct { // The transaction ID of the send request if sent by a local user and one // was specified TransactionID *TransactionID `json:"transaction_id,omitempty"` + // The history visibility of the event. + HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"` } func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) { @@ -187,7 +189,8 @@ func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.Headere // should build their current room state up from OutputNewRoomEvents only. type OutputOldRoomEvent struct { // The Event. - Event *gomatrixserverlib.HeaderedEvent `json:"event"` + Event *gomatrixserverlib.HeaderedEvent `json:"event"` + HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"` } // An OutputNewInviteEvent is written whenever an invite becomes active. diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index deb88ea82..ff05f798c 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -295,6 +295,22 @@ func (r *Inputer) processRoomEvent( } } + // Get the state before the event so that we can work out if the event was + // allowed at the time, and also to get the history visibility. We won't + // bother doing this if the event was already rejected as it just ends up + // burning CPU time. + historyVisibility := gomatrixserverlib.HistoryVisibilityJoined // Default to restrictive. + if rejectionErr == nil && !isRejected && !softfail { + var err error + historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev) + if err != nil { + return fmt.Errorf("r.processStateBefore: %w", err) + } + if rejectionErr != nil { + isRejected = true + } + } + // Store the event. _, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected || softfail) if err != nil { @@ -360,6 +376,7 @@ func (r *Inputer) processRoomEvent( input.SendAsServer, // send as server input.TransactionID, // transaction ID input.HasState, // rewrites state? + historyVisibility, // the history visibility before the event ); err != nil { return fmt.Errorf("r.updateLatestEvents: %w", err) } @@ -368,7 +385,8 @@ func (r *Inputer) processRoomEvent( { Type: api.OutputTypeOldRoomEvent, OldRoomEvent: &api.OutputOldRoomEvent{ - Event: headered, + Event: headered, + HistoryVisibility: historyVisibility, }, }, }) @@ -402,6 +420,100 @@ func (r *Inputer) processRoomEvent( return nil } +// processStateBefore works out what the state is before the event and +// then checks the event auths against the state at the time. It also +// tries to determine what the history visibility was of the event at +// the time, so that it can be sent in the output event to downstream +// components. +// nolint:nakedret +func (r *Inputer) processStateBefore( + ctx context.Context, + input *api.InputRoomEvent, + missingPrev bool, +) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) { + historyVisibility = gomatrixserverlib.HistoryVisibilityJoined // Default to restrictive. + event := input.Event.Unwrap() + isCreateEvent := event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") + var stateBeforeEvent []*gomatrixserverlib.Event + switch { + case isCreateEvent: + // There's no state before a create event so there is nothing + // else to do. + return + case input.HasState: + // If we're overriding the state then we need to go and retrieve + // them from the database. It's a hard error if they are missing. + stateEvents, err := r.DB.EventsFromIDs(ctx, input.StateEventIDs) + if err != nil { + return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err) + } + stateBeforeEvent = make([]*gomatrixserverlib.Event, 0, len(stateEvents)) + for _, entry := range stateEvents { + stateBeforeEvent = append(stateBeforeEvent, entry.Event) + } + case missingPrev: + // We don't know all of the prev events, so we can't work out + // the state before the event. Reject it in that case. + rejectionErr = fmt.Errorf("event %q has missing prev events", event.EventID()) + return + case len(event.PrevEventIDs()) == 0: + // There should be prev events since it's not a create event. + // A non-create event that claims to have no prev events is + // invalid, so reject it. + rejectionErr = fmt.Errorf("event %q must have prev events", event.EventID()) + return + default: + // For all non-create events, there must be prev events, so we'll + // ask the query API for the relevant tuples needed for auth. We + // will include the history visibility here even though we don't + // actually need it for auth, because we want to send it in the + // output events. + tuplesNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event}).Tuples() + tuplesNeeded = append(tuplesNeeded, gomatrixserverlib.StateKeyTuple{ + EventType: gomatrixserverlib.MRoomHistoryVisibility, + StateKey: "", + }) + stateBeforeReq := &api.QueryStateAfterEventsRequest{ + RoomID: event.RoomID(), + PrevEventIDs: event.PrevEventIDs(), + StateToFetch: tuplesNeeded, + } + stateBeforeRes := &api.QueryStateAfterEventsResponse{} + if err := r.Queryer.QueryStateAfterEvents(ctx, stateBeforeReq, stateBeforeRes); err != nil { + return "", nil, fmt.Errorf("r.Queryer.QueryStateAfterEvents: %w", err) + } + switch { + case !stateBeforeRes.RoomExists: + rejectionErr = fmt.Errorf("room %q does not exist", event.RoomID()) + return + case !stateBeforeRes.PrevEventsExist: + rejectionErr = fmt.Errorf("prev events of %q are not known", event.EventID()) + return + default: + stateBeforeEvent = gomatrixserverlib.UnwrapEventHeaders(stateBeforeRes.StateEvents) + } + } + // At this point, stateBeforeEvent should be populated either by + // the supplied state in the input request, or from the prev events. + // Check whether the event is allowed or not. + stateBeforeAuth := gomatrixserverlib.NewAuthEvents(stateBeforeEvent) + if rejectionErr = gomatrixserverlib.Allowed(event, &stateBeforeAuth); rejectionErr != nil { + return + } + // Work out what the history visibility was at the time of the + // event. + for _, event := range stateBeforeEvent { + if event.Type() != gomatrixserverlib.MRoomHistoryVisibility || !event.StateKeyEquals("") { + continue + } + if hisVis, err := event.HistoryVisibility(); err == nil { + historyVisibility = hisVis + break + } + } + return +} + // fetchAuthEvents will check to see if any of the // auth events specified by the given event are unknown. If they are // then we will go off and request them from the federation and then diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 9738ed4e6..e76f4ba8d 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -56,6 +56,7 @@ func (r *Inputer) updateLatestEvents( sendAsServer string, transactionID *api.TransactionID, rewritesState bool, + historyVisibility gomatrixserverlib.HistoryVisibility, ) (err error) { span, ctx := opentracing.StartSpanFromContext(ctx, "updateLatestEvents") defer span.Finish() @@ -69,15 +70,16 @@ func (r *Inputer) updateLatestEvents( defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err) u := latestEventsUpdater{ - ctx: ctx, - api: r, - updater: updater, - roomInfo: roomInfo, - stateAtEvent: stateAtEvent, - event: event, - sendAsServer: sendAsServer, - transactionID: transactionID, - rewritesState: rewritesState, + ctx: ctx, + api: r, + updater: updater, + roomInfo: roomInfo, + stateAtEvent: stateAtEvent, + event: event, + sendAsServer: sendAsServer, + transactionID: transactionID, + rewritesState: rewritesState, + historyVisibility: historyVisibility, } if err = u.doUpdateLatestEvents(); err != nil { @@ -119,6 +121,8 @@ type latestEventsUpdater struct { // The snapshots of current state before and after processing this event oldStateNID types.StateSnapshotNID newStateNID types.StateSnapshotNID + // The history visibility of the event itself (from the state before the event). + historyVisibility gomatrixserverlib.HistoryVisibility } func (u *latestEventsUpdater) doUpdateLatestEvents() error { @@ -365,12 +369,13 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) } ore := api.OutputNewRoomEvent{ - Event: u.event.Headered(u.roomInfo.RoomVersion), - RewritesState: u.rewritesState, - LastSentEventID: u.lastEventIDSent, - LatestEventIDs: latestEventIDs, - TransactionID: u.transactionID, - SendAsServer: u.sendAsServer, + Event: u.event.Headered(u.roomInfo.RoomVersion), + RewritesState: u.rewritesState, + LastSentEventID: u.lastEventIDSent, + LatestEventIDs: latestEventIDs, + TransactionID: u.transactionID, + SendAsServer: u.sendAsServer, + HistoryVisibility: u.historyVisibility, } eventIDMap, err := u.stateEventMap() diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index cc4a9fff5..67dcfdf38 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -823,13 +823,39 @@ func (d *Database) handleRedactions( return nil, "", nil } + // Get the power level from the database, so we can verify the user is allowed to redact the event + powerLevels, err := d.GetStateEvent(ctx, event.RoomID(), gomatrixserverlib.MRoomPowerLevels, "") + if err != nil { + return nil, "", fmt.Errorf("d.GetStateEvent: %w", err) + } + pl, err := powerLevels.PowerLevels() + if err != nil { + return nil, "", fmt.Errorf("unable to get powerlevels for room: %w", err) + } + + redactUser := pl.UserLevel(redactionEvent.Sender()) + switch { + case redactUser >= pl.Redact: + // The power level of the redaction event’s sender is greater than or equal to the redact level. + case redactedEvent.Origin() == redactionEvent.Origin() && redactedEvent.Sender() == redactionEvent.Sender(): + // The domain of the redaction event’s sender matches that of the original event’s sender. + default: + return nil, "", nil + } + // mark the event as redacted + if redactionsArePermanent { + redactedEvent.Event = redactedEvent.Redact() + } + err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent) if err != nil { return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err) } - if redactionsArePermanent { - redactedEvent.Event = redactedEvent.Redact() + // NOTSPEC: sytest relies on this unspecced field existing :( + err = redactedEvent.SetUnsignedField("redacted_by", redactionEvent.EventID()) + if err != nil { + return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err) } // overwrite the eventJSON table err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON()) diff --git a/setup/base/sanity_unix.go b/setup/base/sanity_unix.go index 0c1543e0b..c630d3f19 100644 --- a/setup/base/sanity_unix.go +++ b/setup/base/sanity_unix.go @@ -15,8 +15,21 @@ func platformSanityChecks() { // If we run out of file descriptors, we might run into problems accessing // PostgreSQL amongst other things. Complain at startup if we think the // number of file descriptors is too low. - var rLimit syscall.Rlimit - if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil && rLimit.Cur < 65535 { + warn := func(rLimit *syscall.Rlimit) { logrus.Warnf("IMPORTANT: Process file descriptor limit is currently %d, it is recommended to raise the limit for Dendrite to at least 65535 to avoid issues", rLimit.Cur) } + var rLimit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil && rLimit.Cur < 65535 { + // The file descriptor count is too low. Let's try to raise it. + rLimit.Cur = 65535 + if err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + // We failed to raise it, so log an error. + logrus.WithError(err).Warn("IMPORTANT: Failed to raise the file descriptor limit") + warn(&rLimit) + } else if err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil && rLimit.Cur < 65535 { + // We think we successfully raised the limit, but a second call to + // get the limit told us that we didn't succeed. Log an error. + warn(&rLimit) + } + } } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 6594e6941..110808b1b 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -16,6 +16,7 @@ const ( var ( InputRoomEvent = "InputRoomEvent" + InputDeviceListUpdate = "InputDeviceListUpdate" OutputRoomEvent = "OutputRoomEvent" OutputSendToDeviceEvent = "OutputSendToDeviceEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent" @@ -45,6 +46,11 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: InputDeviceListUpdate, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, { Name: OutputRoomEvent, Retention: nats.InterestPolicy, diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index d9fb9cf82..219b35e2c 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -43,9 +43,6 @@ func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneT } func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) { -} -func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.InputDeviceListUpdateRequest, res *keyapi.InputDeviceListUpdateResponse) { - } func (k *mockKeyAPI) QuerySignatures(ctx context.Context, req *keyapi.QuerySignaturesRequest, res *keyapi.QuerySignaturesResponse) { } diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 96438e184..d021d365d 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -97,7 +97,7 @@ func Context( state, _ := syncDB.CurrentState(ctx, roomID, &stateFilter, nil) // verify the user is allowed to see the context for this room/event for _, x := range state { - var hisVis string + var hisVis gomatrixserverlib.HistoryVisibility hisVis, err = x.HistoryVisibility() if err != nil { continue diff --git a/sytest-whitelist b/sytest-whitelist index 5f6797a3e..60a3b73f6 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -716,6 +716,8 @@ PUT /rooms/:room_id/redact/:event_id/:txn_id is idempotent Unnamed room comes with a name summary Named room comes with just joined member count summary Room summary only has 5 heroes +registration is idempotent, with username specified Setting state twice is idempotent Joining room twice is idempotent -Inbound federation can return missing events for shared visibility \ No newline at end of file +Inbound federation can return missing events for shared visibility +Inbound federation ignores redactions from invalid servers room > v3 \ No newline at end of file