From 0e26662a552b671ef201490defbbea59df4250c1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 8 Feb 2022 13:45:48 +0000 Subject: [PATCH 1/8] Allow events to be un-rejected (#2159) * Allow un-rejecting an event later * SQL * Only un-reject, don't re-reject * Clarify ambiguous column reference --- roomserver/storage/postgres/events_table.go | 6 +++--- roomserver/storage/sqlite3/events_table.go | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index 6c3847752..ece1d9e3c 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -71,10 +71,10 @@ CREATE TABLE IF NOT EXISTS roomserver_events ( ` const insertEventSQL = "" + - "INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" + + "INSERT INTO roomserver_events AS e (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + - " ON CONFLICT ON CONSTRAINT roomserver_event_id_unique" + - " DO NOTHING" + + " ON CONFLICT ON CONSTRAINT roomserver_event_id_unique DO UPDATE" + + " SET is_rejected = $8 WHERE e.is_rejected = FALSE" + " RETURNING event_nid, state_snapshot_nid" const selectEventSQL = "" + diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index e1e6a597c..cef09fe60 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -49,7 +49,8 @@ const eventsSchema = ` const insertEventSQL = ` INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT DO NOTHING + ON CONFLICT DO UPDATE + SET is_rejected = $8 WHERE is_rejected = 0 RETURNING event_nid, state_snapshot_nid; ` From 8a1dfffe3dc7e720964820301f58a7a9e50d5ee6 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 8 Feb 2022 16:16:01 +0000 Subject: [PATCH 2/8] Various updates for renaming the `master` branch to `main` --- .github/PULL_REQUEST_TEMPLATE.md | 2 +- .github/workflows/codeql-analysis.yml | 30 +++++++++++++-------------- .github/workflows/tests.yml | 6 +++--- build.sh | 2 +- docs/CONTRIBUTING.md | 6 +++--- docs/p2p.md | 2 +- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 1204582e2..8014e9414 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -2,6 +2,6 @@ -* [ ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/master/docs/CONTRIBUTING.md#sign-off) +* [ ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off) Signed-off-by: `Your Name ` diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index a4ef8b395..de6c79ddc 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -2,9 +2,9 @@ name: "CodeQL" on: push: - branches: [master] + branches: [main] pull_request: - branches: [master] + branches: [main] jobs: analyze: @@ -14,21 +14,21 @@ jobs: strategy: fail-fast: false matrix: - language: ['go'] + language: ["go"] steps: - - name: Checkout repository - uses: actions/checkout@v2 - with: - fetch-depth: 2 + - name: Checkout repository + uses: actions/checkout@v2 + with: + fetch-depth: 2 - - run: git checkout HEAD^2 - if: ${{ github.event_name == 'pull_request' }} + - run: git checkout HEAD^2 + if: ${{ github.event_name == 'pull_request' }} - - name: Initialize CodeQL - uses: github/codeql-action/init@v1 - with: - languages: ${{ matrix.language }} + - name: Initialize CodeQL + uses: github/codeql-action/init@v1 + with: + languages: ${{ matrix.language }} - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v1 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ad5a2660c..4a1720295 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -2,7 +2,7 @@ name: Tests on: push: - branches: [ 'master' ] + branches: ["main"] pull_request: concurrency: @@ -33,7 +33,7 @@ jobs: path: dendrite # Attempt to check out the same branch of Complement as the PR. If it - # doesn't exist, fallback to master. + # doesn't exist, fallback to main. - name: Checkout complement shell: bash run: | @@ -68,4 +68,4 @@ jobs: name: Run Complement Tests env: COMPLEMENT_BASE_IMAGE: complement-dendrite:latest - working-directory: complement \ No newline at end of file + working-directory: complement diff --git a/build.sh b/build.sh index 8196fc653..700e6434f 100755 --- a/build.sh +++ b/build.sh @@ -7,7 +7,7 @@ if [ -d ".git" ] then export BUILD=`git rev-parse --short HEAD || ""` export BRANCH=`(git symbolic-ref --short HEAD | tr -d \/ ) || ""` - if [ "$BRANCH" = master ] + if [ "$BRANCH" = main ] then export BRANCH="" fi diff --git a/docs/CONTRIBUTING.md b/docs/CONTRIBUTING.md index ea4b2b27d..fe7127c76 100644 --- a/docs/CONTRIBUTING.md +++ b/docs/CONTRIBUTING.md @@ -37,7 +37,7 @@ If a job fails, click the "details" button and you should be taken to the job's logs. ![Click the details button on the failing build -step](https://raw.githubusercontent.com/matrix-org/dendrite/master/docs/images/details-button-location.jpg) +step](https://raw.githubusercontent.com/matrix-org/dendrite/main/docs/images/details-button-location.jpg) Scroll down to the failing step and you should see some log output. Scan the logs until you find what it's complaining about, fix it, submit a new commit, @@ -57,7 +57,7 @@ significant amount of CPU and RAM. Once the code builds, run [Sytest](https://github.com/matrix-org/sytest) according to the guide in -[docs/sytest.md](https://github.com/matrix-org/dendrite/blob/master/docs/sytest.md#using-a-sytest-docker-image) +[docs/sytest.md](https://github.com/matrix-org/dendrite/blob/main/docs/sytest.md#using-a-sytest-docker-image) so you can see whether something is being broken and whether there are newly passing tests. @@ -94,4 +94,4 @@ For more general questions please use We ask that everyone who contributes to the project signs off their contributions, in accordance with the -[DCO](https://github.com/matrix-org/matrix-doc/blob/master/CONTRIBUTING.rst#sign-off). +[DCO](https://github.com/matrix-org/matrix-doc/blob/main/CONTRIBUTING.rst#sign-off). diff --git a/docs/p2p.md b/docs/p2p.md index e858ba114..4e9a50524 100644 --- a/docs/p2p.md +++ b/docs/p2p.md @@ -6,7 +6,7 @@ These are the instructions for setting up P2P Dendrite, current as of May 2020. #### Build -- The `master` branch has a WASM-only binary for dendrite: `./cmd/dendritejs`. +- The `main` branch has a WASM-only binary for dendrite: `./cmd/dendritejs`. - Build it and copy assets to riot-web. ``` $ ./build-dendritejs.sh From bb39149ff8c0a14f312aec694f93902c6f409cc0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 8 Feb 2022 16:18:16 +0000 Subject: [PATCH 3/8] Fix DendriteJS dockerfile --- build/docker/DendriteJS.Dockerfile | 74 +++++++++++++++--------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/build/docker/DendriteJS.Dockerfile b/build/docker/DendriteJS.Dockerfile index e8d742b7e..5e1cffcad 100644 --- a/build/docker/DendriteJS.Dockerfile +++ b/build/docker/DendriteJS.Dockerfile @@ -9,9 +9,9 @@ FROM golang:1.14-alpine AS gobuild # Download and build dendrite WORKDIR /build -ADD https://github.com/matrix-org/dendrite/archive/master.tar.gz /build/master.tar.gz -RUN tar xvfz master.tar.gz -WORKDIR /build/dendrite-master +ADD https://github.com/matrix-org/dendrite/archive/main.tar.gz /build/main.tar.gz +RUN tar xvfz main.tar.gz +WORKDIR /build/dendrite-main RUN GOOS=js GOARCH=wasm go build -o main.wasm ./cmd/dendritejs @@ -21,7 +21,7 @@ RUN apt-get update && apt-get -y install python # Download riot-web and libp2p repos WORKDIR /build -ADD https://github.com/matrix-org/go-http-js-libp2p/archive/master.tar.gz /build/libp2p.tar.gz +ADD https://github.com/matrix-org/go-http-js-libp2p/archive/main.tar.gz /build/libp2p.tar.gz RUN tar xvfz libp2p.tar.gz ADD https://github.com/vector-im/element-web/archive/matthew/p2p.tar.gz /build/p2p.tar.gz RUN tar xvfz p2p.tar.gz @@ -31,21 +31,21 @@ WORKDIR /build/element-web-matthew-p2p RUN yarn install RUN ln -s /build/go-http-js-libp2p-master /build/element-web-matthew-p2p/node_modules/go-http-js-libp2p RUN (cd node_modules/go-http-js-libp2p && yarn install) -COPY --from=gobuild /build/dendrite-master/main.wasm ./src/vector/dendrite.wasm +COPY --from=gobuild /build/dendrite-main/main.wasm ./src/vector/dendrite.wasm # build it all RUN yarn build:p2p SHELL ["/bin/bash", "-c"] RUN echo $'\ -{ \n\ + { \n\ "default_server_config": { \n\ - "m.homeserver": { \n\ - "base_url": "https://p2p.riot.im", \n\ - "server_name": "p2p.riot.im" \n\ - }, \n\ - "m.identity_server": { \n\ - "base_url": "https://vector.im" \n\ - } \n\ + "m.homeserver": { \n\ + "base_url": "https://p2p.riot.im", \n\ + "server_name": "p2p.riot.im" \n\ + }, \n\ + "m.identity_server": { \n\ + "base_url": "https://vector.im" \n\ + } \n\ }, \n\ "disable_custom_urls": false, \n\ "disable_guests": true, \n\ @@ -55,57 +55,57 @@ RUN echo $'\ "integrations_ui_url": "https://scalar.vector.im/", \n\ "integrations_rest_url": "https://scalar.vector.im/api", \n\ "integrations_widgets_urls": [ \n\ - "https://scalar.vector.im/_matrix/integrations/v1", \n\ - "https://scalar.vector.im/api", \n\ - "https://scalar-staging.vector.im/_matrix/integrations/v1", \n\ - "https://scalar-staging.vector.im/api", \n\ - "https://scalar-staging.riot.im/scalar/api" \n\ + "https://scalar.vector.im/_matrix/integrations/v1", \n\ + "https://scalar.vector.im/api", \n\ + "https://scalar-staging.vector.im/_matrix/integrations/v1", \n\ + "https://scalar-staging.vector.im/api", \n\ + "https://scalar-staging.riot.im/scalar/api" \n\ ], \n\ "integrations_jitsi_widget_url": "https://scalar.vector.im/api/widgets/jitsi.html", \n\ "bug_report_endpoint_url": "https://riot.im/bugreports/submit", \n\ "defaultCountryCode": "GB", \n\ "showLabsSettings": false, \n\ "features": { \n\ - "feature_pinning": "labs", \n\ - "feature_custom_status": "labs", \n\ - "feature_custom_tags": "labs", \n\ - "feature_state_counters": "labs" \n\ + "feature_pinning": "labs", \n\ + "feature_custom_status": "labs", \n\ + "feature_custom_tags": "labs", \n\ + "feature_state_counters": "labs" \n\ }, \n\ "default_federate": true, \n\ "default_theme": "light", \n\ "roomDirectory": { \n\ - "servers": [ \n\ - "matrix.org" \n\ - ] \n\ + "servers": [ \n\ + "matrix.org" \n\ + ] \n\ }, \n\ "welcomeUserId": "", \n\ "piwik": { \n\ - "url": "https://piwik.riot.im/", \n\ - "whitelistedHSUrls": ["https://matrix.org"], \n\ - "whitelistedISUrls": ["https://vector.im", "https://matrix.org"], \n\ - "siteId": 1 \n\ + "url": "https://piwik.riot.im/", \n\ + "whitelistedHSUrls": ["https://matrix.org"], \n\ + "whitelistedISUrls": ["https://vector.im", "https://matrix.org"], \n\ + "siteId": 1 \n\ }, \n\ "enable_presence_by_hs_url": { \n\ - "https://matrix.org": false, \n\ - "https://matrix-client.matrix.org": false \n\ + "https://matrix.org": false, \n\ + "https://matrix-client.matrix.org": false \n\ }, \n\ "settingDefaults": { \n\ - "breadcrumbs": true \n\ + "breadcrumbs": true \n\ } \n\ -}' > webapp/config.json + }' > webapp/config.json FROM nginx # Add "Service-Worker-Allowed: /" header so the worker can sniff traffic on this domain rather # than just the path this gets hosted under. NB this newline echo syntax only works on bash. SHELL ["/bin/bash", "-c"] RUN echo $'\ -server { \n\ + server { \n\ listen 80; \n\ add_header \'Service-Worker-Allowed\' \'/\'; \n\ location / { \n\ - root /usr/share/nginx/html; \n\ - index index.html index.htm; \n\ + root /usr/share/nginx/html; \n\ + index index.html index.htm; \n\ } \n\ -}' > /etc/nginx/conf.d/default.conf + }' > /etc/nginx/conf.d/default.conf RUN sed -i 's/}/ application\/wasm wasm;\n}/g' /etc/nginx/mime.types COPY --from=jsbuild /build/element-web-matthew-p2p/webapp /usr/share/nginx/html From a84f50f4fb0963e15b6aca60f658bed45b779127 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 8 Feb 2022 16:49:49 +0000 Subject: [PATCH 4/8] Demote logging entry for backoff --- federationapi/queue/destinationqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index 1306e8588..09814b31f 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -297,7 +297,7 @@ func (oq *destinationQueue) backgroundSend() { // We haven't backed off yet, so wait for the suggested amount of // time. duration := time.Until(*until) - logrus.Warnf("Backing off %q for %s", oq.destination, duration) + logrus.Debugf("Backing off %q for %s", oq.destination, duration) oq.backingOff.Store(true) destinationQueueBackingOff.Inc() select { From 457a07eac5d668a0f04c273e086d321cab7ea640 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 8 Feb 2022 17:06:13 +0000 Subject: [PATCH 5/8] More relaxed auth event fetching (#2161) * Tweaks around auth event fetching * More tweaking --- roomserver/internal/input/input_events.go | 51 ++++++++++++++++------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 85189e476..bb35ade9c 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -195,9 +195,26 @@ func (r *Inputer) processRoomEvent( authEventNIDs := make([]types.EventNID, 0, len(authEventIDs)) for _, authEventID := range authEventIDs { if _, ok := knownEvents[authEventID]; !ok { - return rollbackTransaction, fmt.Errorf("missing auth event %s", authEventID) + // Unknown auth events only really matter if the event actually failed + // auth. If it passed auth then we can assume that everything that was + // known was sufficient, even if extraneous auth events were specified + // but weren't found. + if isRejected { + if event.StateKey() != nil { + return commitTransaction, fmt.Errorf( + "missing auth event %s for state event %s (type %q, state key %q)", + authEventID, event.EventID(), event.Type(), *event.StateKey(), + ) + } else { + return commitTransaction, fmt.Errorf( + "missing auth event %s for timeline event %s (type %q)", + authEventID, event.EventID(), event.Type(), + ) + } + } + } else { + authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID) } - authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID) } var softfail bool @@ -416,6 +433,10 @@ func (r *Inputer) fetchAuthEvents( return fmt.Errorf("no servers provided event auth for event ID %q, tried servers %v", event.EventID(), servers) } + // Reuse these to reduce allocations. + authEventNIDs := make([]types.EventNID, 0, 5) + isRejected := false +nextAuthEvent: for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering( res.AuthEvents, gomatrixserverlib.TopologicalOrderByAuthEvents, @@ -424,36 +445,30 @@ func (r *Inputer) fetchAuthEvents( // need to store it again or do anything further with it, so just skip // over it rather than wasting cycles. if ev, ok := known[authEvent.EventID()]; ok && ev != nil { - continue + continue nextAuthEvent } // Check the signatures of the event. If this fails then we'll simply // skip it, because gomatrixserverlib.Allowed() will notice a problem // if a critical event is missing anyway. if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil { - continue + continue nextAuthEvent } // In order to store the new auth event, we need to know its auth chain // as NIDs for the `auth_event_nids` column. Let's see if we can find those. - authEventNIDs := make([]types.EventNID, 0, len(authEvent.AuthEventIDs())) + authEventNIDs = authEventNIDs[:0] for _, eventID := range authEvent.AuthEventIDs() { knownEvent, ok := known[eventID] if !ok { - return fmt.Errorf("missing auth event %s for %s", eventID, authEvent.EventID()) + continue nextAuthEvent } authEventNIDs = append(authEventNIDs, knownEvent.EventNID) } - // Let's take a note of the fact that we now know about this event. - if err := auth.AddEvent(authEvent); err != nil { - return fmt.Errorf("auth.AddEvent: %w", err) - } - // Check if the auth event should be rejected. - isRejected := false - if err := gomatrixserverlib.Allowed(authEvent, auth); err != nil { - isRejected = true + err := gomatrixserverlib.Allowed(authEvent, auth) + if isRejected = err != nil; isRejected { logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID()) } @@ -463,6 +478,14 @@ func (r *Inputer) fetchAuthEvents( return fmt.Errorf("updater.StoreEvent: %w", err) } + // Let's take a note of the fact that we now know about this event for + // authenticating future events. + if !isRejected { + if err := auth.AddEvent(authEvent); err != nil { + return fmt.Errorf("auth.AddEvent: %w", err) + } + } + // Now we know about this event, it was stored and the signatures were OK. known[authEvent.EventID()] = &types.Event{ EventNID: eventNID, From 2771d93748380aa7dc21adca0ef690348d79f002 Mon Sep 17 00:00:00 2001 From: S7evinK <2353100+S7evinK@users.noreply.github.com> Date: Tue, 8 Feb 2022 18:13:38 +0100 Subject: [PATCH 6/8] Remove OutputKeyChangeEvent consumer on keyserver (#2160) * Remove keyserver consumer * Remove keyserver from eduserver * Directly upload device keys without eduserver * Add passing tests --- eduserver/api/input.go | 6 -- eduserver/eduserver.go | 1 - eduserver/input/input.go | 31 ------- eduserver/inthttp/client.go | 20 +---- eduserver/inthttp/server.go | 13 --- federationapi/consumers/roomserver.go | 1 - federationapi/routing/send.go | 44 ++++++--- keyserver/consumers/cross_signing.go | 123 -------------------------- keyserver/internal/cross_signing.go | 50 +++++------ keyserver/keyserver.go | 8 -- syncapi/internal/keychange.go | 2 + sytest-whitelist | 1 + 12 files changed, 59 insertions(+), 241 deletions(-) delete mode 100644 keyserver/consumers/cross_signing.go diff --git a/eduserver/api/input.go b/eduserver/api/input.go index 2fa253f4d..2aab107b2 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -100,10 +100,4 @@ type EDUServerInputAPI interface { request *InputReceiptEventRequest, response *InputReceiptEventResponse, ) error - - InputCrossSigningKeyUpdate( - ctx context.Context, - request *InputCrossSigningKeyUpdateRequest, - response *InputCrossSigningKeyUpdateResponse, - ) error } diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index febcf2864..9b7e21651 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -51,7 +51,6 @@ func NewInternalAPI( OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), - OutputKeyChangeEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), ServerName: cfg.Matrix.ServerName, } } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 4f8ab3e34..e58f0dd34 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" - keyapi "github.com/matrix-org/dendrite/keyserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" @@ -40,8 +39,6 @@ type EDUServerInputAPI struct { OutputSendToDeviceEventTopic string // The kafka topic to output new receipt events to OutputReceiptEventTopic string - // The kafka topic to output new key change events to - OutputKeyChangeEventTopic string // kafka producer JetStream nats.JetStreamContext // Internal user query API @@ -80,34 +77,6 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent( return t.sendToDeviceEvent(ise) } -// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI -func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate( - ctx context.Context, - request *api.InputCrossSigningKeyUpdateRequest, - response *api.InputCrossSigningKeyUpdateResponse, -) error { - eventJSON, err := json.Marshal(&keyapi.DeviceMessage{ - Type: keyapi.TypeCrossSigningUpdate, - OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{ - CrossSigningKeyUpdate: request.CrossSigningKeyUpdate, - }, - }) - if err != nil { - return err - } - - logrus.WithFields(logrus.Fields{ - "user_id": request.UserID, - }).Tracef("Producing to topic '%s'", t.OutputKeyChangeEventTopic) - - _, err = t.JetStream.PublishMsg(&nats.Msg{ - Subject: t.OutputKeyChangeEventTopic, - Header: nats.Header{}, - Data: eventJSON, - }) - return err -} - func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { ev := &api.TypingEvent{ Type: gomatrixserverlib.MTyping, diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go index 9a6f483c2..0690ed827 100644 --- a/eduserver/inthttp/client.go +++ b/eduserver/inthttp/client.go @@ -12,10 +12,9 @@ import ( // HTTP paths for the internal HTTP APIs const ( - EDUServerInputTypingEventPath = "/eduserver/input" - EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" - EDUServerInputReceiptEventPath = "/eduserver/receipt" - EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate" + EDUServerInputTypingEventPath = "/eduserver/input" + EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" + EDUServerInputReceiptEventPath = "/eduserver/receipt" ) // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. @@ -69,16 +68,3 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent( apiURL := h.eduServerURL + EDUServerInputReceiptEventPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } - -// InputCrossSigningKeyUpdate implements EDUServerInputAPI -func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate( - ctx context.Context, - request *api.InputCrossSigningKeyUpdateRequest, - response *api.InputCrossSigningKeyUpdateResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate") - defer span.Finish() - - apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go index a50ca84f9..a34943750 100644 --- a/eduserver/inthttp/server.go +++ b/eduserver/inthttp/server.go @@ -51,17 +51,4 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) - internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath, - httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse { - var request api.InputCrossSigningKeyUpdateRequest - var response api.InputCrossSigningKeyUpdateResponse - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) } diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index e9862000a..60066bb2f 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/types" diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 524fd510e..dd4fe13a8 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -382,20 +382,8 @@ func (t *txnReq) processEDUs(ctx context.Context) { } } case eduserverAPI.MSigningKeyUpdate: - var updatePayload eduserverAPI.CrossSigningKeyUpdate - if err := json.Unmarshal(e.Content, &updatePayload); err != nil { - util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ - "user_id": updatePayload.UserID, - }).Debug("Failed to send signing key update to edu server") - continue - } - inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{ - CrossSigningKeyUpdate: updatePayload, - } - inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{} - if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil { - util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update") - continue + if err := t.processSigningKeyUpdate(ctx, e); err != nil { + logrus.WithError(err).Errorf("Failed to process signing key update") } default: util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") @@ -403,6 +391,34 @@ func (t *txnReq) processEDUs(ctx context.Context) { } } +func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error { + var updatePayload eduserverAPI.CrossSigningKeyUpdate + if err := json.Unmarshal(e.Content, &updatePayload); err != nil { + util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ + "user_id": updatePayload.UserID, + }).Debug("Failed to unmarshal signing key update") + return err + } + + keys := gomatrixserverlib.CrossSigningKeys{} + if updatePayload.MasterKey != nil { + keys.MasterKey = *updatePayload.MasterKey + } + if updatePayload.SelfSigningKey != nil { + keys.SelfSigningKey = *updatePayload.SelfSigningKey + } + uploadReq := &keyapi.PerformUploadDeviceKeysRequest{ + CrossSigningKeys: keys, + UserID: updatePayload.UserID, + } + uploadRes := &keyapi.PerformUploadDeviceKeysResponse{} + t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes) + if uploadRes.Error != nil { + return uploadRes.Error + } + return nil +} + // processReceiptEvent sends receipt events to the edu server func (t *txnReq) processReceiptEvent(ctx context.Context, userID, roomID, receiptType string, diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go deleted file mode 100644 index aae69e960..000000000 --- a/keyserver/consumers/cross_signing.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2021 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - - "github.com/matrix-org/dendrite/keyserver/api" - "github.com/matrix-org/dendrite/keyserver/storage" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" -) - -type OutputCrossSigningKeyUpdateConsumer struct { - ctx context.Context - keyDB storage.Database - keyAPI api.KeyInternalAPI - serverName string - jetstream nats.JetStreamContext - durable string - topic string -} - -func NewOutputCrossSigningKeyUpdateConsumer( - process *process.ProcessContext, - cfg *config.Dendrite, - js nats.JetStreamContext, - keyDB storage.Database, - keyAPI api.KeyInternalAPI, -) *OutputCrossSigningKeyUpdateConsumer { - // The keyserver both produces and consumes on the TopicOutputKeyChangeEvent - // topic. We will only produce events where the UserID matches our server name, - // and we will only consume events where the UserID does NOT match our server - // name (because the update came from a remote server). - s := &OutputCrossSigningKeyUpdateConsumer{ - ctx: process.Context(), - keyDB: keyDB, - jetstream: js, - durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"), - topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), - keyAPI: keyAPI, - serverName: string(cfg.Global.ServerName), - } - - return s -} - -func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, - nats.DeliverAll(), nats.ManualAck(), - ) -} - -// onMessage is called in response to a message received on the -// key change events topic from the key server. -func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var m api.DeviceMessage - if err := json.Unmarshal(msg.Data, &m); err != nil { - logrus.WithError(err).Errorf("failed to read device message from key change topic") - return true - } - if m.OutputCrossSigningKeyUpdate == nil { - // This probably shouldn't happen but stops us from panicking if we come - // across an update that doesn't satisfy either types. - return true - } - switch m.Type { - case api.TypeCrossSigningUpdate: - return t.onCrossSigningMessage(m) - default: - return true - } -} - -func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { - output := m.CrossSigningKeyUpdate - _, host, err := gomatrixserverlib.SplitID('@', output.UserID) - if err != nil { - logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") - return true - } - if host == gomatrixserverlib.ServerName(s.serverName) { - // Ignore any messages that contain information about our own users, as - // they already originated from this server. - return true - } - uploadReq := &api.PerformUploadDeviceKeysRequest{ - UserID: output.UserID, - } - if output.MasterKey != nil { - uploadReq.MasterKey = *output.MasterKey - } - if output.SelfSigningKey != nil { - uploadReq.SelfSigningKey = *output.SelfSigningKey - } - uploadRes := &api.PerformUploadDeviceKeysResponse{} - s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes) - if uploadRes.Error != nil { - // If the error is due to a missing or invalid parameter then we'd might - // as well just acknowledge the message, because otherwise otherwise we'll - // just keep getting delivered a faulty message over and over again. - return uploadRes.Error.IsMissingParam || uploadRes.Error.IsInvalidParam - } - return true -} diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 1e1871b8b..527990cf9 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -219,25 +219,23 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P } // Finally, generate a notification that we updated the keys. - if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer { - update := eduserverAPI.CrossSigningKeyUpdate{ - UserID: req.UserID, - } - if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { - update.MasterKey = &mk - } - if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { - update.SelfSigningKey = &ssk - } - if update.MasterKey == nil && update.SelfSigningKey == nil { - return - } - if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), - } - return + update := eduserverAPI.CrossSigningKeyUpdate{ + UserID: req.UserID, + } + if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { + update.MasterKey = &mk + } + if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { + update.SelfSigningKey = &ssk + } + if update.MasterKey == nil && update.SelfSigningKey == nil { + return + } + if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), } + return } } @@ -310,16 +308,14 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req // Finally, generate a notification that we updated the signatures. for userID := range req.Signatures { - if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer { - update := eduserverAPI.CrossSigningKeyUpdate{ - UserID: userID, - } - if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), - } - return + update := eduserverAPI.CrossSigningKeyUpdate{ + UserID: userID, + } + if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), } + return } } } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 61ccc0303..bd36fd9f9 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -18,7 +18,6 @@ import ( "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" - "github.com/matrix-org/dendrite/keyserver/consumers" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" @@ -65,12 +64,5 @@ func NewInternalAPI( } }() - keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( - base.ProcessContext, base.Cfg, js, db, ap, - ) - if err := keyconsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer") - } - return ap } diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index fa1064b70..37a9e2d39 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -282,6 +282,8 @@ func membershipEvents(res *types.Response) (joinUserIDs, leaveUserIDs []string) if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil { if strings.Contains(string(ev.Content), `"join"`) { joinUserIDs = append(joinUserIDs, *ev.StateKey) + } else if strings.Contains(string(ev.Content), `"invite"`) { + joinUserIDs = append(joinUserIDs, *ev.StateKey) } else if strings.Contains(string(ev.Content), `"leave"`) { leaveUserIDs = append(leaveUserIDs, *ev.StateKey) } else if strings.Contains(string(ev.Content), `"ban"`) { diff --git a/sytest-whitelist b/sytest-whitelist index 7d26c610e..c6ce1daad 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -590,3 +590,4 @@ Can reject invites over federation for rooms with version 9 Can receive redactions from regular users over federation in room version 9 Forward extremities remain so even after the next events are populated as outliers If a device list update goes missing, the server resyncs on the next one +uploading self-signing key notifies over federation From b4687f2ed24ae4f397e039776118c6efee306fa9 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 9 Feb 2022 11:24:49 +0000 Subject: [PATCH 7/8] Fix storage bug in PSQL events table --- roomserver/storage/postgres/events_table.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index ece1d9e3c..c136f039a 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -74,7 +74,7 @@ const insertEventSQL = "" + "INSERT INTO roomserver_events AS e (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + " ON CONFLICT ON CONSTRAINT roomserver_event_id_unique DO UPDATE" + - " SET is_rejected = $8 WHERE e.is_rejected = FALSE" + + " SET is_rejected = $8 WHERE e.event_id = $4 AND e.is_rejected = FALSE" + " RETURNING event_nid, state_snapshot_nid" const selectEventSQL = "" + @@ -192,7 +192,8 @@ func (s *eventStatements) InsertEvent( ) (types.EventNID, types.StateSnapshotNID, error) { var eventNID int64 var stateNID int64 - err := s.insertEventStmt.QueryRowContext( + stmt := sqlutil.TxStmt(txn, s.insertEventStmt) + err := stmt.QueryRowContext( ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID), eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected, From cf447dd52a0015c2c5b10813ed11e59a3712607e Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 9 Feb 2022 11:41:21 +0000 Subject: [PATCH 8/8] Revert "Fix storage bug in PSQL events table" This reverts commit b4687f2ed24ae4f397e039776118c6efee306fa9. --- roomserver/storage/postgres/events_table.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index c136f039a..ece1d9e3c 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -74,7 +74,7 @@ const insertEventSQL = "" + "INSERT INTO roomserver_events AS e (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + " ON CONFLICT ON CONSTRAINT roomserver_event_id_unique DO UPDATE" + - " SET is_rejected = $8 WHERE e.event_id = $4 AND e.is_rejected = FALSE" + + " SET is_rejected = $8 WHERE e.is_rejected = FALSE" + " RETURNING event_nid, state_snapshot_nid" const selectEventSQL = "" + @@ -192,8 +192,7 @@ func (s *eventStatements) InsertEvent( ) (types.EventNID, types.StateSnapshotNID, error) { var eventNID int64 var stateNID int64 - stmt := sqlutil.TxStmt(txn, s.insertEventStmt) - err := stmt.QueryRowContext( + err := s.insertEventStmt.QueryRowContext( ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID), eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected,