Merge branch 'main' into kegan/eventjsons

This commit is contained in:
kegsay 2022-02-09 11:49:09 +00:00 committed by GitHub
commit c0d0eee685
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 163 additions and 321 deletions

View file

@ -2,6 +2,6 @@
<!-- Please read docs/CONTRIBUTING.md before submitting your pull request --> <!-- Please read docs/CONTRIBUTING.md before submitting your pull request -->
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/master/docs/CONTRIBUTING.md#sign-off) * [ ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off)
Signed-off-by: `Your Name <your@email.example.org>` Signed-off-by: `Your Name <your@email.example.org>`

View file

@ -2,9 +2,9 @@ name: "CodeQL"
on: on:
push: push:
branches: [master] branches: [main]
pull_request: pull_request:
branches: [master] branches: [main]
jobs: jobs:
analyze: analyze:
@ -14,7 +14,7 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
language: ['go'] language: ["go"]
steps: steps:
- name: Checkout repository - name: Checkout repository

View file

@ -2,7 +2,7 @@ name: Tests
on: on:
push: push:
branches: [ 'master' ] branches: ["main"]
pull_request: pull_request:
concurrency: concurrency:
@ -33,7 +33,7 @@ jobs:
path: dendrite path: dendrite
# Attempt to check out the same branch of Complement as the PR. If it # Attempt to check out the same branch of Complement as the PR. If it
# doesn't exist, fallback to master. # doesn't exist, fallback to main.
- name: Checkout complement - name: Checkout complement
shell: bash shell: bash
run: | run: |

View file

@ -7,7 +7,7 @@ if [ -d ".git" ]
then then
export BUILD=`git rev-parse --short HEAD || ""` export BUILD=`git rev-parse --short HEAD || ""`
export BRANCH=`(git symbolic-ref --short HEAD | tr -d \/ ) || ""` export BRANCH=`(git symbolic-ref --short HEAD | tr -d \/ ) || ""`
if [ "$BRANCH" = master ] if [ "$BRANCH" = main ]
then then
export BRANCH="" export BRANCH=""
fi fi

View file

@ -9,9 +9,9 @@ FROM golang:1.14-alpine AS gobuild
# Download and build dendrite # Download and build dendrite
WORKDIR /build WORKDIR /build
ADD https://github.com/matrix-org/dendrite/archive/master.tar.gz /build/master.tar.gz ADD https://github.com/matrix-org/dendrite/archive/main.tar.gz /build/main.tar.gz
RUN tar xvfz master.tar.gz RUN tar xvfz main.tar.gz
WORKDIR /build/dendrite-master WORKDIR /build/dendrite-main
RUN GOOS=js GOARCH=wasm go build -o main.wasm ./cmd/dendritejs RUN GOOS=js GOARCH=wasm go build -o main.wasm ./cmd/dendritejs
@ -21,7 +21,7 @@ RUN apt-get update && apt-get -y install python
# Download riot-web and libp2p repos # Download riot-web and libp2p repos
WORKDIR /build WORKDIR /build
ADD https://github.com/matrix-org/go-http-js-libp2p/archive/master.tar.gz /build/libp2p.tar.gz ADD https://github.com/matrix-org/go-http-js-libp2p/archive/main.tar.gz /build/libp2p.tar.gz
RUN tar xvfz libp2p.tar.gz RUN tar xvfz libp2p.tar.gz
ADD https://github.com/vector-im/element-web/archive/matthew/p2p.tar.gz /build/p2p.tar.gz ADD https://github.com/vector-im/element-web/archive/matthew/p2p.tar.gz /build/p2p.tar.gz
RUN tar xvfz p2p.tar.gz RUN tar xvfz p2p.tar.gz
@ -31,7 +31,7 @@ WORKDIR /build/element-web-matthew-p2p
RUN yarn install RUN yarn install
RUN ln -s /build/go-http-js-libp2p-master /build/element-web-matthew-p2p/node_modules/go-http-js-libp2p RUN ln -s /build/go-http-js-libp2p-master /build/element-web-matthew-p2p/node_modules/go-http-js-libp2p
RUN (cd node_modules/go-http-js-libp2p && yarn install) RUN (cd node_modules/go-http-js-libp2p && yarn install)
COPY --from=gobuild /build/dendrite-master/main.wasm ./src/vector/dendrite.wasm COPY --from=gobuild /build/dendrite-main/main.wasm ./src/vector/dendrite.wasm
# build it all # build it all
RUN yarn build:p2p RUN yarn build:p2p

