Merge branch 'main' into logintoken

This commit is contained in:
kegsay 2022-02-10 10:09:12 +00:00 committed by GitHub
commit 8c8a55d9cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
48 changed files with 594 additions and 583 deletions

View file

@ -2,6 +2,6 @@
<!-- Please read docs/CONTRIBUTING.md before submitting your pull request --> <!-- Please read docs/CONTRIBUTING.md before submitting your pull request -->
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/master/docs/CONTRIBUTING.md#sign-off) * [ ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off)
Signed-off-by: `Your Name <your@email.example.org>` Signed-off-by: `Your Name <your@email.example.org>`

View file

@ -2,9 +2,9 @@ name: "CodeQL"
on: on:
push: push:
branches: [master] branches: [main]
pull_request: pull_request:
branches: [master] branches: [main]
jobs: jobs:
analyze: analyze:
@ -14,21 +14,21 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
language: ['go'] language: ["go"]
steps: steps:
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v2 uses: actions/checkout@v2
with: with:
fetch-depth: 2 fetch-depth: 2
- run: git checkout HEAD^2 - run: git checkout HEAD^2
if: ${{ github.event_name == 'pull_request' }} if: ${{ github.event_name == 'pull_request' }}
- name: Initialize CodeQL - name: Initialize CodeQL
uses: github/codeql-action/init@v1 uses: github/codeql-action/init@v1
with: with:
languages: ${{ matrix.language }} languages: ${{ matrix.language }}
- name: Perform CodeQL Analysis - name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1 uses: github/codeql-action/analyze@v1

View file

@ -2,7 +2,7 @@ name: Tests
on: on:
push: push:
branches: [ 'master' ] branches: ["main"]
pull_request: pull_request:
concurrency: concurrency:
@ -33,7 +33,7 @@ jobs:
path: dendrite path: dendrite
# Attempt to check out the same branch of Complement as the PR. If it # Attempt to check out the same branch of Complement as the PR. If it
# doesn't exist, fallback to master. # doesn't exist, fallback to main.
- name: Checkout complement - name: Checkout complement
shell: bash shell: bash
run: | run: |

View file

@ -7,7 +7,7 @@ if [ -d ".git" ]
then then
export BUILD=`git rev-parse --short HEAD || ""` export BUILD=`git rev-parse --short HEAD || ""`
export BRANCH=`(git symbolic-ref --short HEAD | tr -d \/ ) || ""` export BRANCH=`(git symbolic-ref --short HEAD | tr -d \/ ) || ""`
if [ "$BRANCH" = master ] if [ "$BRANCH" = main ]
then then
export BRANCH="" export BRANCH=""
fi fi

View file

@ -9,9 +9,9 @@ FROM golang:1.14-alpine AS gobuild
# Download and build dendrite # Download and build dendrite
WORKDIR /build WORKDIR /build
ADD https://github.com/matrix-org/dendrite/archive/master.tar.gz /build/master.tar.gz ADD https://github.com/matrix-org/dendrite/archive/main.tar.gz /build/main.tar.gz
RUN tar xvfz master.tar.gz RUN tar xvfz main.tar.gz
WORKDIR /build/dendrite-master WORKDIR /build/dendrite-main
RUN GOOS=js GOARCH=wasm go build -o main.wasm ./cmd/dendritejs RUN GOOS=js GOARCH=wasm go build -o main.wasm ./cmd/dendritejs
@ -21,7 +21,7 @@ RUN apt-get update && apt-get -y install python
# Download riot-web and libp2p repos # Download riot-web and libp2p repos
WORKDIR /build WORKDIR /build
ADD https://github.com/matrix-org/go-http-js-libp2p/archive/master.tar.gz /build/libp2p.tar.gz ADD https://github.com/matrix-org/go-http-js-libp2p/archive/main.tar.gz /build/libp2p.tar.gz
RUN tar xvfz libp2p.tar.gz RUN tar xvfz libp2p.tar.gz
ADD https://github.com/vector-im/element-web/archive/matthew/p2p.tar.gz /build/p2p.tar.gz ADD https://github.com/vector-im/element-web/archive/matthew/p2p.tar.gz /build/p2p.tar.gz
RUN tar xvfz p2p.tar.gz RUN tar xvfz p2p.tar.gz
@ -31,21 +31,21 @@ WORKDIR /build/element-web-matthew-p2p
RUN yarn install RUN yarn install
RUN ln -s /build/go-http-js-libp2p-master /build/element-web-matthew-p2p/node_modules/go-http-js-libp2p RUN ln -s /build/go-http-js-libp2p-master /build/element-web-matthew-p2p/node_modules/go-http-js-libp2p
RUN (cd node_modules/go-http-js-libp2p && yarn install) RUN (cd node_modules/go-http-js-libp2p && yarn install)
COPY --from=gobuild /build/dendrite-master/main.wasm ./src/vector/dendrite.wasm COPY --from=gobuild /build/dendrite-main/main.wasm ./src/vector/dendrite.wasm
# build it all # build it all
RUN yarn build:p2p RUN yarn build:p2p
SHELL ["/bin/bash", "-c"] SHELL ["/bin/bash", "-c"]
RUN echo $'\ RUN echo $'\
{ \n\ { \n\
"default_server_config": { \n\ "default_server_config": { \n\
"m.homeserver": { \n\ "m.homeserver": { \n\
"base_url": "https://p2p.riot.im", \n\ "base_url": "https://p2p.riot.im", \n\
"server_name": "p2p.riot.im" \n\ "server_name": "p2p.riot.im" \n\
}, \n\ }, \n\
"m.identity_server": { \n\ "m.identity_server": { \n\
"base_url": "https://vector.im" \n\ "base_url": "https://vector.im" \n\
} \n\ } \n\
}, \n\ }, \n\
"disable_custom_urls": false, \n\ "disable_custom_urls": false, \n\
"disable_guests": true, \n\ "disable_guests": true, \n\
@ -55,57 +55,57 @@ RUN echo $'\
"integrations_ui_url": "https://scalar.vector.im/", \n\ "integrations_ui_url": "https://scalar.vector.im/", \n\
"integrations_rest_url": "https://scalar.vector.im/api", \n\ "integrations_rest_url": "https://scalar.vector.im/api", \n\
"integrations_widgets_urls": [ \n\ "integrations_widgets_urls": [ \n\
"https://scalar.vector.im/_matrix/integrations/v1", \n\ "https://scalar.vector.im/_matrix/integrations/v1", \n\
"https://scalar.vector.im/api", \n\ "https://scalar.vector.im/api", \n\
"https://scalar-staging.vector.im/_matrix/integrations/v1", \n\ "https://scalar-staging.vector.im/_matrix/integrations/v1", \n\
"https://scalar-staging.vector.im/api", \n\ "https://scalar-staging.vector.im/api", \n\
"https://scalar-staging.riot.im/scalar/api" \n\ "https://scalar-staging.riot.im/scalar/api" \n\
], \n\ ], \n\
"integrations_jitsi_widget_url": "https://scalar.vector.im/api/widgets/jitsi.html", \n\ "integrations_jitsi_widget_url": "https://scalar.vector.im/api/widgets/jitsi.html", \n\
"bug_report_endpoint_url": "https://riot.im/bugreports/submit", \n\ "bug_report_endpoint_url": "https://riot.im/bugreports/submit", \n\
"defaultCountryCode": "GB", \n\ "defaultCountryCode": "GB", \n\
"showLabsSettings": false, \n\ "showLabsSettings": false, \n\
"features": { \n\ "features": { \n\
"feature_pinning": "labs", \n\ "feature_pinning": "labs", \n\
"feature_custom_status": "labs", \n\ "feature_custom_status": "labs", \n\
"feature_custom_tags": "labs", \n\ "feature_custom_tags": "labs", \n\
"feature_state_counters": "labs" \n\ "feature_state_counters": "labs" \n\
}, \n\ }, \n\
"default_federate": true, \n\ "default_federate": true, \n\
"default_theme": "light", \n\ "default_theme": "light", \n\
"roomDirectory": { \n\ "roomDirectory": { \n\
"servers": [ \n\ "servers": [ \n\
"matrix.org" \n\ "matrix.org" \n\
] \n\ ] \n\
}, \n\ }, \n\
"welcomeUserId": "", \n\ "welcomeUserId": "", \n\
"piwik": { \n\ "piwik": { \n\
"url": "https://piwik.riot.im/", \n\ "url": "https://piwik.riot.im/", \n\
"whitelistedHSUrls": ["https://matrix.org"], \n\ "whitelistedHSUrls": ["https://matrix.org"], \n\
"whitelistedISUrls": ["https://vector.im", "https://matrix.org"], \n\ "whitelistedISUrls": ["https://vector.im", "https://matrix.org"], \n\
"siteId": 1 \n\ "siteId": 1 \n\
}, \n\ }, \n\
"enable_presence_by_hs_url": { \n\ "enable_presence_by_hs_url": { \n\
"https://matrix.org": false, \n\ "https://matrix.org": false, \n\
"https://matrix-client.matrix.org": false \n\ "https://matrix-client.matrix.org": false \n\
}, \n\ }, \n\
"settingDefaults": { \n\ "settingDefaults": { \n\
"breadcrumbs": true \n\ "breadcrumbs": true \n\
} \n\ } \n\
}' > webapp/config.json }' > webapp/config.json
FROM nginx FROM nginx
# Add "Service-Worker-Allowed: /" header so the worker can sniff traffic on this domain rather # Add "Service-Worker-Allowed: /" header so the worker can sniff traffic on this domain rather
# than just the path this gets hosted under. NB this newline echo syntax only works on bash. # than just the path this gets hosted under. NB this newline echo syntax only works on bash.
SHELL ["/bin/bash", "-c"] SHELL ["/bin/bash", "-c"]
RUN echo $'\ RUN echo $'\
server { \n\ server { \n\
listen 80; \n\ listen 80; \n\
add_header \'Service-Worker-Allowed\' \'/\'; \n\ add_header \'Service-Worker-Allowed\' \'/\'; \n\
location / { \n\ location / { \n\
root /usr/share/nginx/html; \n\ root /usr/share/nginx/html; \n\
index index.html index.htm; \n\ index index.html index.htm; \n\
} \n\ } \n\
}' > /etc/nginx/conf.d/default.conf }' > /etc/nginx/conf.d/default.conf
RUN sed -i 's/}/ application\/wasm wasm;\n}/g' /etc/nginx/mime.types RUN sed -i 's/}/ application\/wasm wasm;\n}/g' /etc/nginx/mime.types
COPY --from=jsbuild /build/element-web-matthew-p2p/webapp /usr/share/nginx/html COPY --from=jsbuild /build/element-web-matthew-p2p/webapp /usr/share/nginx/html

View file

@ -2,6 +2,10 @@ FROM golang:1.16-stretch as build
RUN apt-get update && apt-get install -y sqlite3 RUN apt-get update && apt-get install -y sqlite3
WORKDIR /build WORKDIR /build
# we will dump the binaries and config file to this location to ensure any local untracked files
# that come from the COPY . . file don't contaminate the build
RUN mkdir /dendrite
# Utilise Docker caching when downloading dependencies, this stops us needlessly # Utilise Docker caching when downloading dependencies, this stops us needlessly
# downloading dependencies every time. # downloading dependencies every time.
COPY go.mod . COPY go.mod .
@ -9,9 +13,11 @@ COPY go.sum .
RUN go mod download RUN go mod download
COPY . . COPY . .
RUN go build ./cmd/dendrite-monolith-server RUN go build -o /dendrite ./cmd/dendrite-monolith-server
RUN go build ./cmd/generate-keys RUN go build -o /dendrite ./cmd/generate-keys
RUN go build ./cmd/generate-config RUN go build -o /dendrite ./cmd/generate-config
WORKDIR /dendrite
RUN ./generate-keys --private-key matrix_key.pem RUN ./generate-keys --private-key matrix_key.pem
ENV SERVER_NAME=localhost ENV SERVER_NAME=localhost

View file

@ -17,6 +17,7 @@ package routing
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"net/http" "net/http"
"time" "time"
@ -459,13 +460,7 @@ func SendForget(
if membershipRes.IsInRoom { if membershipRes.IsInRoom {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
JSON: jsonerror.Forbidden("user is still a member of the room"), JSON: jsonerror.Unknown(fmt.Sprintf("User %s is in room %s", device.UserID, roomID)),
}
}
if !membershipRes.HasBeenInRoom {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Forbidden("user did not belong to room"),
} }
} }

View file

@ -37,7 +37,7 @@ If a job fails, click the "details" button and you should be taken to the job's
logs. logs.
![Click the details button on the failing build ![Click the details button on the failing build
step](https://raw.githubusercontent.com/matrix-org/dendrite/master/docs/images/details-button-location.jpg) step](https://raw.githubusercontent.com/matrix-org/dendrite/main/docs/images/details-button-location.jpg)
Scroll down to the failing step and you should see some log output. Scan the Scroll down to the failing step and you should see some log output. Scan the
logs until you find what it's complaining about, fix it, submit a new commit, logs until you find what it's complaining about, fix it, submit a new commit,
@ -57,7 +57,7 @@ significant amount of CPU and RAM.
Once the code builds, run [Sytest](https://github.com/matrix-org/sytest) Once the code builds, run [Sytest](https://github.com/matrix-org/sytest)
according to the guide in according to the guide in
[docs/sytest.md](https://github.com/matrix-org/dendrite/blob/master/docs/sytest.md#using-a-sytest-docker-image) [docs/sytest.md](https://github.com/matrix-org/dendrite/blob/main/docs/sytest.md#using-a-sytest-docker-image)
so you can see whether something is being broken and whether there are newly so you can see whether something is being broken and whether there are newly
passing tests. passing tests.
@ -94,4 +94,4 @@ For more general questions please use
We ask that everyone who contributes to the project signs off their We ask that everyone who contributes to the project signs off their
contributions, in accordance with the contributions, in accordance with the
[DCO](https://github.com/matrix-org/matrix-doc/blob/master/CONTRIBUTING.rst#sign-off). [DCO](https://github.com/matrix-org/matrix-doc/blob/main/CONTRIBUTING.rst#sign-off).

View file

@ -6,7 +6,7 @@ These are the instructions for setting up P2P Dendrite, current as of May 2020.
#### Build #### Build
- The `master` branch has a WASM-only binary for dendrite: `./cmd/dendritejs`. - The `main` branch has a WASM-only binary for dendrite: `./cmd/dendritejs`.
- Build it and copy assets to riot-web. - Build it and copy assets to riot-web.
``` ```
$ ./build-dendritejs.sh $ ./build-dendritejs.sh

View file

@ -100,10 +100,4 @@ type EDUServerInputAPI interface {
request *InputReceiptEventRequest, request *InputReceiptEventRequest,
response *InputReceiptEventResponse, response *InputReceiptEventResponse,
) error ) error
InputCrossSigningKeyUpdate(
ctx context.Context,
request *InputCrossSigningKeyUpdateRequest,
response *InputCrossSigningKeyUpdateResponse,
) error
} }

View file

@ -51,7 +51,6 @@ func NewInternalAPI(
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
OutputKeyChangeEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
} }
} }

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -40,8 +39,6 @@ type EDUServerInputAPI struct {
OutputSendToDeviceEventTopic string OutputSendToDeviceEventTopic string
// The kafka topic to output new receipt events to // The kafka topic to output new receipt events to
OutputReceiptEventTopic string OutputReceiptEventTopic string
// The kafka topic to output new key change events to
OutputKeyChangeEventTopic string
// kafka producer // kafka producer
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
// Internal user query API // Internal user query API
@ -80,34 +77,6 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent(
return t.sendToDeviceEvent(ise) return t.sendToDeviceEvent(ise)
} }
// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI
func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate(
ctx context.Context,
request *api.InputCrossSigningKeyUpdateRequest,
response *api.InputCrossSigningKeyUpdateResponse,
) error {
eventJSON, err := json.Marshal(&keyapi.DeviceMessage{
Type: keyapi.TypeCrossSigningUpdate,
OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{
CrossSigningKeyUpdate: request.CrossSigningKeyUpdate,
},
})
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"user_id": request.UserID,
}).Tracef("Producing to topic '%s'", t.OutputKeyChangeEventTopic)
_, err = t.JetStream.PublishMsg(&nats.Msg{
Subject: t.OutputKeyChangeEventTopic,
Header: nats.Header{},
Data: eventJSON,
})
return err
}
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
ev := &api.TypingEvent{ ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping, Type: gomatrixserverlib.MTyping,

View file

@ -12,10 +12,9 @@ import (
// HTTP paths for the internal HTTP APIs // HTTP paths for the internal HTTP APIs
const ( const (
EDUServerInputTypingEventPath = "/eduserver/input" EDUServerInputTypingEventPath = "/eduserver/input"
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
EDUServerInputReceiptEventPath = "/eduserver/receipt" EDUServerInputReceiptEventPath = "/eduserver/receipt"
EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate"
) )
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@ -69,16 +68,3 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent(
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// InputCrossSigningKeyUpdate implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate(
ctx context.Context,
request *api.InputCrossSigningKeyUpdateRequest,
response *api.InputCrossSigningKeyUpdateResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate")
defer span.Finish()
apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -51,17 +51,4 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath,
httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
var request api.InputCrossSigningKeyUpdateRequest
var response api.InputCrossSigningKeyUpdateResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
} }

View file

@ -134,7 +134,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.M
return true return true
} }
log.Infof("Sending send-to-device message into %q destination queue", destServerName) log.Debugf("Sending send-to-device message into %q destination queue", destServerName)
if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil { if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
log.WithError(err).Error("failed to send EDU") log.WithError(err).Error("failed to send EDU")
return false return false

View file

@ -127,6 +127,9 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
return true return true
} }
if len(destinations) == 0 {
return true
}
// Pack the EDU and marshal it // Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{ edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDeviceListUpdate, Type: gomatrixserverlib.MDeviceListUpdate,
@ -146,7 +149,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
return true return true
} }
logger.Infof("Sending device list update message to %q", destinations) logger.Debugf("Sending device list update message to %q", destinations)
err = t.queues.SendEDU(edu, t.serverName, destinations) err = t.queues.SendEDU(edu, t.serverName, destinations)
return err == nil return err == nil
} }
@ -181,6 +184,10 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
return true return true
} }
if len(destinations) == 0 {
return true
}
// Pack the EDU and marshal it // Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{ edu := &gomatrixserverlib.EDU{
Type: eduserverAPI.MSigningKeyUpdate, Type: eduserverAPI.MSigningKeyUpdate,
@ -191,7 +198,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
return true return true
} }
logger.Infof("Sending cross-signing update message to %q", destinations) logger.Debugf("Sending cross-signing update message to %q", destinations)
err = t.queues.SendEDU(edu, t.serverName, destinations) err = t.queues.SendEDU(edu, t.serverName, destinations)
return err == nil return err == nil
} }

View file

@ -114,11 +114,6 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
} }
} }
case api.OutputTypeNewInviteEvent:
log.WithField("type", output.Type).Debug(
"received new invite, send device keys",
)
case api.OutputTypeNewInboundPeek: case api.OutputTypeNewInboundPeek:
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil { if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{

View file

@ -201,7 +201,6 @@ func (r *FederationInternalAPI) performJoinUsingServer(
context.Background(), context.Background(),
serverName, serverName,
event, event,
respMakeJoin.RoomVersion,
) )
if err != nil { if err != nil {
r.statistics.ForServer(serverName).Failure() r.statistics.ForServer(serverName).Failure()
@ -209,9 +208,11 @@ func (r *FederationInternalAPI) performJoinUsingServer(
} }
r.statistics.ForServer(serverName).Success() r.statistics.ForServer(serverName).Success()
authEvents := respSendJoin.AuthEvents.UntrustedEvents(respMakeJoin.RoomVersion)
// Sanity-check the join response to ensure that it has a create // Sanity-check the join response to ensure that it has a create
// event, that the room version is known, etc. // event, that the room version is known, etc.
if err = sanityCheckAuthChain(respSendJoin.AuthEvents); err != nil { if err = sanityCheckAuthChain(authEvents); err != nil {
return fmt.Errorf("sanityCheckAuthChain: %w", err) return fmt.Errorf("sanityCheckAuthChain: %w", err)
} }
@ -225,6 +226,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
var respState *gomatrixserverlib.RespState var respState *gomatrixserverlib.RespState
respState, err = respSendJoin.Check( respState, err = respSendJoin.Check(
context.Background(), context.Background(),
respMakeJoin.RoomVersion,
r.keyRing, r.keyRing,
event, event,
federatedAuthProvider(ctx, r.federation, r.keyRing, serverName), federatedAuthProvider(ctx, r.federation, r.keyRing, serverName),
@ -392,12 +394,13 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer(
ctx = context.Background() ctx = context.Background()
respState := respPeek.ToRespState() respState := respPeek.ToRespState()
authEvents := respState.AuthEvents.UntrustedEvents(respPeek.RoomVersion)
// authenticate the state returned (check its auth events etc) // authenticate the state returned (check its auth events etc)
// the equivalent of CheckSendJoinResponse() // the equivalent of CheckSendJoinResponse()
if err = sanityCheckAuthChain(respState.AuthEvents); err != nil { if err = sanityCheckAuthChain(authEvents); err != nil {
return fmt.Errorf("sanityCheckAuthChain: %w", err) return fmt.Errorf("sanityCheckAuthChain: %w", err)
} }
if err = respState.Check(ctx, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, serverName)); err != nil { if err = respState.Check(ctx, respPeek.RoomVersion, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, serverName)); err != nil {
return fmt.Errorf("error checking state returned from peeking: %w", err) return fmt.Errorf("error checking state returned from peeking: %w", err)
} }
@ -549,10 +552,15 @@ func (r *FederationInternalAPI) PerformInvite(
inviteRes, err := r.federation.SendInviteV2(ctx, destination, inviteReq) inviteRes, err := r.federation.SendInviteV2(ctx, destination, inviteReq)
if err != nil { if err != nil {
return fmt.Errorf("r.federation.SendInviteV2: %w", err) return fmt.Errorf("r.federation.SendInviteV2: failed to send invite: %w", err)
} }
logrus.Infof("GOT INVITE RESPONSE %s", string(inviteRes.Event))
response.Event = inviteRes.Event.Headered(request.RoomVersion) inviteEvent, err := inviteRes.Event.UntrustedEvent(request.RoomVersion)
if err != nil {
return fmt.Errorf("r.federation.SendInviteV2 failed to decode event response: %w", err)
}
response.Event = inviteEvent.Headered(request.RoomVersion)
return nil return nil
} }

View file

@ -387,14 +387,7 @@ func (h *httpFederationInternalAPI) LookupMissingEvents(
if request.Err != nil { if request.Err != nil {
return res, request.Err return res, request.Err
} }
res.Events = make([]*gomatrixserverlib.Event, 0, len(request.Res.Events)) res.Events = request.Res.Events
for _, js := range request.Res.Events {
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(js, roomVersion)
if err != nil {
return res, err
}
res.Events = append(res.Events, ev)
}
return res, nil return res, nil
} }

View file

@ -297,7 +297,7 @@ func (oq *destinationQueue) backgroundSend() {
// We haven't backed off yet, so wait for the suggested amount of // We haven't backed off yet, so wait for the suggested amount of
// time. // time.
duration := time.Until(*until) duration := time.Until(*until)
logrus.Warnf("Backing off %q for %s", oq.destination, duration) logrus.Debugf("Backing off %q for %s", oq.destination, duration)
oq.backingOff.Store(true) oq.backingOff.Store(true)
destinationQueueBackingOff.Inc() destinationQueueBackingOff.Inc()
select { select {

View file

@ -65,7 +65,7 @@ func GetEventAuth(
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: gomatrixserverlib.RespEventAuth{ JSON: gomatrixserverlib.RespEventAuth{
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), AuthEvents: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(response.AuthChainEvents),
}, },
} }
} }