View file

@ -37,7 +37,7 @@ If a job fails, click the "details" button and you should be taken to the job's
logs. logs.
![Click the details button on the failing build ![Click the details button on the failing build
step](https://raw.githubusercontent.com/matrix-org/dendrite/master/docs/images/details-button-location.jpg) step](https://raw.githubusercontent.com/matrix-org/dendrite/main/docs/images/details-button-location.jpg)
Scroll down to the failing step and you should see some log output. Scan the Scroll down to the failing step and you should see some log output. Scan the
logs until you find what it's complaining about, fix it, submit a new commit, logs until you find what it's complaining about, fix it, submit a new commit,
@ -57,7 +57,7 @@ significant amount of CPU and RAM.
Once the code builds, run [Sytest](https://github.com/matrix-org/sytest) Once the code builds, run [Sytest](https://github.com/matrix-org/sytest)
according to the guide in according to the guide in
[docs/sytest.md](https://github.com/matrix-org/dendrite/blob/master/docs/sytest.md#using-a-sytest-docker-image) [docs/sytest.md](https://github.com/matrix-org/dendrite/blob/main/docs/sytest.md#using-a-sytest-docker-image)
so you can see whether something is being broken and whether there are newly so you can see whether something is being broken and whether there are newly
passing tests. passing tests.
@ -94,4 +94,4 @@ For more general questions please use
We ask that everyone who contributes to the project signs off their We ask that everyone who contributes to the project signs off their
contributions, in accordance with the contributions, in accordance with the
[DCO](https://github.com/matrix-org/matrix-doc/blob/master/CONTRIBUTING.rst#sign-off). [DCO](https://github.com/matrix-org/matrix-doc/blob/main/CONTRIBUTING.rst#sign-off).

View file

@ -6,7 +6,7 @@ These are the instructions for setting up P2P Dendrite, current as of May 2020.
#### Build #### Build
- The `master` branch has a WASM-only binary for dendrite: `./cmd/dendritejs`. - The `main` branch has a WASM-only binary for dendrite: `./cmd/dendritejs`.
- Build it and copy assets to riot-web. - Build it and copy assets to riot-web.
``` ```
$ ./build-dendritejs.sh $ ./build-dendritejs.sh

View file

@ -100,10 +100,4 @@ type EDUServerInputAPI interface {
request *InputReceiptEventRequest, request *InputReceiptEventRequest,
response *InputReceiptEventResponse, response *InputReceiptEventResponse,
) error ) error
InputCrossSigningKeyUpdate(
ctx context.Context,
request *InputCrossSigningKeyUpdateRequest,
response *InputCrossSigningKeyUpdateResponse,
) error
} }

View file

@ -51,7 +51,6 @@ func NewInternalAPI(
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
OutputKeyChangeEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
} }
} }

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -40,8 +39,6 @@ type EDUServerInputAPI struct {
OutputSendToDeviceEventTopic string OutputSendToDeviceEventTopic string
// The kafka topic to output new receipt events to // The kafka topic to output new receipt events to
OutputReceiptEventTopic string OutputReceiptEventTopic string
// The kafka topic to output new key change events to
OutputKeyChangeEventTopic string
// kafka producer // kafka producer
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
// Internal user query API // Internal user query API
@ -80,34 +77,6 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent(
return t.sendToDeviceEvent(ise) return t.sendToDeviceEvent(ise)
} }
// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI
func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate(
ctx context.Context,
request *api.InputCrossSigningKeyUpdateRequest,
response *api.InputCrossSigningKeyUpdateResponse,
) error {
eventJSON, err := json.Marshal(&keyapi.DeviceMessage{
Type: keyapi.TypeCrossSigningUpdate,
OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{
CrossSigningKeyUpdate: request.CrossSigningKeyUpdate,
},
})
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"user_id": request.UserID,
}).Tracef("Producing to topic '%s'", t.OutputKeyChangeEventTopic)
_, err = t.JetStream.PublishMsg(&nats.Msg{
Subject: t.OutputKeyChangeEventTopic,
Header: nats.Header{},
Data: eventJSON,
})
return err
}
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
ev := &api.TypingEvent{ ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping, Type: gomatrixserverlib.MTyping,

View file

@ -15,7 +15,6 @@ const (
EDUServerInputTypingEventPath = "/eduserver/input" EDUServerInputTypingEventPath = "/eduserver/input"
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
EDUServerInputReceiptEventPath = "/eduserver/receipt" EDUServerInputReceiptEventPath = "/eduserver/receipt"
EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate"
) )
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@ -69,16 +68,3 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent(
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// InputCrossSigningKeyUpdate implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate(
ctx context.Context,
request *api.InputCrossSigningKeyUpdateRequest,
response *api.InputCrossSigningKeyUpdateResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate")
defer span.Finish()
apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -51,17 +51,4 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath,
httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
var request api.InputCrossSigningKeyUpdateRequest
var response api.InputCrossSigningKeyUpdateResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
} }