View file

@ -178,12 +178,12 @@ func processInvite(
if isInviteV2 { if isInviteV2 {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: gomatrixserverlib.RespInviteV2{Event: &signedEvent}, JSON: gomatrixserverlib.RespInviteV2{Event: signedEvent.JSON()},
} }
} else { } else {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: gomatrixserverlib.RespInvite{Event: &signedEvent}, JSON: gomatrixserverlib.RespInvite{Event: signedEvent.JSON()},
} }
} }
default: default:

View file

@ -351,8 +351,8 @@ func SendJoin(
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: gomatrixserverlib.RespSendJoin{ JSON: gomatrixserverlib.RespSendJoin{
StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateAndAuthChainResponse.StateEvents), StateEvents: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(stateAndAuthChainResponse.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(stateAndAuthChainResponse.AuthChainEvents), AuthEvents: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(stateAndAuthChainResponse.AuthChainEvents),
Origin: cfg.Matrix.ServerName, Origin: cfg.Matrix.ServerName,
}, },
} }

View file

@ -62,7 +62,7 @@ func GetMissingEvents(
eventsResponse.Events = filterEvents(eventsResponse.Events, roomID) eventsResponse.Events = filterEvents(eventsResponse.Events, roomID)
resp := gomatrixserverlib.RespMissingEvents{ resp := gomatrixserverlib.RespMissingEvents{
Events: gomatrixserverlib.UnwrapEventHeaders(eventsResponse.Events), Events: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(eventsResponse.Events),
} }
return util.JSONResponse{ return util.JSONResponse{

View file

@ -88,8 +88,8 @@ func Peek(
} }
respPeek := gomatrixserverlib.RespPeek{ respPeek := gomatrixserverlib.RespPeek{
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), StateEvents: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(response.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), AuthEvents: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(response.AuthChainEvents),
RoomVersion: response.RoomVersion, RoomVersion: response.RoomVersion,
LatestEvent: response.LatestEvent.Unwrap(), LatestEvent: response.LatestEvent.Unwrap(),
RenewalInterval: renewalInterval, RenewalInterval: renewalInterval,

View file

@ -382,20 +382,8 @@ func (t *txnReq) processEDUs(ctx context.Context) {
} }
} }
case eduserverAPI.MSigningKeyUpdate: case eduserverAPI.MSigningKeyUpdate:
var updatePayload eduserverAPI.CrossSigningKeyUpdate if err := t.processSigningKeyUpdate(ctx, e); err != nil {
if err := json.Unmarshal(e.Content, &updatePayload); err != nil { logrus.WithError(err).Errorf("Failed to process signing key update")
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"user_id": updatePayload.UserID,
}).Debug("Failed to send signing key update to edu server")
continue
}
inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{
CrossSigningKeyUpdate: updatePayload,
}
inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{}
if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update")
continue
} }
default: default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
@ -403,6 +391,34 @@ func (t *txnReq) processEDUs(ctx context.Context) {
} }
} }
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
var updatePayload eduserverAPI.CrossSigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"user_id": updatePayload.UserID,
}).Debug("Failed to unmarshal signing key update")
return err
}
keys := gomatrixserverlib.CrossSigningKeys{}
if updatePayload.MasterKey != nil {
keys.MasterKey = *updatePayload.MasterKey
}
if updatePayload.SelfSigningKey != nil {
keys.SelfSigningKey = *updatePayload.SelfSigningKey
}
uploadReq := &keyapi.PerformUploadDeviceKeysRequest{
CrossSigningKeys: keys,
UserID: updatePayload.UserID,
}
uploadRes := &keyapi.PerformUploadDeviceKeysResponse{}
t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes)
if uploadRes.Error != nil {
return uploadRes.Error
}
return nil
}
// processReceiptEvent sends receipt events to the edu server // processReceiptEvent sends receipt events to the edu server
func (t *txnReq) processReceiptEvent(ctx context.Context, func (t *txnReq) processReceiptEvent(ctx context.Context,
userID, roomID, receiptType string, userID, roomID, receiptType string,

View file

@ -35,12 +35,15 @@ func GetState(
return *err return *err
} }
state, err := getState(ctx, request, rsAPI, roomID, eventID) stateEvents, authChain, err := getState(ctx, request, rsAPI, roomID, eventID)
if err != nil { if err != nil {
return *err return *err
} }
return util.JSONResponse{Code: http.StatusOK, JSON: state} return util.JSONResponse{Code: http.StatusOK, JSON: &gomatrixserverlib.RespState{
AuthEvents: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(authChain),
StateEvents: gomatrixserverlib.NewEventJSONsFromHeaderedEvents(stateEvents),
}}
} }
// GetStateIDs returns state event IDs & auth event IDs for the roomID, eventID // GetStateIDs returns state event IDs & auth event IDs for the roomID, eventID
@ -55,13 +58,13 @@ func GetStateIDs(
return *err return *err
} }
state, err := getState(ctx, request, rsAPI, roomID, eventID) stateEvents, authEvents, err := getState(ctx, request, rsAPI, roomID, eventID)
if err != nil { if err != nil {
return *err return *err
} }
stateEventIDs := getIDsFromEvent(state.StateEvents) stateEventIDs := getIDsFromEvent(stateEvents)
authEventIDs := getIDsFromEvent(state.AuthEvents) authEventIDs := getIDsFromEvent(authEvents)
return util.JSONResponse{Code: http.StatusOK, JSON: gomatrixserverlib.RespStateIDs{ return util.JSONResponse{Code: http.StatusOK, JSON: gomatrixserverlib.RespStateIDs{
StateEventIDs: stateEventIDs, StateEventIDs: stateEventIDs,
@ -97,18 +100,18 @@ func getState(
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
roomID string, roomID string,
eventID string, eventID string,
) (*gomatrixserverlib.RespState, *util.JSONResponse) { ) (stateEvents, authEvents []*gomatrixserverlib.HeaderedEvent, errRes *util.JSONResponse) {
event, resErr := fetchEvent(ctx, rsAPI, eventID) event, resErr := fetchEvent(ctx, rsAPI, eventID)
if resErr != nil { if resErr != nil {
return nil, resErr return nil, nil, resErr
} }
if event.RoomID() != roomID { if event.RoomID() != roomID {
return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: jsonerror.NotFound("event does not belong to this room")} return nil, nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: jsonerror.NotFound("event does not belong to this room")}
} }
resErr = allowedToSeeEvent(ctx, request.Origin(), rsAPI, eventID) resErr = allowedToSeeEvent(ctx, request.Origin(), rsAPI, eventID)
if resErr != nil { if resErr != nil {
return nil, resErr return nil, nil, resErr
} }
var response api.QueryStateAndAuthChainResponse var response api.QueryStateAndAuthChainResponse
@ -123,20 +126,17 @@ func getState(
) )
if err != nil { if err != nil {
resErr := util.ErrorResponse(err) resErr := util.ErrorResponse(err)
return nil, &resErr return nil, nil, &resErr
} }
if !response.RoomExists { if !response.RoomExists {
return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil} return nil, nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
} }
return &gomatrixserverlib.RespState{ return response.StateEvents, response.AuthChainEvents, nil
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
}, nil
} }
func getIDsFromEvent(events []*gomatrixserverlib.Event) []string { func getIDsFromEvent(events []*gomatrixserverlib.HeaderedEvent) []string {
IDs := make([]string, len(events)) IDs := make([]string, len(events))
for i := range events { for i := range events {
IDs[i] = events[i].EventID() IDs[i] = events[i].EventID()

View file

@ -170,13 +170,18 @@ func ExchangeThirdPartyInvite(
util.GetLogger(httpReq.Context()).WithError(err).Error("federation.SendInvite failed") util.GetLogger(httpReq.Context()).WithError(err).Error("federation.SendInvite failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
inviteEvent, err := signedEvent.Event.UntrustedEvent(verRes.RoomVersion)
if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("federation.SendInvite failed")
return jsonerror.InternalServerError()
}
// Send the event to the roomserver // Send the event to the roomserver
if err = api.SendEvents( if err = api.SendEvents(
httpReq.Context(), rsAPI, httpReq.Context(), rsAPI,
api.KindNew, api.KindNew,
[]*gomatrixserverlib.HeaderedEvent{ []*gomatrixserverlib.HeaderedEvent{
signedEvent.Event.Headered(verRes.RoomVersion), inviteEvent.Headered(verRes.RoomVersion),
}, },
request.Origin(), request.Origin(),
cfg.Matrix.ServerName, cfg.Matrix.ServerName,

8
go.mod
View file

@ -41,7 +41,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220204110702-c559d2019275 github.com/matrix-org/gomatrixserverlib v0.0.0-20220209202448-9805ef634335
github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.10 github.com/mattn/go-sqlite3 v1.14.10
@ -59,17 +59,17 @@ require (
github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect github.com/prometheus/procfs v0.7.3 // indirect
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.8.1
github.com/tidwall/gjson v1.13.0 github.com/tidwall/gjson v1.14.0
github.com/tidwall/sjson v1.2.4 github.com/tidwall/sjson v1.2.4
github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.4.2 github.com/yggdrasil-network/yggdrasil-go v0.4.2
go.uber.org/atomic v1.9.0 go.uber.org/atomic v1.9.0
golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8 golang.org/x/crypto v0.0.0-20220209195652-db638375bc3a
golang.org/x/image v0.0.0-20211028202545-6944b10bf410 golang.org/x/image v0.0.0-20211028202545-6944b10bf410
golang.org/x/mobile v0.0.0-20220112015953-858099ff7816 golang.org/x/mobile v0.0.0-20220112015953-858099ff7816
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect golang.org/x/sys v0.0.0-20220207234003-57398862261d // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
gopkg.in/h2non/bimg.v1 v1.1.5 gopkg.in/h2non/bimg.v1 v1.1.5
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0

16
go.sum
View file

@ -983,8 +983,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220204110702-c559d2019275 h1:f6Hh7D3EOTl1uUr76FiyHNA1h4pKBhcVUtyHbxn0hKA= github.com/matrix-org/gomatrixserverlib v0.0.0-20220209202448-9805ef634335 h1:xzK9Q9VGqsZNGx5ANFOCWkJ8R+W1J2BOguxsVZw6m8M=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220204110702-c559d2019275/go.mod h1:qFvhfbQ5orQxlH9vCiFnP4dW27xxnWHdNUBKyj/fbiY= github.com/matrix-org/gomatrixserverlib v0.0.0-20220209202448-9805ef634335/go.mod h1:qFvhfbQ5orQxlH9vCiFnP4dW27xxnWHdNUBKyj/fbiY=
github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf h1:/nqfHUdQHr3WVdbZieaYFvHF1rin5pvDTa/NOZ/qCyE= github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf h1:/nqfHUdQHr3WVdbZieaYFvHF1rin5pvDTa/NOZ/qCyE=
github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk= github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
@ -1363,8 +1363,8 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I= github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I=
github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M= github.com/tidwall/gjson v1.14.0 h1:6aeJ0bzojgWLa82gDQHcx3S0Lr/O51I9bJ5nv6JFx5w=
github.com/tidwall/gjson v1.13.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.14.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
@ -1509,8 +1509,8 @@ golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8 h1:kACShD3qhmr/3rLmg1yXyt+N4HcwutKyPRB93s54TIU= golang.org/x/crypto v0.0.0-20220209195652-db638375bc3a h1:atOEWVSedO4ksXBe/UrlbSLVxQQ9RxM/tT2Jy10IaHo=
golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220209195652-db638375bc3a/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -1734,8 +1734,8 @@ golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= golang.org/x/sys v0.0.0-20220207234003-57398862261d h1:Bm7BNOQt2Qv7ZqysjeLjgCBanX+88Z/OtdvsrEv1Djc=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220207234003-57398862261d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=

View file

@ -1,123 +0,0 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
type OutputCrossSigningKeyUpdateConsumer struct {
ctx context.Context
keyDB storage.Database
keyAPI api.KeyInternalAPI
serverName string
jetstream nats.JetStreamContext
durable string
topic string
}
func NewOutputCrossSigningKeyUpdateConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
js nats.JetStreamContext,
keyDB storage.Database,
keyAPI api.KeyInternalAPI,
) *OutputCrossSigningKeyUpdateConsumer {
// The keyserver both produces and consumes on the TopicOutputKeyChangeEvent
// topic. We will only produce events where the UserID matches our server name,
// and we will only consume events where the UserID does NOT match our server
// name (because the update came from a remote server).
s := &OutputCrossSigningKeyUpdateConsumer{
ctx: process.Context(),
keyDB: keyDB,
jetstream: js,
durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"),
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
keyAPI: keyAPI,
serverName: string(cfg.Global.ServerName),
}
return s
}
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m api.DeviceMessage
if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return true
}
if m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)
default:
return true
}
}
func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
return true
}
if host == gomatrixserverlib.ServerName(s.serverName) {
// Ignore any messages that contain information about our own users, as
// they already originated from this server.
return true
}
uploadReq := &api.PerformUploadDeviceKeysRequest{
UserID: output.UserID,
}
if output.MasterKey != nil {
uploadReq.MasterKey = *output.MasterKey
}
if output.SelfSigningKey != nil {
uploadReq.SelfSigningKey = *output.SelfSigningKey
}
uploadRes := &api.PerformUploadDeviceKeysResponse{}
s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
if uploadRes.Error != nil {
// If the error is due to a missing or invalid parameter then we'd might
// as well just acknowledge the message, because otherwise otherwise we'll
// just keep getting delivered a faulty message over and over again.
return uploadRes.Error.IsMissingParam || uploadRes.Error.IsInvalidParam
}
return true
}

View file

@ -219,25 +219,23 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
} }
// Finally, generate a notification that we updated the keys. // Finally, generate a notification that we updated the keys.
if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer { update := eduserverAPI.CrossSigningKeyUpdate{
update := eduserverAPI.CrossSigningKeyUpdate{ UserID: req.UserID,
UserID: req.UserID, }
} if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { update.MasterKey = &mk
update.MasterKey = &mk }
} if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok {
if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { update.SelfSigningKey = &ssk
update.SelfSigningKey = &ssk }
} if update.MasterKey == nil && update.SelfSigningKey == nil {
if update.MasterKey == nil && update.SelfSigningKey == nil { return
return }
} if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { res.Error = &api.KeyError{
res.Error = &api.KeyError{ Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
}
return
} }
return
} }
} }
@ -310,16 +308,18 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
// Finally, generate a notification that we updated the signatures. // Finally, generate a notification that we updated the signatures.
for userID := range req.Signatures { for userID := range req.Signatures {
if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer { masterKey := queryRes.MasterKeys[userID]
update := eduserverAPI.CrossSigningKeyUpdate{ selfSigningKey := queryRes.SelfSigningKeys[userID]
UserID: userID, update := eduserverAPI.CrossSigningKeyUpdate{
} UserID: userID,
if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { MasterKey: &masterKey,
res.Error = &api.KeyError{ SelfSigningKey: &selfSigningKey,
Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), }
} if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
return res.Error = &api.KeyError{
Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
} }
return
} }
} }
} }

View file

@ -326,8 +326,14 @@ func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysReques
if err = json.Unmarshal(key, &deviceKey); err != nil { if err = json.Unmarshal(key, &deviceKey); err != nil {
continue continue
} }
if deviceKey.Signatures == nil {
deviceKey.Signatures = map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{}
}
for sourceUserID, forSourceUser := range sigMap { for sourceUserID, forSourceUser := range sigMap {
for sourceKeyID, sourceSig := range forSourceUser { for sourceKeyID, sourceSig := range forSourceUser {
if _, ok := deviceKey.Signatures[sourceUserID]; !ok {
deviceKey.Signatures[sourceUserID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{}
}
deviceKey.Signatures[sourceUserID][sourceKeyID] = sourceSig deviceKey.Signatures[sourceUserID][sourceKeyID] = sourceSig
} }
} }
@ -447,7 +453,6 @@ func (a *KeyInternalAPI) queryRemoteKeysOnServer(
for userID, deviceIDs := range devKeys { for userID, deviceIDs := range devKeys {
if len(deviceIDs) == 0 { if len(deviceIDs) == 0 {
userIDsForAllDevices[userID] = struct{}{} userIDsForAllDevices[userID] = struct{}{}
delete(devKeys, userID)
} }
} }
// for cross-signing keys, it's probably easier just to hit /keys/query if we aren't already doing // for cross-signing keys, it's probably easier just to hit /keys/query if we aren't already doing

View file

@ -18,7 +18,6 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/consumers"
"github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/producers"
@ -65,12 +64,5 @@ func NewInternalAPI(
} }
}() }()
keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
base.ProcessContext, base.Cfg, js, db, ap,
)
if err := keyconsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
}
return ap return ap
} }

View file