View file

@ -18,7 +18,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/federationapi/types"

View file

@ -297,7 +297,7 @@ func (oq *destinationQueue) backgroundSend() {
// We haven't backed off yet, so wait for the suggested amount of // We haven't backed off yet, so wait for the suggested amount of
// time. // time.
duration := time.Until(*until) duration := time.Until(*until)
logrus.Warnf("Backing off %q for %s", oq.destination, duration) logrus.Debugf("Backing off %q for %s", oq.destination, duration)
oq.backingOff.Store(true) oq.backingOff.Store(true)
destinationQueueBackingOff.Inc() destinationQueueBackingOff.Inc()
select { select {

View file

@ -382,20 +382,8 @@ func (t *txnReq) processEDUs(ctx context.Context) {
} }
} }
case eduserverAPI.MSigningKeyUpdate: case eduserverAPI.MSigningKeyUpdate:
var updatePayload eduserverAPI.CrossSigningKeyUpdate if err := t.processSigningKeyUpdate(ctx, e); err != nil {
if err := json.Unmarshal(e.Content, &updatePayload); err != nil { logrus.WithError(err).Errorf("Failed to process signing key update")
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"user_id": updatePayload.UserID,
}).Debug("Failed to send signing key update to edu server")
continue
}
inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{
CrossSigningKeyUpdate: updatePayload,
}
inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{}
if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update")
continue
} }
default: default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
@ -403,6 +391,34 @@ func (t *txnReq) processEDUs(ctx context.Context) {
} }
} }
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
var updatePayload eduserverAPI.CrossSigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"user_id": updatePayload.UserID,
}).Debug("Failed to unmarshal signing key update")
return err
}
keys := gomatrixserverlib.CrossSigningKeys{}
if updatePayload.MasterKey != nil {
keys.MasterKey = *updatePayload.MasterKey
}
if updatePayload.SelfSigningKey != nil {
keys.SelfSigningKey = *updatePayload.SelfSigningKey
}
uploadReq := &keyapi.PerformUploadDeviceKeysRequest{
CrossSigningKeys: keys,
UserID: updatePayload.UserID,
}
uploadRes := &keyapi.PerformUploadDeviceKeysResponse{}
t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes)
if uploadRes.Error != nil {
return uploadRes.Error
}
return nil
}
// processReceiptEvent sends receipt events to the edu server // processReceiptEvent sends receipt events to the edu server
func (t *txnReq) processReceiptEvent(ctx context.Context, func (t *txnReq) processReceiptEvent(ctx context.Context,
userID, roomID, receiptType string, userID, roomID, receiptType string,

View file

@ -1,123 +0,0 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
type OutputCrossSigningKeyUpdateConsumer struct {
ctx context.Context
keyDB storage.Database
keyAPI api.KeyInternalAPI
serverName string
jetstream nats.JetStreamContext
durable string
topic string
}
func NewOutputCrossSigningKeyUpdateConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
js nats.JetStreamContext,
keyDB storage.Database,
keyAPI api.KeyInternalAPI,
) *OutputCrossSigningKeyUpdateConsumer {
// The keyserver both produces and consumes on the TopicOutputKeyChangeEvent
// topic. We will only produce events where the UserID matches our server name,
// and we will only consume events where the UserID does NOT match our server
// name (because the update came from a remote server).
s := &OutputCrossSigningKeyUpdateConsumer{
ctx: process.Context(),
keyDB: keyDB,
jetstream: js,
durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"),
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
keyAPI: keyAPI,
serverName: string(cfg.Global.ServerName),
}
return s
}
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m api.DeviceMessage
if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return true
}
if m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)
default:
return true
}
}
func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
return true
}
if host == gomatrixserverlib.ServerName(s.serverName) {
// Ignore any messages that contain information about our own users, as
// they already originated from this server.
return true
}
uploadReq := &api.PerformUploadDeviceKeysRequest{
UserID: output.UserID,
}
if output.MasterKey != nil {
uploadReq.MasterKey = *output.MasterKey
}
if output.SelfSigningKey != nil {
uploadReq.SelfSigningKey = *output.SelfSigningKey
}
uploadRes := &api.PerformUploadDeviceKeysResponse{}
s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
if uploadRes.Error != nil {
// If the error is due to a missing or invalid parameter then we'd might
// as well just acknowledge the message, because otherwise otherwise we'll
// just keep getting delivered a faulty message over and over again.
return uploadRes.Error.IsMissingParam || uploadRes.Error.IsInvalidParam
}
return true
}