@ -51,7 +51,7 @@ func SendEventWithState(
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent, state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
origin gomatrixserverlib.ServerName, haveEventIDs map[string]bool, async bool, origin gomatrixserverlib.ServerName, haveEventIDs map[string]bool, async bool,
) error { ) error {
outliers, err := state.Events() outliers, err := state.Events(event.RoomVersion)
if err != nil { if err != nil {
return err return err
} }
@ -68,9 +68,10 @@ func SendEventWithState(
}) })
} }
stateEventIDs := make([]string, len(state.StateEvents)) stateEvents := state.StateEvents.UntrustedEvents(event.RoomVersion)
for i := range state.StateEvents { stateEventIDs := make([]string, len(stateEvents))
stateEventIDs[i] = state.StateEvents[i].EventID() for i := range stateEvents {
stateEventIDs[i] = stateEvents[i].EventID()
} }
ires = append(ires, InputRoomEvent{ ires = append(ires, InputRoomEvent{

View file

@ -195,9 +195,26 @@ func (r *Inputer) processRoomEvent(
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs)) authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
for _, authEventID := range authEventIDs { for _, authEventID := range authEventIDs {
if _, ok := knownEvents[authEventID]; !ok { if _, ok := knownEvents[authEventID]; !ok {
return rollbackTransaction, fmt.Errorf("missing auth event %s", authEventID) // Unknown auth events only really matter if the event actually failed
// auth. If it passed auth then we can assume that everything that was
// known was sufficient, even if extraneous auth events were specified
// but weren't found.
if isRejected {
if event.StateKey() != nil {
return commitTransaction, fmt.Errorf(
"missing auth event %s for state event %s (type %q, state key %q)",
authEventID, event.EventID(), event.Type(), *event.StateKey(),
)
} else {
return commitTransaction, fmt.Errorf(
"missing auth event %s for timeline event %s (type %q)",
authEventID, event.EventID(), event.Type(),
)
}
}
} else {
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
} }
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
} }
var softfail bool var softfail bool
@ -238,13 +255,32 @@ func (r *Inputer) processRoomEvent(
hadEvents: map[string]bool{}, hadEvents: map[string]bool{},
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{}, haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
} }
if err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil { if stateSnapshot, err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
// Something went wrong with retrieving the missing state, so we can't
// really do anything with the event other than reject it at this point.
isRejected = true isRejected = true
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err) rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
} else if stateSnapshot != nil {
// We retrieved some state and we ended up having to call /state_ids for
// the new event in question (probably because closing the gap by using
// /get_missing_events didn't do what we hoped) so we'll instead overwrite
// the state snapshot with the newly resolved state.
missingPrev = false
input.HasState = true
input.StateEventIDs = make([]string, 0, len(stateSnapshot.StateEvents))
for _, e := range stateSnapshot.StateEvents {
input.StateEventIDs = append(input.StateEventIDs, e.EventID())
}
} else { } else {
// We retrieved some state and it would appear that rolling forward the
// state did everything we needed it to do, so we can just resolve the
// state for the event in the normal way.
missingPrev = false missingPrev = false
} }
} else { } else {
// We're missing prev events or state for the event, but for some reason
// we don't know any servers to ask. In this case we can't do anything but
// reject the event and hope that it gets unrejected later.
isRejected = true isRejected = true
rejectionErr = fmt.Errorf("missing prev events and no other servers to ask") rejectionErr = fmt.Errorf("missing prev events and no other servers to ask")
} }
@ -282,7 +318,7 @@ func (r *Inputer) processRoomEvent(
return rollbackTransaction, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID()) return rollbackTransaction, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID())
} }
if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 { if input.HasState || (!missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0) {
// We haven't calculated a state for this event yet. // We haven't calculated a state for this event yet.
// Lets calculate one. // Lets calculate one.
err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected) err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected)
@ -297,7 +333,10 @@ func (r *Inputer) processRoomEvent(
"soft_fail": softfail, "soft_fail": softfail,
"missing_prev": missingPrev, "missing_prev": missingPrev,
}).Warn("Stored rejected event") }).Warn("Stored rejected event")
return commitTransaction, rejectionErr if rejectionErr != nil {
return commitTransaction, types.RejectedError(rejectionErr.Error())
}
return commitTransaction, nil
} }
switch input.Kind { switch input.Kind {
@ -413,44 +452,42 @@ func (r *Inputer) fetchAuthEvents(
return fmt.Errorf("no servers provided event auth for event ID %q, tried servers %v", event.EventID(), servers) return fmt.Errorf("no servers provided event auth for event ID %q, tried servers %v", event.EventID(), servers)
} }
// Reuse these to reduce allocations.
authEventNIDs := make([]types.EventNID, 0, 5)
isRejected := false
nextAuthEvent:
for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering( for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering(
res.AuthEvents, res.AuthEvents.UntrustedEvents(event.RoomVersion),
gomatrixserverlib.TopologicalOrderByAuthEvents, gomatrixserverlib.TopologicalOrderByAuthEvents,
) { ) {
// If we already know about this event from the database then we don't // If we already know about this event from the database then we don't
// need to store it again or do anything further with it, so just skip // need to store it again or do anything further with it, so just skip
// over it rather than wasting cycles. // over it rather than wasting cycles.
if ev, ok := known[authEvent.EventID()]; ok && ev != nil { if ev, ok := known[authEvent.EventID()]; ok && ev != nil {
continue continue nextAuthEvent
} }
// Check the signatures of the event. If this fails then we'll simply // Check the signatures of the event. If this fails then we'll simply
// skip it, because gomatrixserverlib.Allowed() will notice a problem // skip it, because gomatrixserverlib.Allowed() will notice a problem
// if a critical event is missing anyway. // if a critical event is missing anyway.
if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil { if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
continue continue nextAuthEvent
} }
// In order to store the new auth event, we need to know its auth chain // In order to store the new auth event, we need to know its auth chain
// as NIDs for the `auth_event_nids` column. Let's see if we can find those. // as NIDs for the `auth_event_nids` column. Let's see if we can find those.
authEventNIDs := make([]types.EventNID, 0, len(authEvent.AuthEventIDs())) authEventNIDs = authEventNIDs[:0]
for _, eventID := range authEvent.AuthEventIDs() { for _, eventID := range authEvent.AuthEventIDs() {
knownEvent, ok := known[eventID] knownEvent, ok := known[eventID]
if !ok { if !ok {
return fmt.Errorf("missing auth event %s for %s", eventID, authEvent.EventID()) continue nextAuthEvent
} }
authEventNIDs = append(authEventNIDs, knownEvent.EventNID) authEventNIDs = append(authEventNIDs, knownEvent.EventNID)
} }
// Let's take a note of the fact that we now know about this event.
if err := auth.AddEvent(authEvent); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
// Check if the auth event should be rejected. // Check if the auth event should be rejected.
isRejected := false err := gomatrixserverlib.Allowed(authEvent, auth)
if err := gomatrixserverlib.Allowed(authEvent, auth); err != nil { if isRejected = err != nil; isRejected {
isRejected = true
logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID()) logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID())
} }
@ -460,6 +497,14 @@ func (r *Inputer) fetchAuthEvents(
return fmt.Errorf("updater.StoreEvent: %w", err) return fmt.Errorf("updater.StoreEvent: %w", err)
} }
// Let's take a note of the fact that we now know about this event for
// authenticating future events.
if !isRejected {
if err := auth.AddEvent(authEvent); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
}
// Now we know about this event, it was stored and the signatures were OK. // Now we know about this event, it was stored and the signatures were OK.
known[authEvent.EventID()] = &types.Event{ known[authEvent.EventID()] = &types.Event{
EventNID: eventNID, EventNID: eventNID,
@ -483,16 +528,7 @@ func (r *Inputer) calculateAndSetState(
roomState := state.NewStateResolution(updater, roomInfo) roomState := state.NewStateResolution(updater, roomInfo)
if input.HasState { if input.HasState {
// Check here if we think we're in the room already.
stateAtEvent.Overwrite = true stateAtEvent.Overwrite = true
var joinEventNIDs []types.EventNID
// Request join memberships only for local users only.
if joinEventNIDs, err = updater.GetMembershipEventNIDsForRoom(ctx, roomInfo.RoomNID, true, true); err == nil {
// If we have no local users that are joined to the room then any state about
// the room that we have is quite possibly out of date. Therefore in that case
// we should overwrite it rather than merge it.
stateAtEvent.Overwrite = len(joinEventNIDs) == 0
}
// We've been told what the state at the event is so we don't need to calculate it. // We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state. // Check that those state events are in the database and store the state.

View file

@ -48,7 +48,7 @@ func (r *Inputer) updateMemberships(
// Load the event JSON so we can look up the "membership" key. // Load the event JSON so we can look up the "membership" key.
// TODO: Maybe add a membership key to the events table so we can load that // TODO: Maybe add a membership key to the events table so we can load that
// key without having to load the entire event JSON? // key without having to load the entire event JSON?
events, err := r.DB.Events(ctx, eventNIDs) events, err := updater.Events(ctx, eventNIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -12,11 +12,17 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
type parsedRespState struct {
AuthEvents []*gomatrixserverlib.Event
StateEvents []*gomatrixserverlib.Event
}
type missingStateReq struct { type missingStateReq struct {
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
db *shared.RoomUpdater db *shared.RoomUpdater
@ -34,9 +40,10 @@ type missingStateReq struct {
// processEventWithMissingState is the entrypoint for a missingStateReq // processEventWithMissingState is the entrypoint for a missingStateReq
// request, as called from processRoomEvent. // request, as called from processRoomEvent.
// nolint:gocyclo
func (t *missingStateReq) processEventWithMissingState( func (t *missingStateReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) error { ) (*parsedRespState, error) {
// We are missing the previous events for this events. // We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the // This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap: // room. There two ways that we can handle such a gap:
@ -62,100 +69,51 @@ func (t *missingStateReq) processEventWithMissingState(
// - fill in the gap completely then process event `e` returning no backwards extremity // - fill in the gap completely then process event `e` returning no backwards extremity
// - fail to fill in the gap and tell us to terminate the transaction err=not nil // - fail to fill in the gap and tell us to terminate the transaction err=not nil
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
newEvents, isGapFilled, err := t.getMissingEvents(ctx, e, roomVersion) newEvents, isGapFilled, prevStatesKnown, err := t.getMissingEvents(ctx, e, roomVersion)
if err != nil { if err != nil {
return fmt.Errorf("t.getMissingEvents: %w", err) return nil, fmt.Errorf("t.getMissingEvents: %w", err)
} }
if len(newEvents) == 0 { if len(newEvents) == 0 {
return fmt.Errorf("expected to find missing events but didn't") return nil, fmt.Errorf("expected to find missing events but didn't")
} }
if isGapFilled { if isGapFilled {
logger.Infof("gap filled by /get_missing_events, injecting %d new events", len(newEvents)) logger.Infof("Gap filled by /get_missing_events, injecting %d new events", len(newEvents))
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled // we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
// in the gap in the DAG // in the gap in the DAG
for _, newEvent := range newEvents { for _, newEvent := range newEvents {
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{ _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
Kind: api.KindNew, Kind: api.KindOld,
Event: newEvent.Headered(roomVersion), Event: newEvent.Headered(roomVersion),
Origin: t.origin, Origin: t.origin,
SendAsServer: api.DoNotSendToOtherServers, SendAsServer: api.DoNotSendToOtherServers,
}) })
if err != nil { if err != nil {
return fmt.Errorf("t.inputer.processRoomEvent: %w", err) if _, ok := err.(types.RejectedError); !ok {
return nil, fmt.Errorf("t.inputer.processRoomEvent (filling gap): %w", err)
}
} }
} }
return nil
} }
// If we filled the gap *and* we know the state before the prev events
// then there's nothing else to do, we have everything we need to deal
// with the new event.
if isGapFilled && prevStatesKnown {
logger.Infof("Gap filled and state found for all prev events")
return nil, nil
}
// Otherwise, if we've reached this point, it's possible that we've
// either not closed the gap, or we did but we still don't seem to
// know the events before the new event. Start by looking up the
// state at the event at the back of the gap and we'll try to roll
// forward the state first.
backwardsExtremity := newEvents[0] backwardsExtremity := newEvents[0]
newEvents = newEvents[1:] newEvents = newEvents[1:]
type respState struct { resolvedState, err := t.lookupResolvedStateBeforeEvent(ctx, backwardsExtremity, roomVersion)
// A snapshot is considered trustworthy if it came from our own roomserver. if err != nil {
// That's because the state will have been through state resolution once return nil, fmt.Errorf("t.lookupState (backwards extremity): %w", err)
// already in QueryStateAfterEvent.
trustworthy bool
*gomatrixserverlib.RespState
}
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
var states []*respState
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
// Look up what the state is after the backward extremity. This will either
// come from the roomserver, if we know all the required events, or it will
// come from a remote server via /state_ids if not.
prevState, trustworthy, lerr := t.lookupStateAfterEvent(ctx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
if lerr != nil {
logger.WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return lerr
}
// Append the state onto the collected state. We'll run this through the
// state resolution next.
states = append(states, &respState{trustworthy, prevState})
}
// Now that we have collected all of the state from the prev_events, we'll
// run the state through the appropriate state resolution algorithm for the
// room if needed. This does a couple of things:
// 1. Ensures that the state is deduplicated fully for each state-key tuple
// 2. Ensures that we pick the latest events from both sets, in the case that
// one of the prev_events is quite a bit older than the others
resolvedState := &gomatrixserverlib.RespState{}
switch len(states) {
case 0:
extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("")
if !extremityIsCreate {
// There are no previous states and this isn't the beginning of the
// room - this is an error condition!
logger.Errorf("Failed to lookup any state after prev_events")
return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states))
}
case 1:
// There's only one previous state - if it's trustworthy (came from a
// local state snapshot which will already have been through state res),
// use it as-is. There's no point in resolving it again.
if states[0].trustworthy {
resolvedState = states[0].RespState
break
}
// Otherwise, if it isn't trustworthy (came from federation), run it through
// state resolution anyway for safety, in case there are duplicates.
fallthrough
default:
respStates := make([]*gomatrixserverlib.RespState, len(states))
for i := range states {
respStates[i] = states[i].RespState
}
// There's more than one previous state - run them all through state res
t.roomsMu.Lock(e.RoomID())
resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, backwardsExtremity)
t.roomsMu.Unlock(e.RoomID())
if err != nil {
logger.WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
return err
}
} }
hadEvents := map[string]bool{} hadEvents := map[string]bool{}
@ -165,27 +123,37 @@ func (t *missingStateReq) processEventWithMissingState(
} }
t.hadEventsMutex.Unlock() t.hadEventsMutex.Unlock()
// Send outliers first so we can send the new backwards extremity without causing errors sendOutliers := func(resolvedState *parsedRespState) error {
outliers, err := resolvedState.Events() outliers, oerr := gomatrixserverlib.OrderAuthAndStateEvents(resolvedState.AuthEvents, resolvedState.StateEvents, roomVersion)
if err != nil { if oerr != nil {
return err return fmt.Errorf("gomatrixserverlib.OrderAuthAndStateEvents: %w", oerr)
}
var outlierRoomEvents []api.InputRoomEvent
for _, outlier := range outliers {
if hadEvents[outlier.EventID()] {
continue
} }
outlierRoomEvents = append(outlierRoomEvents, api.InputRoomEvent{ var outlierRoomEvents []api.InputRoomEvent
Kind: api.KindOutlier, for _, outlier := range outliers {
Event: outlier.Headered(roomVersion), if hadEvents[outlier.EventID()] {
Origin: t.origin, continue
}) }
} outlierRoomEvents = append(outlierRoomEvents, api.InputRoomEvent{
// TODO: we could do this concurrently? Kind: api.KindOutlier,
for _, ire := range outlierRoomEvents { Event: outlier.Headered(roomVersion),
if _, err = t.inputer.processRoomEvent(ctx, t.db, &ire); err != nil { Origin: t.origin,
return fmt.Errorf("t.inputer.processRoomEvent[outlier]: %w", err) })
} }
for _, ire := range outlierRoomEvents {
_, err = t.inputer.processRoomEvent(ctx, t.db, &ire)
if err != nil {
if _, ok := err.(types.RejectedError); !ok {
return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err)
}
}
}
return nil
}
// Send outliers first so we can send the state along with the new backwards
// extremity without any missing auth events.
if err = sendOutliers(resolvedState); err != nil {
return nil, fmt.Errorf("sendOutliers: %w", err)
} }
// Now send the backward extremity into the roomserver with the // Now send the backward extremity into the roomserver with the
@ -205,7 +173,9 @@ func (t *missingStateReq) processEventWithMissingState(
SendAsServer: api.DoNotSendToOtherServers, SendAsServer: api.DoNotSendToOtherServers,
}) })
if err != nil { if err != nil {
return fmt.Errorf("t.inputer.processRoomEvent: %w", err) if _, ok := err.(types.RejectedError); !ok {
return nil, fmt.Errorf("t.inputer.processRoomEvent (backward extremity): %w", err)
}
} }
// Then send all of the newer backfilled events, of which will all be newer // Then send all of the newer backfilled events, of which will all be newer
@ -220,16 +190,115 @@ func (t *missingStateReq) processEventWithMissingState(
SendAsServer: api.DoNotSendToOtherServers, SendAsServer: api.DoNotSendToOtherServers,
}) })
if err != nil { if err != nil {
return fmt.Errorf("t.inputer.processRoomEvent: %w", err) if _, ok := err.(types.RejectedError); !ok {
return nil, fmt.Errorf("t.inputer.processRoomEvent (fast forward): %w", err)
}
} }
} }
return nil // Finally, check again if we know everything we need to know in order to
// make forward progress. If the prev state is known then we consider the
// rolled forward state to be sufficient — we now know all of the state
// before the prev events. If we don't then we need to look up the state
// before the new event as well, otherwise we will never make any progress.
if t.isPrevStateKnown(ctx, e) {
return nil, nil
}
// If we still haven't got the state for the prev events then we'll go and
// ask the federation for it if needed.
resolvedState, err = t.lookupResolvedStateBeforeEvent(ctx, e, roomVersion)
if err != nil {
return nil, fmt.Errorf("t.lookupState (new event): %w", err)
}
// Send the outliers for the retrieved state.
if err = sendOutliers(resolvedState); err != nil {
return nil, fmt.Errorf("sendOutliers: %w", err)
}
// Then return the resolved state, for which the caller can replace the
// HasState with the event IDs to create a new state snapshot when we
// process the new event.
return resolvedState, nil
}
func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*parsedRespState, error) {
type respState struct {
// A snapshot is considered trustworthy if it came from our own roomserver.
// That's because the state will have been through state resolution once
// already in QueryStateAfterEvent.
trustworthy bool
*parsedRespState
}
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
var states []*respState
for _, prevEventID := range e.PrevEventIDs() {
// Look up what the state is after the backward extremity. This will either
// come from the roomserver, if we know all the required events, or it will
// come from a remote server via /state_ids if not.
prevState, trustworthy, err := t.lookupStateAfterEvent(ctx, roomVersion, e.RoomID(), prevEventID)
if err != nil {
return nil, fmt.Errorf("t.lookupStateAfterEvent: %w", err)
}
// Append the state onto the collected state. We'll run this through the
// state resolution next.
states = append(states, &respState{trustworthy, prevState})
}
// Now that we have collected all of the state from the prev_events, we'll
// run the state through the appropriate state resolution algorithm for the
// room if needed. This does a couple of things:
// 1. Ensures that the state is deduplicated fully for each state-key tuple
// 2. Ensures that we pick the latest events from both sets, in the case that
// one of the prev_events is quite a bit older than the others
resolvedState := &parsedRespState{}
switch len(states) {
case 0:
extremityIsCreate := e.Type() == gomatrixserverlib.MRoomCreate && e.StateKeyEquals("")
if !extremityIsCreate {
// There are no previous states and this isn't the beginning of the
// room - this is an error condition!
return nil, fmt.Errorf("expected %d states but got %d", len(e.PrevEventIDs()), len(states))
}
case 1:
// There's only one previous state - if it's trustworthy (came from a
// local state snapshot which will already have been through state res),
// use it as-is. There's no point in resolving it again. Only trust a
// trustworthy state snapshot if it actually contains some state for all
// non-create events, otherwise we need to resolve what came from federation.
isCreate := e.Type() == gomatrixserverlib.MRoomCreate && e.StateKeyEquals("")
if states[0].trustworthy && (isCreate || len(states[0].StateEvents) > 0) {
resolvedState = states[0].parsedRespState
break
}
// Otherwise, if it isn't trustworthy (came from federation), run it through
// state resolution anyway for safety, in case there are duplicates.
fallthrough
default:
respStates := make([]*parsedRespState, len(states))
for i := range states {
respStates[i] = states[i].parsedRespState
}
// There's more than one previous state - run them all through state res
var err error
t.roomsMu.Lock(e.RoomID())
resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, e)
t.roomsMu.Unlock(e.RoomID())
if err != nil {
return nil, fmt.Errorf("t.resolveStatesAndCheck: %w", err)
}
}
return resolvedState, nil
} }
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix. // added into the mix.
func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, bool, error) { func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) {
// try doing all this locally before we resort to querying federation // try doing all this locally before we resort to querying federation
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID) respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID)
if respState != nil { if respState != nil {
@ -280,7 +349,7 @@ func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *g
return ev return ev
} }
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState { func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *parsedRespState {
var res api.QueryStateAfterEventsResponse var res api.QueryStateAfterEventsResponse
err := t.queryer.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{ err := t.queryer.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{
RoomID: roomID, RoomID: roomID,
@ -335,7 +404,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
queryRes.Events = nil queryRes.Events = nil
} }
return &gomatrixserverlib.RespState{ return &parsedRespState{
StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents), StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
AuthEvents: authEvents, AuthEvents: authEvents,
} }
@ -344,13 +413,13 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what // lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
// the server supports. // the server supports.
func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) ( func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
*gomatrixserverlib.RespState, error) { *parsedRespState, error) {
// Attempt to fetch the missing state using /state_ids and /events // Attempt to fetch the missing state using /state_ids and /events
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion) return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
} }
func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*gomatrixserverlib.RespState, backwardsExtremity *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) { func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) {
var authEventList []*gomatrixserverlib.Event var authEventList []*gomatrixserverlib.Event
var stateEventList []*gomatrixserverlib.Event var stateEventList []*gomatrixserverlib.Event
for _, state := range states { for _, state := range states {
@ -369,7 +438,7 @@ retryAllowedState:
h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true) h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true)
switch err2.(type) { switch err2.(type) {
case verifySigError: case verifySigError:
return &gomatrixserverlib.RespState{ return &parsedRespState{
AuthEvents: authEventList, AuthEvents: authEventList,
StateEvents: resolvedStateEvents, StateEvents: resolvedStateEvents,
}, nil }, nil
@ -385,7 +454,7 @@ retryAllowedState:
} }
return nil, err return nil, err
} }
return &gomatrixserverlib.RespState{ return &parsedRespState{
AuthEvents: authEventList, AuthEvents: authEventList,
StateEvents: resolvedStateEvents, StateEvents: resolvedStateEvents,
}, nil }, nil
@ -393,22 +462,13 @@ retryAllowedState:
// get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject, // get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject,
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events // without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled bool, err error) { func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e})
// query latest events (our trusted forward extremities) latest := t.db.LatestEvents()
req := api.QueryLatestEventsAndStateRequest{ latestEvents := make([]string, len(latest))
RoomID: e.RoomID(), for i, ev := range latest {
StateToFetch: needed.Tuples(), latestEvents[i] = ev.EventID
}
var res api.QueryLatestEventsAndStateResponse
if err = t.queryer.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
logger.WithError(err).Warn("Failed to query latest events")
return nil, false, err
}
latestEvents := make([]string, len(res.LatestEvents))
for i, ev := range res.LatestEvents {
latestEvents[i] = res.LatestEvents[i].EventID
t.hadEvent(ev.EventID) t.hadEvent(ev.EventID)
} }
@ -429,7 +489,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
if errors.Is(err, context.DeadlineExceeded) { if errors.Is(err, context.DeadlineExceeded) {
select { select {
case <-ctx.Done(): // the parent request context timed out case <-ctx.Done(): // the parent request context timed out
return nil, false, context.DeadlineExceeded return nil, false, false, context.DeadlineExceeded
default: // this request exceed its own timeout default: // this request exceed its own timeout
continue continue
} }
@ -442,7 +502,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", "%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.origin, len(t.servers), t.origin, len(t.servers),
) )
return nil, false, missingPrevEventsError{ return nil, false, false, missingPrevEventsError{
eventID: e.EventID(), eventID: e.EventID(),
err: err, err: err,
} }
@ -451,12 +511,13 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
// Make sure events from the missingResp are using the cache - missing events // Make sure events from the missingResp are using the cache - missing events
// will be added and duplicates will be removed. // will be added and duplicates will be removed.
logger.Debugf("get_missing_events returned %d events", len(missingResp.Events)) logger.Debugf("get_missing_events returned %d events", len(missingResp.Events))
for i, ev := range missingResp.Events { missingEvents := make([]*gomatrixserverlib.Event, 0, len(missingResp.Events))
missingResp.Events[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() for _, ev := range missingResp.Events.UntrustedEvents(roomVersion) {
missingEvents = append(missingEvents, t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap())
} }
// topologically sort and sanity check that we are making forward progress // topologically sort and sanity check that we are making forward progress
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents) newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
shouldHaveSomeEventIDs := e.PrevEventIDs() shouldHaveSomeEventIDs := e.PrevEventIDs()
hasPrevEvent := false hasPrevEvent := false
Event: Event:
@ -474,52 +535,84 @@ Event:
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.origin, t.origin,
) )
return nil, false, missingPrevEventsError{ return nil, false, false, missingPrevEventsError{
eventID: e.EventID(), eventID: e.EventID(),
err: err, err: err,
} }
} }
if len(newEvents) == 0 { if len(newEvents) == 0 {
return nil, false, nil // TODO: error instead? return nil, false, false, nil // TODO: error instead?
} }
// now check if we can fill the gap. Look to see if we have state snapshot IDs for the earliest event
earliestNewEvent := newEvents[0] earliestNewEvent := newEvents[0]
if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 {
if earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") { // If we retrieved back to the beginning of the room then there's nothing else
// we got to the beginning of the room so there will be no state! It's all good we can process this // to do - we closed the gap.
return newEvents, true, nil if len(earliestNewEvent.PrevEventIDs()) == 0 && earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") {
} return newEvents, true, t.isPrevStateKnown(ctx, e), nil
// we don't have the state at this earliest event from /g_m_e so we won't have state for later events either
return newEvents, false, nil
} }
// StateAtEventIDs returned some kind of state for the earliest event so we can fill in the gap!
return newEvents, true, nil // If our backward extremity was not a known event to us then we obviously didn't
// close the gap.
if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 && state[0].BeforeStateSnapshotNID == 0 {
return newEvents, false, false, nil
}
// At this point we are satisfied that we know the state both at the earliest
// retrieved event and at the prev events of the new event.
return newEvents, true, t.isPrevStateKnown(ctx, e), nil
} }
func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( func (t *missingStateReq) isPrevStateKnown(ctx context.Context, e *gomatrixserverlib.Event) bool {
respState *gomatrixserverlib.RespState, err error) { expected := len(e.PrevEventIDs())
state, err := t.db.StateAtEventIDs(ctx, e.PrevEventIDs())
if err != nil || len(state) != expected {
// We didn't get as many state snapshots as we expected, or there was an error,
// so we haven't completely solved the problem for the new event.
return false
}
// Check to see if we have a populated state snapshot for all of the prev events.
for _, stateAtEvent := range state {
if stateAtEvent.BeforeStateSnapshotNID == 0 {
// One of the prev events still has unknown state, so we haven't really
// solved the problem.
return false
}
}
return true
}
func (t *missingStateReq) lookupMissingStateViaState(
ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
) (respState *parsedRespState, err error) {
state, err := t.federation.LookupState(ctx, t.origin, roomID, eventID, roomVersion) state, err := t.federation.LookupState(ctx, t.origin, roomID, eventID, roomVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Check that the returned state is valid. // Check that the returned state is valid.
if err := state.Check(ctx, t.keys, nil); err != nil { if err := state.Check(ctx, roomVersion, t.keys, nil); err != nil {
return nil, err return nil, err
} }
parsedState := &parsedRespState{
AuthEvents: make([]*gomatrixserverlib.Event, len(state.AuthEvents)),
StateEvents: make([]*gomatrixserverlib.Event, len(state.StateEvents)),
}
// Cache the results of this state lookup and deduplicate anything we already // Cache the results of this state lookup and deduplicate anything we already
// have in the cache, freeing up memory. // have in the cache, freeing up memory.
for i, ev := range state.AuthEvents { // We load these as trusted as we called state.Check before which loaded them as untrusted.
state.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() for i, evJSON := range state.AuthEvents {
ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion)
parsedState.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
} }
for i, ev := range state.StateEvents { for i, evJSON := range state.StateEvents {
state.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion)
parsedState.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
} }
return &state, nil return parsedState, nil
} }
func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
*gomatrixserverlib.RespState, error) { *parsedRespState, error) {
util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID) util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID)
// fetch the state event IDs at the time of the event // fetch the state event IDs at the time of the event
stateIDs, err := t.federation.LookupStateIDs(ctx, t.origin, roomID, eventID) stateIDs, err := t.federation.LookupStateIDs(ctx, t.origin, roomID, eventID)
@ -651,13 +744,14 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
return resp, err return resp, err
} }
func (t *missingStateReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) ( func (t *missingStateReq) createRespStateFromStateIDs(
*gomatrixserverlib.RespState, error) { // nolint:unparam stateIDs gomatrixserverlib.RespStateIDs,
) (*parsedRespState, error) { // nolint:unparam
t.haveEventsMutex.Lock() t.haveEventsMutex.Lock()
defer t.haveEventsMutex.Unlock() defer t.haveEventsMutex.Unlock()
// create a RespState response using the response to /state_ids as a guide // create a RespState response using the response to /state_ids as a guide
respState := gomatrixserverlib.RespState{} respState := parsedRespState{}
for i := range stateIDs.StateEventIDs { for i := range stateIDs.StateEventIDs {
ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]] ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]]

View file

@ -149,7 +149,8 @@ func (r *Queryer) QueryMissingAuthPrevEvents(
} }
for _, prevEventID := range request.PrevEventIDs { for _, prevEventID := range request.PrevEventIDs {
if state, err := r.DB.StateAtEventIDs(ctx, []string{prevEventID}); err != nil || len(state) == 0 { state, err := r.DB.StateAtEventIDs(ctx, []string{prevEventID})
if err != nil || len(state) == 0 || (!state[0].IsCreate() && state[0].BeforeStateSnapshotNID == 0) {
response.MissingPrevEventIDs = append(response.MissingPrevEventIDs, prevEventID) response.MissingPrevEventIDs = append(response.MissingPrevEventIDs, prevEventID)
} }
} }

View file

@ -71,10 +71,10 @@ CREATE TABLE IF NOT EXISTS roomserver_events (
` `
const insertEventSQL = "" + const insertEventSQL = "" +
"INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" + "INSERT INTO roomserver_events AS e (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" +
" ON CONFLICT ON CONSTRAINT roomserver_event_id_unique" + " ON CONFLICT ON CONSTRAINT roomserver_event_id_unique DO UPDATE" +
" DO NOTHING" + " SET is_rejected = $8 WHERE e.event_id = $4 AND e.is_rejected = FALSE" +
" RETURNING event_nid, state_snapshot_nid" " RETURNING event_nid, state_snapshot_nid"
const selectEventSQL = "" + const selectEventSQL = "" +
@ -192,7 +192,8 @@ func (s *eventStatements) InsertEvent(
) (types.EventNID, types.StateSnapshotNID, error) { ) (types.EventNID, types.StateSnapshotNID, error) {
var eventNID int64 var eventNID int64
var stateNID int64 var stateNID int64
err := s.insertEventStmt.QueryRowContext( stmt := sqlutil.TxStmt(txn, s.insertEventStmt)
err := stmt.QueryRowContext(
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID), ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth,
isRejected, isRejected,

View file

@ -136,7 +136,7 @@ func (u *MembershipUpdater) SetToJoin(senderUserID string, eventID string, isUpd
} }
// Look up the NID of the new join event // Look up the NID of the new join event
nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID}) nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{eventID})
if err != nil { if err != nil {
return fmt.Errorf("u.d.EventNIDs: %w", err) return fmt.Errorf("u.d.EventNIDs: %w", err)
} }
@ -170,7 +170,7 @@ func (u *MembershipUpdater) SetToLeave(senderUserID string, eventID string) ([]s
} }
// Look up the NID of the new leave event // Look up the NID of the new leave event
nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID}) nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{eventID})
if err != nil { if err != nil {
return fmt.Errorf("u.d.EventNIDs: %w", err) return fmt.Errorf("u.d.EventNIDs: %w", err)
} }
@ -196,7 +196,7 @@ func (u *MembershipUpdater) SetToKnock(event *gomatrixserverlib.Event) (bool, er
} }
if u.membership != tables.MembershipStateKnock { if u.membership != tables.MembershipStateKnock {
// Look up the NID of the new knock event // Look up the NID of the new knock event
nIDs, err := u.d.EventNIDs(u.ctx, []string{event.EventID()}) nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{event.EventID()})
if err != nil { if err != nil {
return fmt.Errorf("u.d.EventNIDs: %w", err) return fmt.Errorf("u.d.EventNIDs: %w", err)
} }

View file

@ -187,6 +187,12 @@ func (u *RoomUpdater) EventIDs(
return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs) return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs)
} }
func (u *RoomUpdater) EventNIDs(
ctx context.Context, eventIDs []string,
) (map[string]types.EventNID, error) {
return u.d.eventNIDs(ctx, u.txn, eventIDs)
}
func (u *RoomUpdater) StateAtEventIDs( func (u *RoomUpdater) StateAtEventIDs(
ctx context.Context, eventIDs []string, ctx context.Context, eventIDs []string,
) ([]types.StateAtEvent, error) { ) ([]types.StateAtEvent, error) {

View file

@ -603,6 +603,8 @@ func (d *Database) storeEvent(
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
// We've already inserted the event so select the numeric event ID // We've already inserted the event so select the numeric event ID
eventNID, stateNID, err = d.EventsTable.SelectEvent(ctx, txn, event.EventID()) eventNID, stateNID, err = d.EventsTable.SelectEvent(ctx, txn, event.EventID())
} else if err != nil {
return fmt.Errorf("d.EventsTable.InsertEvent: %w", err)
} }
if err != nil { if err != nil {
return fmt.Errorf("d.EventsTable.SelectEvent: %w", err) return fmt.Errorf("d.EventsTable.SelectEvent: %w", err)

View file

@ -49,7 +49,8 @@ const eventsSchema = `
const insertEventSQL = ` const insertEventSQL = `
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected) INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT DO NOTHING ON CONFLICT DO UPDATE
SET is_rejected = $8 WHERE is_rejected = 0
RETURNING event_nid, state_snapshot_nid; RETURNING event_nid, state_snapshot_nid;
` `

View file

@ -83,6 +83,10 @@ type StateKeyTuple struct {
EventStateKeyNID EventStateKeyNID EventStateKeyNID EventStateKeyNID
} }
func (a StateKeyTuple) IsCreate() bool {
return a.EventTypeNID == MRoomCreateNID && a.EventStateKeyNID == EmptyStateKeyNID
}
// LessThan returns true if this state key is less than the other state key. // LessThan returns true if this state key is less than the other state key.
// The ordering is arbitrary and is used to implement binary search and to efficiently deduplicate entries. // The ordering is arbitrary and is used to implement binary search and to efficiently deduplicate entries.
func (a StateKeyTuple) LessThan(b StateKeyTuple) bool { func (a StateKeyTuple) LessThan(b StateKeyTuple) bool {
@ -209,6 +213,12 @@ type MissingEventError string
func (e MissingEventError) Error() string { return string(e) } func (e MissingEventError) Error() string { return string(e) }
// A RejectedError is returned when an event is stored as rejected. The error
// contains the reason why.
type RejectedError string
func (e RejectedError) Error() string { return string(e) }
// RoomInfo contains metadata about a room // RoomInfo contains metadata about a room
type RoomInfo struct { type RoomInfo struct {
RoomNID RoomNID RoomNID RoomNID

View file

@ -82,9 +82,15 @@ type EventRelationshipResponse struct {
Limited bool `json:"limited"` Limited bool `json:"limited"`
} }
func toClientResponse(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) *EventRelationshipResponse { type MSC2836EventRelationshipsResponse struct {
gomatrixserverlib.MSC2836EventRelationshipsResponse
ParsedEvents []*gomatrixserverlib.Event
ParsedAuthChain []*gomatrixserverlib.Event
}
func toClientResponse(res *MSC2836EventRelationshipsResponse) *EventRelationshipResponse {
out := &EventRelationshipResponse{ out := &EventRelationshipResponse{
Events: gomatrixserverlib.ToClientEvents(res.Events, gomatrixserverlib.FormatAll), Events: gomatrixserverlib.ToClientEvents(res.ParsedEvents, gomatrixserverlib.FormatAll),
Limited: res.Limited, Limited: res.Limited,
NextBatch: res.NextBatch, NextBatch: res.NextBatch,
} }
@ -210,7 +216,7 @@ func federatedEventRelationship(
// add auth chain information // add auth chain information
requiredAuthEventsSet := make(map[string]bool) requiredAuthEventsSet := make(map[string]bool)
var requiredAuthEvents []string var requiredAuthEvents []string
for _, ev := range res.Events { for _, ev := range res.ParsedEvents {
for _, a := range ev.AuthEventIDs() { for _, a := range ev.AuthEventIDs() {
if requiredAuthEventsSet[a] { if requiredAuthEventsSet[a] {
continue continue
@ -227,19 +233,24 @@ func federatedEventRelationship(
// they may already have the auth events so don't fail this request // they may already have the auth events so don't fail this request
util.GetLogger(ctx).WithError(err).Error("Failed to QueryAuthChain") util.GetLogger(ctx).WithError(err).Error("Failed to QueryAuthChain")
} }
res.AuthChain = make([]*gomatrixserverlib.Event, len(queryRes.AuthChain)) res.AuthChain = make(gomatrixserverlib.EventJSONs, len(queryRes.AuthChain))
for i := range queryRes.AuthChain { for i := range queryRes.AuthChain {
res.AuthChain[i] = queryRes.AuthChain[i].Unwrap() res.AuthChain[i] = queryRes.AuthChain[i].JSON()
}
res.Events = make(gomatrixserverlib.EventJSONs, len(res.ParsedEvents))
for i := range res.ParsedEvents {
res.Events[i] = res.ParsedEvents[i].JSON()
} }
return util.JSONResponse{ return util.JSONResponse{
Code: 200, Code: 200,
JSON: res, JSON: res.MSC2836EventRelationshipsResponse,
} }
} }
func (rc *reqCtx) process() (*gomatrixserverlib.MSC2836EventRelationshipsResponse, *util.JSONResponse) { func (rc *reqCtx) process() (*MSC2836EventRelationshipsResponse, *util.JSONResponse) {
var res gomatrixserverlib.MSC2836EventRelationshipsResponse var res MSC2836EventRelationshipsResponse
var returnEvents []*gomatrixserverlib.HeaderedEvent var returnEvents []*gomatrixserverlib.HeaderedEvent
// Can the user see (according to history visibility) event_id? If no, reject the request, else continue. // Can the user see (according to history visibility) event_id? If no, reject the request, else continue.
event := rc.getLocalEvent(rc.req.EventID) event := rc.getLocalEvent(rc.req.EventID)
@ -290,11 +301,11 @@ func (rc *reqCtx) process() (*gomatrixserverlib.MSC2836EventRelationshipsRespons
) )
returnEvents = append(returnEvents, events...) returnEvents = append(returnEvents, events...)
} }
res.Events = make([]*gomatrixserverlib.Event, len(returnEvents)) res.ParsedEvents = make([]*gomatrixserverlib.Event, len(returnEvents))
for i, ev := range returnEvents { for i, ev := range returnEvents {
// for each event, extract the children_count | hash and add it as unsigned data. // for each event, extract the children_count | hash and add it as unsigned data.
rc.addChildMetadata(ev) rc.addChildMetadata(ev)
res.Events[i] = ev.Unwrap() res.ParsedEvents[i] = ev.Unwrap()
} }
res.Limited = remaining == 0 || walkLimited res.Limited = remaining == 0 || walkLimited
return &res, nil return &res, nil
@ -357,7 +368,7 @@ func (rc *reqCtx) fetchUnknownEvent(eventID, roomID string) *gomatrixserverlib.H
continue continue
} }
rc.injectResponseToRoomserver(res) rc.injectResponseToRoomserver(res)
for _, ev := range res.Events { for _, ev := range res.ParsedEvents {
if ev.EventID() == eventID { if ev.EventID() == eventID {
return ev.Headered(ev.Version()) return ev.Headered(ev.Version())
} }
@ -384,7 +395,7 @@ func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recen
if rc.hasUnexploredChildren(parentID) { if rc.hasUnexploredChildren(parentID) {
// we need to do a remote request to pull in the children as we are missing them locally. // we need to do a remote request to pull in the children as we are missing them locally.
serversToQuery := rc.getServersForEventID(parentID) serversToQuery := rc.getServersForEventID(parentID)
var result *gomatrixserverlib.MSC2836EventRelationshipsResponse var result *MSC2836EventRelationshipsResponse
for _, srv := range serversToQuery { for _, srv := range serversToQuery {
res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{ res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{
EventID: parentID, EventID: parentID,
@ -397,7 +408,12 @@ func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recen
if err != nil { if err != nil {
util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("includeChildren: failed to call MSC2836EventRelationships") util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("includeChildren: failed to call MSC2836EventRelationships")
} else { } else {
result = &res mscRes := &MSC2836EventRelationshipsResponse{
MSC2836EventRelationshipsResponse: res,
}
mscRes.ParsedEvents = res.Events.UntrustedEvents(rc.roomVersion)
mscRes.ParsedAuthChain = res.AuthChain.UntrustedEvents(rc.roomVersion)
result = mscRes
break break
} }
} }
@ -467,7 +483,7 @@ func walkThread(
} }
// MSC2836EventRelationships performs an /event_relationships request to a remote server // MSC2836EventRelationships performs an /event_relationships request to a remote server
func (rc *reqCtx) MSC2836EventRelationships(eventID string, srv gomatrixserverlib.ServerName, ver gomatrixserverlib.RoomVersion) (*gomatrixserverlib.MSC2836EventRelationshipsResponse, error) { func (rc *reqCtx) MSC2836EventRelationships(eventID string, srv gomatrixserverlib.ServerName, ver gomatrixserverlib.RoomVersion) (*MSC2836EventRelationshipsResponse, error) {
res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{ res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{
EventID: eventID, EventID: eventID,
DepthFirst: rc.req.DepthFirst, DepthFirst: rc.req.DepthFirst,
@ -481,7 +497,12 @@ func (rc *reqCtx) MSC2836EventRelationships(eventID string, srv gomatrixserverli
util.GetLogger(rc.ctx).WithError(err).Error("Failed to call MSC2836EventRelationships") util.GetLogger(rc.ctx).WithError(err).Error("Failed to call MSC2836EventRelationships")
return nil, err return nil, err
} }
return &res, nil mscRes := &MSC2836EventRelationshipsResponse{
MSC2836EventRelationshipsResponse: res,
}
mscRes.ParsedEvents = res.Events.UntrustedEvents(ver)
mscRes.ParsedAuthChain = res.AuthChain.UntrustedEvents(ver)
return mscRes, nil
} }
@ -550,12 +571,12 @@ func (rc *reqCtx) getServersForEventID(eventID string) []gomatrixserverlib.Serve
return serversToQuery return serversToQuery
} }
func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MSC2836EventRelationshipsResponse { func (rc *reqCtx) remoteEventRelationships(eventID string) *MSC2836EventRelationshipsResponse {
if rc.isFederatedRequest { if rc.isFederatedRequest {
return nil // we don't query remote servers for remote requests return nil // we don't query remote servers for remote requests
} }
serversToQuery := rc.getServersForEventID(eventID) serversToQuery := rc.getServersForEventID(eventID)
var res *gomatrixserverlib.MSC2836EventRelationshipsResponse var res *MSC2836EventRelationshipsResponse
var err error var err error
for _, srv := range serversToQuery { for _, srv := range serversToQuery {
res, err = rc.MSC2836EventRelationships(eventID, srv, rc.roomVersion) res, err = rc.MSC2836EventRelationships(eventID, srv, rc.roomVersion)
@ -577,7 +598,7 @@ func (rc *reqCtx) lookForEvent(eventID string) *gomatrixserverlib.HeaderedEvent
if queryRes != nil { if queryRes != nil {
// inject all the events into the roomserver then return the event in question // inject all the events into the roomserver then return the event in question
rc.injectResponseToRoomserver(queryRes) rc.injectResponseToRoomserver(queryRes)
for _, ev := range queryRes.Events { for _, ev := range queryRes.ParsedEvents {
if ev.EventID() == eventID && rc.req.RoomID == ev.RoomID() { if ev.EventID() == eventID && rc.req.RoomID == ev.RoomID() {
return ev.Headered(ev.Version()) return ev.Headered(ev.Version())
} }
@ -619,12 +640,12 @@ func (rc *reqCtx) getLocalEvent(eventID string) *gomatrixserverlib.HeaderedEvent
// injectResponseToRoomserver injects the events // injectResponseToRoomserver injects the events
// into the roomserver as KindOutlier, with auth chains. // into the roomserver as KindOutlier, with auth chains.
func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) { func (rc *reqCtx) injectResponseToRoomserver(res *MSC2836EventRelationshipsResponse) {
var stateEvents []*gomatrixserverlib.Event var stateEvents gomatrixserverlib.EventJSONs
var messageEvents []*gomatrixserverlib.Event var messageEvents []*gomatrixserverlib.Event
for _, ev := range res.Events { for _, ev := range res.ParsedEvents {
if ev.StateKey() != nil { if ev.StateKey() != nil {
stateEvents = append(stateEvents, ev) stateEvents = append(stateEvents, ev.JSON())
} else { } else {
messageEvents = append(messageEvents, ev) messageEvents = append(messageEvents, ev)
} }
@ -633,7 +654,7 @@ func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836Event
AuthEvents: res.AuthChain, AuthEvents: res.AuthChain,
StateEvents: stateEvents, StateEvents: stateEvents,
} }
eventsInOrder, err := respState.Events() eventsInOrder, err := respState.Events(rc.roomVersion)
if err != nil { if err != nil {
util.GetLogger(rc.ctx).WithError(err).Error("failed to calculate order to send events in MSC2836EventRelationshipsResponse") util.GetLogger(rc.ctx).WithError(err).Error("failed to calculate order to send events in MSC2836EventRelationshipsResponse")
return return

View file

@ -282,6 +282,8 @@ func membershipEvents(res *types.Response) (joinUserIDs, leaveUserIDs []string)
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil { if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil {
if strings.Contains(string(ev.Content), `"join"`) { if strings.Contains(string(ev.Content), `"join"`) {
joinUserIDs = append(joinUserIDs, *ev.StateKey) joinUserIDs = append(joinUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"invite"`) {
joinUserIDs = append(joinUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"leave"`) { } else if strings.Contains(string(ev.Content), `"leave"`) {
leaveUserIDs = append(leaveUserIDs, *ev.StateKey) leaveUserIDs = append(leaveUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"ban"`) { } else if strings.Contains(string(ev.Content), `"ban"`) {

View file

@ -590,3 +590,5 @@ Can reject invites over federation for rooms with version 9
Can receive redactions from regular users over federation in room version 9 Can receive redactions from regular users over federation in room version 9
Forward extremities remain so even after the next events are populated as outliers Forward extremities remain so even after the next events are populated as outliers
If a device list update goes missing, the server resyncs on the next one If a device list update goes missing, the server resyncs on the next one
uploading self-signing key notifies over federation
uploading signed devices gets propagated over federation