View file

@ -219,7 +219,6 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
} }
// Finally, generate a notification that we updated the keys. // Finally, generate a notification that we updated the keys.
if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer {
update := eduserverAPI.CrossSigningKeyUpdate{ update := eduserverAPI.CrossSigningKeyUpdate{
UserID: req.UserID, UserID: req.UserID,
} }
@ -239,7 +238,6 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
return return
} }
} }
}
func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) { func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) {
// Before we do anything, we need the master and self-signing keys for this user. // Before we do anything, we need the master and self-signing keys for this user.
@ -310,7 +308,6 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
// Finally, generate a notification that we updated the signatures. // Finally, generate a notification that we updated the signatures.
for userID := range req.Signatures { for userID := range req.Signatures {
if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer {
update := eduserverAPI.CrossSigningKeyUpdate{ update := eduserverAPI.CrossSigningKeyUpdate{
UserID: userID, UserID: userID,
} }
@ -322,7 +319,6 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
} }
} }
} }
}
func (a *KeyInternalAPI) processSelfSignatures( func (a *KeyInternalAPI) processSelfSignatures(
ctx context.Context, ctx context.Context,

View file

@ -18,7 +18,6 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/consumers"
"github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/producers"
@ -65,12 +64,5 @@ func NewInternalAPI(
} }
}() }()
keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
base.ProcessContext, base.Cfg, js, db, ap,
)
if err := keyconsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
}
return ap return ap
} }

View file

@ -195,10 +195,27 @@ func (r *Inputer) processRoomEvent(
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs)) authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
for _, authEventID := range authEventIDs { for _, authEventID := range authEventIDs {
if _, ok := knownEvents[authEventID]; !ok { if _, ok := knownEvents[authEventID]; !ok {
return rollbackTransaction, fmt.Errorf("missing auth event %s", authEventID) // Unknown auth events only really matter if the event actually failed
// auth. If it passed auth then we can assume that everything that was
// known was sufficient, even if extraneous auth events were specified
// but weren't found.
if isRejected {
if event.StateKey() != nil {
return commitTransaction, fmt.Errorf(
"missing auth event %s for state event %s (type %q, state key %q)",
authEventID, event.EventID(), event.Type(), *event.StateKey(),
)
} else {
return commitTransaction, fmt.Errorf(
"missing auth event %s for timeline event %s (type %q)",
authEventID, event.EventID(), event.Type(),
)
} }
}
} else {
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID) authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
} }
}
var softfail bool var softfail bool
if input.Kind == api.KindNew { if input.Kind == api.KindNew {
@ -416,6 +433,10 @@ func (r *Inputer) fetchAuthEvents(
return fmt.Errorf("no servers provided event auth for event ID %q, tried servers %v", event.EventID(), servers) return fmt.Errorf("no servers provided event auth for event ID %q, tried servers %v", event.EventID(), servers)
} }
// Reuse these to reduce allocations.
authEventNIDs := make([]types.EventNID, 0, 5)
isRejected := false
nextAuthEvent:
for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering( for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering(
res.AuthEvents.UntrustedEvents(event.RoomVersion), res.AuthEvents.UntrustedEvents(event.RoomVersion),
gomatrixserverlib.TopologicalOrderByAuthEvents, gomatrixserverlib.TopologicalOrderByAuthEvents,
@ -424,36 +445,30 @@ func (r *Inputer) fetchAuthEvents(
// need to store it again or do anything further with it, so just skip // need to store it again or do anything further with it, so just skip
// over it rather than wasting cycles. // over it rather than wasting cycles.
if ev, ok := known[authEvent.EventID()]; ok && ev != nil { if ev, ok := known[authEvent.EventID()]; ok && ev != nil {
continue continue nextAuthEvent
} }
// Check the signatures of the event. If this fails then we'll simply // Check the signatures of the event. If this fails then we'll simply
// skip it, because gomatrixserverlib.Allowed() will notice a problem // skip it, because gomatrixserverlib.Allowed() will notice a problem
// if a critical event is missing anyway. // if a critical event is missing anyway.
if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil { if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
continue continue nextAuthEvent
} }
// In order to store the new auth event, we need to know its auth chain // In order to store the new auth event, we need to know its auth chain
// as NIDs for the `auth_event_nids` column. Let's see if we can find those. // as NIDs for the `auth_event_nids` column. Let's see if we can find those.
authEventNIDs := make([]types.EventNID, 0, len(authEvent.AuthEventIDs())) authEventNIDs = authEventNIDs[:0]
for _, eventID := range authEvent.AuthEventIDs() { for _, eventID := range authEvent.AuthEventIDs() {
knownEvent, ok := known[eventID] knownEvent, ok := known[eventID]
if !ok { if !ok {
return fmt.Errorf("missing auth event %s for %s", eventID, authEvent.EventID()) continue nextAuthEvent
} }
authEventNIDs = append(authEventNIDs, knownEvent.EventNID) authEventNIDs = append(authEventNIDs, knownEvent.EventNID)
} }
// Let's take a note of the fact that we now know about this event.
if err := auth.AddEvent(authEvent); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
// Check if the auth event should be rejected. // Check if the auth event should be rejected.
isRejected := false err := gomatrixserverlib.Allowed(authEvent, auth)
if err := gomatrixserverlib.Allowed(authEvent, auth); err != nil { if isRejected = err != nil; isRejected {
isRejected = true
logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID()) logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID())
} }
@ -463,6 +478,14 @@ func (r *Inputer) fetchAuthEvents(
return fmt.Errorf("updater.StoreEvent: %w", err) return fmt.Errorf("updater.StoreEvent: %w", err)
} }
// Let's take a note of the fact that we now know about this event for
// authenticating future events.
if !isRejected {
if err := auth.AddEvent(authEvent); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
}
// Now we know about this event, it was stored and the signatures were OK. // Now we know about this event, it was stored and the signatures were OK.
known[authEvent.EventID()] = &types.Event{ known[authEvent.EventID()] = &types.Event{
EventNID: eventNID, EventNID: eventNID,

View file

@ -71,10 +71,10 @@ CREATE TABLE IF NOT EXISTS roomserver_events (
` `
const insertEventSQL = "" + const insertEventSQL = "" +
"INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" + "INSERT INTO roomserver_events AS e (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" +
" ON CONFLICT ON CONSTRAINT roomserver_event_id_unique" + " ON CONFLICT ON CONSTRAINT roomserver_event_id_unique DO UPDATE" +
" DO NOTHING" + " SET is_rejected = $8 WHERE e.is_rejected = FALSE" +
" RETURNING event_nid, state_snapshot_nid" " RETURNING event_nid, state_snapshot_nid"
const selectEventSQL = "" + const selectEventSQL = "" +

View file

@ -49,7 +49,8 @@ const eventsSchema = `
const insertEventSQL = ` const insertEventSQL = `
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected) INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT DO NOTHING ON CONFLICT DO UPDATE
SET is_rejected = $8 WHERE is_rejected = 0
RETURNING event_nid, state_snapshot_nid; RETURNING event_nid, state_snapshot_nid;
` `

View file

@ -282,6 +282,8 @@ func membershipEvents(res *types.Response) (joinUserIDs, leaveUserIDs []string)
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil { if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil {
if strings.Contains(string(ev.Content), `"join"`) { if strings.Contains(string(ev.Content), `"join"`) {
joinUserIDs = append(joinUserIDs, *ev.StateKey) joinUserIDs = append(joinUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"invite"`) {
joinUserIDs = append(joinUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"leave"`) { } else if strings.Contains(string(ev.Content), `"leave"`) {
leaveUserIDs = append(leaveUserIDs, *ev.StateKey) leaveUserIDs = append(leaveUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"ban"`) { } else if strings.Contains(string(ev.Content), `"ban"`) {

View file

@ -590,3 +590,4 @@ Can reject invites over federation for rooms with version 9
Can receive redactions from regular users over federation in room version 9 Can receive redactions from regular users over federation in room version 9
Forward extremities remain so even after the next events are populated as outliers Forward extremities remain so even after the next events are populated as outliers
If a device list update goes missing, the server resyncs on the next one If a device list update goes missing, the server resyncs on the next one
uploading self-signing key notifies over federation