From 1b90cc95367947fa00616b4426d0c894b33c9862 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Wed, 15 Jun 2022 12:50:02 +0200 Subject: [PATCH 01/12] Fix rare panic when returning user devices over federation (#2534) --- federationapi/routing/devices.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/federationapi/routing/devices.go b/federationapi/routing/devices.go index 1a092645f..2f9da1f25 100644 --- a/federationapi/routing/devices.go +++ b/federationapi/routing/devices.go @@ -85,6 +85,9 @@ func GetUserDevices( if targetKey, ok := targetUser[gomatrixserverlib.KeyID(dev.DeviceID)]; ok { for sourceUserID, forSourceUser := range targetKey { for sourceKeyID, sourceKey := range forSourceUser { + if device.Keys.Signatures == nil { + device.Keys.Signatures = map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{} + } if _, ok := device.Keys.Signatures[sourceUserID]; !ok { device.Keys.Signatures[sourceUserID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{} } From 7120eb6bc943af6f725b0c61cfd110330f04064a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 15 Jun 2022 14:27:07 +0100 Subject: [PATCH 02/12] Add `InputDeviceListUpdate` to the keyserver, remove old input API (#2536) * Add `InputDeviceListUpdate` to the keyserver, remove old input API * Fix copyright * Log more information when a device list update fails --- federationapi/federationapi.go | 1 + federationapi/producers/syncapi.go | 17 +++++ federationapi/routing/send.go | 8 +-- keyserver/api/api.go | 10 --- keyserver/consumers/devicelistupdate.go | 82 +++++++++++++++++++++++++ keyserver/internal/internal.go | 11 ---- keyserver/inthttp/client.go | 14 ----- keyserver/inthttp/server.go | 11 ---- keyserver/keyserver.go | 10 ++- setup/jetstream/streams.go | 6 ++ syncapi/internal/keychange_test.go | 3 - 11 files changed, 117 insertions(+), 56 deletions(-) create mode 100644 keyserver/consumers/devicelistupdate.go diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index ff159beea..97bcc12a5 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -63,6 +63,7 @@ func AddPublicRoutes( TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), ServerName: cfg.Matrix.ServerName, UserAPI: userAPI, } diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 494150036..6453d026c 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -17,6 +17,7 @@ package producers import ( "context" "encoding/json" + "fmt" "strconv" "time" @@ -34,6 +35,7 @@ type SyncAPIProducer struct { TopicSendToDeviceEvent string TopicTypingEvent string TopicPresenceEvent string + TopicDeviceListUpdate string JetStream nats.JetStreamContext ServerName gomatrixserverlib.ServerName UserAPI userapi.UserInternalAPI @@ -161,3 +163,18 @@ func (p *SyncAPIProducer) SendPresence( _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } + +func (p *SyncAPIProducer) SendDeviceListUpdate( + ctx context.Context, deviceListUpdate *gomatrixserverlib.DeviceListUpdateEvent, +) (err error) { + m := nats.NewMsg(p.TopicDeviceListUpdate) + m.Header.Set(jetstream.UserID, deviceListUpdate.UserID) + m.Data, err = json.Marshal(deviceListUpdate) + if err != nil { + return fmt.Errorf("json.Marshal: %w", err) + } + + log.Debugf("Sending device list update: %+v", m.Header) + _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index c25dabce9..43003be38 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -501,11 +501,7 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli } else if serverName != t.Origin { return } - var inputRes keyapi.InputDeviceListUpdateResponse - t.keyAPI.InputDeviceListUpdate(context.Background(), &keyapi.InputDeviceListUpdateRequest{ - Event: payload, - }, &inputRes) - if inputRes.Error != nil { - util.GetLogger(ctx).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate") + if err := t.producer.SendDeviceListUpdate(ctx, &payload); err != nil { + util.GetLogger(ctx).WithError(err).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate") } } diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 140f03569..c0a1eedbb 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -62,8 +62,6 @@ type FederationKeyAPI interface { QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse) QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse) - // InputDeviceListUpdate from a federated server EDU - InputDeviceListUpdate(ctx context.Context, req *InputDeviceListUpdateRequest, res *InputDeviceListUpdateResponse) PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse) PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) } @@ -337,11 +335,3 @@ type QuerySignaturesResponse struct { // The request error, if any Error *KeyError } - -type InputDeviceListUpdateRequest struct { - Event gomatrixserverlib.DeviceListUpdateEvent -} - -type InputDeviceListUpdateResponse struct { - Error *KeyError -} diff --git a/keyserver/consumers/devicelistupdate.go b/keyserver/consumers/devicelistupdate.go new file mode 100644 index 000000000..f4f246280 --- /dev/null +++ b/keyserver/consumers/devicelistupdate.go @@ -0,0 +1,82 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/keyserver/internal" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" +) + +// DeviceListUpdateConsumer consumes device list updates that came in over federation. +type DeviceListUpdateConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + updater *internal.DeviceListUpdater +} + +// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers. +func NewDeviceListUpdateConsumer( + process *process.ProcessContext, + cfg *config.KeyServer, + js nats.JetStreamContext, + updater *internal.DeviceListUpdater, +) *DeviceListUpdateConsumer { + return &DeviceListUpdateConsumer{ + ctx: process.Context(), + jetstream: js, + durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), + updater: updater, + } +} + +// Start consuming from key servers +func (t *DeviceListUpdateConsumer) Start() error { + return jetstream.JetStreamConsumer( + t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) +} + +// onMessage is called in response to a message received on the +// key change events topic from the key server. +func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + var m gomatrixserverlib.DeviceListUpdateEvent + if err := json.Unmarshal(msg.Data, &m); err != nil { + logrus.WithError(err).Errorf("Failed to read from device list update input topic") + return true + } + err := t.updater.Update(ctx, m) + if err != nil { + logrus.WithFields(logrus.Fields{ + "user_id": m.UserID, + "device_id": m.DeviceID, + "stream_id": m.StreamID, + "prev_id": m.PrevID, + }).WithError(err).Errorf("Failed to update device list") + return false + } + return true +} diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index f8d0d69c3..c146b2aa0 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -47,17 +47,6 @@ func (a *KeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) { a.UserAPI = i } -func (a *KeyInternalAPI) InputDeviceListUpdate( - ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, -) { - err := a.Updater.Update(ctx, req.Event) - if err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("failed to update device list: %s", err), - } - } -} - func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset) if err != nil { diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go index abce81582..dac61d1ea 100644 --- a/keyserver/inthttp/client.go +++ b/keyserver/inthttp/client.go @@ -63,20 +63,6 @@ type httpKeyInternalAPI struct { func (h *httpKeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) { // no-op: doesn't need it } -func (h *httpKeyInternalAPI) InputDeviceListUpdate( - ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, -) { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputDeviceListUpdate") - defer span.Finish() - - apiURL := h.apiURL + InputDeviceListUpdatePath - err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) - if err != nil { - res.Error = &api.KeyError{ - Err: err.Error(), - } - } -} func (h *httpKeyInternalAPI) PerformClaimKeys( ctx context.Context, diff --git a/keyserver/inthttp/server.go b/keyserver/inthttp/server.go index 8d557a768..5bf5976a8 100644 --- a/keyserver/inthttp/server.go +++ b/keyserver/inthttp/server.go @@ -25,17 +25,6 @@ import ( ) func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) { - internalAPIMux.Handle(InputDeviceListUpdatePath, - httputil.MakeInternalAPI("inputDeviceListUpdate", func(req *http.Request) util.JSONResponse { - request := api.InputDeviceListUpdateRequest{} - response := api.InputDeviceListUpdateResponse{} - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - s.InputDeviceListUpdate(req.Context(), &request, &response) - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) internalAPIMux.Handle(PerformClaimKeysPath, httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse { request := api.PerformClaimKeysRequest{} diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 3ffd3ba1e..cd506f981 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -18,6 +18,7 @@ import ( "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/consumers" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" @@ -59,10 +60,17 @@ func NewInternalAPI( updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable ap.Updater = updater go func() { - if err := updater.Start(); err != nil { + if err = updater.Start(); err != nil { logrus.WithError(err).Panicf("failed to start device list updater") } }() + dlConsumer := consumers.NewDeviceListUpdateConsumer( + base.ProcessContext, cfg, js, updater, + ) + if err = dlConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start device list consumer") + } + return ap } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 6594e6941..110808b1b 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -16,6 +16,7 @@ const ( var ( InputRoomEvent = "InputRoomEvent" + InputDeviceListUpdate = "InputDeviceListUpdate" OutputRoomEvent = "OutputRoomEvent" OutputSendToDeviceEvent = "OutputSendToDeviceEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent" @@ -45,6 +46,11 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: InputDeviceListUpdate, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, { Name: OutputRoomEvent, Retention: nats.InterestPolicy, diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index d9fb9cf82..219b35e2c 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -43,9 +43,6 @@ func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneT } func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) { -} -func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.InputDeviceListUpdateRequest, res *keyapi.InputDeviceListUpdateResponse) { - } func (k *mockKeyAPI) QuerySignatures(ctx context.Context, req *keyapi.QuerySignaturesRequest, res *keyapi.QuerySignaturesResponse) { } From 920a20821ba55a22248c5f78bb76b615fec60a7a Mon Sep 17 00:00:00 2001 From: Jean Lucas Date: Mon, 27 Jun 2022 04:15:19 -0400 Subject: [PATCH 03/12] Fix nats.go commit (#2540) Signed-off-by: Jean Lucas --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index b2a096751..1f2d5d9fd 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/matrix-org/dendrite replace github.com/nats-io/nats-server/v2 => github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f -replace github.com/nats-io/nats.go => github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e +replace github.com/nats-io/nats.go => github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673 require ( github.com/Arceliar/ironwood v0.0.0-20220306165321-319147a02d98 diff --git a/go.sum b/go.sum index 3a35c47da..3ae7977f3 100644 --- a/go.sum +++ b/go.sum @@ -484,6 +484,8 @@ github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f h1: github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4= github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e h1:kNIzIzj2OvnlreA+sTJ12nWJzTP3OSLNKDL/Iq9mF6Y= github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673 h1:TcKfa3Tf0qwUotv63PQVu2d1bBoLi2iEA4RHVMGDh5M= +github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 h1:lrVQzBtkeQEGGYUHwSX1XPe1E5GL6U3KYCNe2G4bncQ= github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9/go.mod h1:NPHGhPc0/wudcaCqL/H5AOddkRf8GPRhzOujuUKGQu8= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= From 2086992caf67d033c42db3fb6bc5c2a294e72bfe Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Wed, 29 Jun 2022 10:49:12 +0200 Subject: [PATCH 04/12] Don't return `end` if there are not more messages (#2542) * Be more spec compliant * Move lazyLoadMembers to own method --- syncapi/routing/messages.go | 68 +++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index e55c661d6..24745cd55 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -50,7 +50,7 @@ type messagesReq struct { type messagesResp struct { Start string `json:"start"` StartStream string `json:"start_stream,omitempty"` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token - End string `json:"end"` + End string `json:"end,omitempty"` Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` State []gomatrixserverlib.ClientEvent `json:"state"` } @@ -200,30 +200,6 @@ func OnIncomingMessagesRequest( return jsonerror.InternalServerError() } - // at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set - state := []gomatrixserverlib.ClientEvent{} - if filter.LazyLoadMembers { - membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent) - for _, evt := range clientEvents { - // Don't add membership events the client should already know about - if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached { - continue - } - membership, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user") - continue - } - if membership != nil { - membershipToUser[evt.Sender] = membership - lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID()) - } - } - for _, evt := range membershipToUser { - state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync)) - } - } - util.GetLogger(req.Context()).WithFields(logrus.Fields{ "from": from.String(), "to": to.String(), @@ -237,7 +213,13 @@ func OnIncomingMessagesRequest( Chunk: clientEvents, Start: start.String(), End: end.String(), - State: state, + } + res.applyLazyLoadMembers(req.Context(), db, roomID, device, filter.LazyLoadMembers, lazyLoadCache) + + // If we didn't return any events, set the end to an empty string, so it will be omitted + // in the response JSON. + if len(res.Chunk) == 0 { + res.End = "" } if fromStream != nil { res.StartStream = fromStream.String() @@ -250,6 +232,40 @@ func OnIncomingMessagesRequest( } } +// applyLazyLoadMembers loads membership events for users returned in Chunk, if the filter has +// LazyLoadMembers enabled. +func (m *messagesResp) applyLazyLoadMembers( + ctx context.Context, + db storage.Database, + roomID string, + device *userapi.Device, + lazyLoad bool, + lazyLoadCache caching.LazyLoadCache, +) { + if !lazyLoad { + return + } + membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent) + for _, evt := range m.Chunk { + // Don't add membership events the client should already know about + if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached { + continue + } + membership, err := db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, evt.Sender) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user") + continue + } + if membership != nil { + membershipToUser[evt.Sender] = membership + lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID()) + } + } + for _, evt := range membershipToUser { + m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync)) + } +} + func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (forgotten bool, exists bool, err error) { req := api.QueryMembershipForUserRequest{ RoomID: roomID, From 2dea466685d0d4ab74d4cbd84af16b621d1269b3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 29 Jun 2022 12:32:24 +0100 Subject: [PATCH 05/12] Return an error if trying to invite a malformed user ID (#2543) --- roomserver/internal/perform/perform_invite.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go index b0148a314..644c954b6 100644 --- a/roomserver/internal/perform/perform_invite.go +++ b/roomserver/internal/perform/perform_invite.go @@ -56,7 +56,14 @@ func (r *Inviter) PerformInvite( return nil, fmt.Errorf("failed to load RoomInfo: %w", err) } - _, domain, _ := gomatrixserverlib.SplitID('@', targetUserID) + _, domain, err := gomatrixserverlib.SplitID('@', targetUserID) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("The user ID %q is invalid!", targetUserID), + } + return nil, nil + } isTargetLocal := domain == r.Cfg.Matrix.ServerName isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName From 519bc1124b051273019aae9b11617ebd796e962f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 29 Jun 2022 15:29:39 +0100 Subject: [PATCH 06/12] Add `evacuateUser` endpoint, use it when deactivating accounts (#2545) * Add `evacuateUser` endpoint, use it when deactivating accounts * Populate the API * Clean up user devices when deactivating * Include invites, delete pushers --- clientapi/routing/admin.go | 37 +++++++++++ clientapi/routing/routing.go | 6 ++ docs/administration/4_adminapi.md | 6 ++ roomserver/api/api.go | 8 +-- roomserver/api/api_trace.go | 9 +++ roomserver/api/perform.go | 9 +++ roomserver/internal/api.go | 1 + roomserver/internal/perform/perform_admin.go | 70 ++++++++++++++++++++ roomserver/inthttp/client.go | 18 +++++ roomserver/inthttp/server.go | 11 +++ userapi/internal/api.go | 26 ++++++++ userapi/userapi.go | 1 + 12 files changed, 197 insertions(+), 5 deletions(-) diff --git a/clientapi/routing/admin.go b/clientapi/routing/admin.go index 125b3847d..523b88c99 100644 --- a/clientapi/routing/admin.go +++ b/clientapi/routing/admin.go @@ -47,3 +47,40 @@ func AdminEvacuateRoom(req *http.Request, device *userapi.Device, rsAPI roomserv }, } } + +func AdminEvacuateUser(req *http.Request, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse { + if device.AccountType != userapi.AccountTypeAdmin { + return util.JSONResponse{ + Code: http.StatusForbidden, + JSON: jsonerror.Forbidden("This API can only be used by admin users."), + } + } + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + userID, ok := vars["userID"] + if !ok { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("Expecting user ID."), + } + } + res := &roomserverAPI.PerformAdminEvacuateUserResponse{} + rsAPI.PerformAdminEvacuateUser( + req.Context(), + &roomserverAPI.PerformAdminEvacuateUserRequest{ + UserID: userID, + }, + res, + ) + if err := res.Error; err != nil { + return err.JSONResponse() + } + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{ + "affected": res.Affected, + }, + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index aa4b5a235..0460850ef 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -129,6 +129,12 @@ func Setup( }), ).Methods(http.MethodGet, http.MethodOptions) + dendriteAdminRouter.Handle("/admin/evacuateUser/{userID}", + httputil.MakeAuthAPI("admin_evacuate_user", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + return AdminEvacuateUser(req, device, rsAPI) + }), + ).Methods(http.MethodGet, http.MethodOptions) + // server notifications if cfg.Matrix.ServerNotices.Enabled { logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice") diff --git a/docs/administration/4_adminapi.md b/docs/administration/4_adminapi.md index e33482ec9..51f56374b 100644 --- a/docs/administration/4_adminapi.md +++ b/docs/administration/4_adminapi.md @@ -19,6 +19,12 @@ This endpoint will instruct Dendrite to part all local users from the given `roo in the URL. It may take some time to complete. A JSON body will be returned containing the user IDs of all affected users. +## `/_dendrite/admin/evacuateUser/{userID}` + +This endpoint will instruct Dendrite to part the given local `userID` in the URL from +all rooms which they are currently joined. A JSON body will be returned containing +the room IDs of all affected rooms. + ## `/_synapse/admin/v1/register` Shared secret registration — please see the [user creation page](createusers) for diff --git a/roomserver/api/api.go b/roomserver/api/api.go index f87ff2962..38baa617f 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -140,11 +140,8 @@ type ClientRoomserverAPI interface { // PerformRoomUpgrade upgrades a room to a newer version PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse) - PerformAdminEvacuateRoom( - ctx context.Context, - req *PerformAdminEvacuateRoomRequest, - res *PerformAdminEvacuateRoomResponse, - ) + PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse) + PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error @@ -161,6 +158,7 @@ type UserRoomserverAPI interface { QueryLatestEventsAndStateAPI QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error + PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) } type FederationRoomserverAPI interface { diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go index 92c5c1b1d..211f320ff 100644 --- a/roomserver/api/api_trace.go +++ b/roomserver/api/api_trace.go @@ -113,6 +113,15 @@ func (t *RoomserverInternalAPITrace) PerformAdminEvacuateRoom( util.GetLogger(ctx).Infof("PerformAdminEvacuateRoom req=%+v res=%+v", js(req), js(res)) } +func (t *RoomserverInternalAPITrace) PerformAdminEvacuateUser( + ctx context.Context, + req *PerformAdminEvacuateUserRequest, + res *PerformAdminEvacuateUserResponse, +) { + t.Impl.PerformAdminEvacuateUser(ctx, req, res) + util.GetLogger(ctx).Infof("PerformAdminEvacuateUser req=%+v res=%+v", js(req), js(res)) +} + func (t *RoomserverInternalAPITrace) PerformInboundPeek( ctx context.Context, req *PerformInboundPeekRequest, diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index 30aa2cf1b..d9ea9dd1c 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -223,3 +223,12 @@ type PerformAdminEvacuateRoomResponse struct { Affected []string `json:"affected"` Error *PerformError } + +type PerformAdminEvacuateUserRequest struct { + UserID string `json:"user_id"` +} + +type PerformAdminEvacuateUserResponse struct { + Affected []string `json:"affected"` + Error *PerformError +} diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index afef52da4..acdaeef6f 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -170,6 +170,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio Cfg: r.Cfg, Inputer: r.Inputer, Queryer: r.Queryer, + Leaver: r.Leaver, } if err := r.Inputer.Start(); err != nil { diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index 2de6477cc..d3fb71099 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -16,6 +16,7 @@ package perform import ( "context" + "database/sql" "encoding/json" "fmt" "time" @@ -34,6 +35,7 @@ type Admin struct { Cfg *config.RoomServer Queryer *query.Queryer Inputer *input.Inputer + Leaver *Leaver } // PerformEvacuateRoom will remove all local users from the given room. @@ -160,3 +162,71 @@ func (r *Admin) PerformAdminEvacuateRoom( inputRes := &api.InputRoomEventsResponse{} r.Inputer.InputRoomEvents(ctx, inputReq, inputRes) } + +func (r *Admin) PerformAdminEvacuateUser( + ctx context.Context, + req *api.PerformAdminEvacuateUserRequest, + res *api.PerformAdminEvacuateUserResponse, +) { + _, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("Malformed user ID: %s", err), + } + return + } + if domain != r.Cfg.Matrix.ServerName { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: "Can only evacuate local users using this endpoint", + } + return + } + + roomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Join) + if err != nil && err != sql.ErrNoRows { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err), + } + return + } + + inviteRoomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Invite) + if err != nil && err != sql.ErrNoRows { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err), + } + return + } + + for _, roomID := range append(roomIDs, inviteRoomIDs...) { + leaveReq := &api.PerformLeaveRequest{ + RoomID: roomID, + UserID: req.UserID, + } + leaveRes := &api.PerformLeaveResponse{} + outputEvents, err := r.Leaver.PerformLeave(ctx, leaveReq, leaveRes) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.Leaver.PerformLeave: %s", err), + } + return + } + if len(outputEvents) == 0 { + continue + } + if err := r.Inputer.WriteOutputEvents(roomID, outputEvents); err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err), + } + return + } + + res.Affected = append(res.Affected, roomID) + } +} diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index 7b10ae657..2fa8afc49 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -40,6 +40,7 @@ const ( RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek" RoomserverPerformForgetPath = "/roomserver/performForget" RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom" + RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser" // Query operations RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" @@ -305,6 +306,23 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateRoom( } } +func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser( + ctx context.Context, + req *api.PerformAdminEvacuateUserRequest, + res *api.PerformAdminEvacuateUserResponse, +) { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformAdminEvacuateUser") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverPerformAdminEvacuateUserPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) + if err != nil { + res.Error = &api.PerformError{ + Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err), + } + } +} + // QueryLatestEventsAndState implements RoomserverQueryAPI func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState( ctx context.Context, diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go index ad4fdc469..993381585 100644 --- a/roomserver/inthttp/server.go +++ b/roomserver/inthttp/server.go @@ -129,6 +129,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(RoomserverPerformAdminEvacuateUserPath, + httputil.MakeInternalAPI("performAdminEvacuateUser", func(req *http.Request) util.JSONResponse { + var request api.PerformAdminEvacuateUserRequest + var response api.PerformAdminEvacuateUserResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + r.PerformAdminEvacuateUser(req.Context(), &request, &response) + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle( RoomserverQueryPublishedRoomsPath, httputil.MakeInternalAPI("queryPublishedRooms", func(req *http.Request) util.JSONResponse { diff --git a/userapi/internal/api.go b/userapi/internal/api.go index 9d2f63c72..27ed15a01 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/sqlutil" keyapi "github.com/matrix-org/dendrite/keyserver/api" + rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/producers" @@ -49,6 +50,7 @@ type UserInternalAPI struct { // AppServices is the list of all registered AS AppServices []config.ApplicationService KeyAPI keyapi.UserKeyAPI + RSAPI rsapi.UserRoomserverAPI } func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error { @@ -452,6 +454,30 @@ func (a *UserInternalAPI) queryAppServiceToken(ctx context.Context, token, appSe // PerformAccountDeactivation deactivates the user's account, removing all ability for the user to login again. func (a *UserInternalAPI) PerformAccountDeactivation(ctx context.Context, req *api.PerformAccountDeactivationRequest, res *api.PerformAccountDeactivationResponse) error { + evacuateReq := &rsapi.PerformAdminEvacuateUserRequest{ + UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName), + } + evacuateRes := &rsapi.PerformAdminEvacuateUserResponse{} + a.RSAPI.PerformAdminEvacuateUser(ctx, evacuateReq, evacuateRes) + if err := evacuateRes.Error; err != nil { + logrus.WithError(err).Errorf("Failed to evacuate user after account deactivation") + } + + deviceReq := &api.PerformDeviceDeletionRequest{ + UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName), + } + deviceRes := &api.PerformDeviceDeletionResponse{} + if err := a.PerformDeviceDeletion(ctx, deviceReq, deviceRes); err != nil { + return err + } + + pusherReq := &api.PerformPusherDeletionRequest{ + Localpart: req.Localpart, + } + if err := a.PerformPusherDeletion(ctx, pusherReq, &struct{}{}); err != nil { + return err + } + err := a.DB.DeactivateAccount(ctx, req.Localpart) res.AccountDeactivated = err == nil return err diff --git a/userapi/userapi.go b/userapi/userapi.go index 603b416bf..2e86d6aa7 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -78,6 +78,7 @@ func NewInternalAPI( ServerName: cfg.Matrix.ServerName, AppServices: appServices, KeyAPI: keyAPI, + RSAPI: rsAPI, DisableTLSValidation: cfg.PushGatewayDisableTLSValidation, } From 561c159ad71d49da1eb16c492ef3e53fa876480b Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Thu, 30 Jun 2022 12:34:37 +0200 Subject: [PATCH 07/12] Silence presence logs (#2547) --- federationapi/consumers/presence.go | 2 +- federationapi/producers/syncapi.go | 2 +- syncapi/consumers/presence.go | 2 +- syncapi/streams/stream_presence.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go index bfce1b28b..a65d2aa04 100644 --- a/federationapi/consumers/presence.go +++ b/federationapi/consumers/presence.go @@ -133,7 +133,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) b return true } - log.Debugf("sending presence EDU to %d servers", len(joined)) + log.Tracef("sending presence EDU to %d servers", len(joined)) if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil { log.WithError(err).Error("failed to send EDU") return false diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 6453d026c..e371baaaa 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -159,7 +159,7 @@ func (p *SyncAPIProducer) SendPresence( lastActiveTS := gomatrixserverlib.AsTimestamp(time.Now().Add(-(time.Duration(lastActiveAgo) * time.Millisecond))) m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS))) - log.Debugf("Sending presence to syncAPI: %+v", m.Header) + log.Tracef("Sending presence to syncAPI: %+v", m.Header) _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go index bfd72d604..0217e1956 100644 --- a/syncapi/consumers/presence.go +++ b/syncapi/consumers/presence.go @@ -138,7 +138,7 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { presence := msg.Header.Get("presence") timestamp := msg.Header.Get("last_active_ts") fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync")) - logrus.Debugf("syncAPI received presence event: %+v", msg.Header) + logrus.Tracef("syncAPI received presence event: %+v", msg.Header) if fromSync { // do not process local presence changes; we already did this synchronously. return true diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 35ce53cb6..877bcf141 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -111,7 +111,7 @@ func (p *PresenceStreamProvider) IncrementalSync( currentlyActive := prevPresence.CurrentlyActive() skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID if skip { - req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID) + req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID) continue } } From 54bed4c5937c82e8565ed7839d711dc498848500 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 1 Jul 2022 09:37:54 +0100 Subject: [PATCH 08/12] Blacklist `Guest users can join guest_access rooms` test until it can be investigated --- sytest-blacklist | 1 + sytest-whitelist | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/sytest-blacklist b/sytest-blacklist index be0826eee..bcc345f6e 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -48,3 +48,4 @@ Notifications can be viewed with GET /notifications # More flakey If remote user leaves room we no longer receive device updates +Guest users can join guest_access rooms diff --git a/sytest-whitelist b/sytest-whitelist index 60a3b73f6..ea25c75d0 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -241,7 +241,6 @@ Inbound federation can receive v2 /send_join Message history can be paginated Backfill works correctly with history visibility set to joined Guest user cannot call /events globally -Guest users can join guest_access rooms Guest user can set display names Guest user cannot upgrade other users Guest non-joined user cannot call /events on shared room From 086f182e24e0651d1320199e90215f280350ef44 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 1 Jul 2022 09:50:06 +0100 Subject: [PATCH 09/12] Disable WebAssembly builds for now --- .github/workflows/dendrite.yml | 1 + build.sh | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/dendrite.yml b/.github/workflows/dendrite.yml index 5d60301c7..6ebef4e13 100644 --- a/.github/workflows/dendrite.yml +++ b/.github/workflows/dendrite.yml @@ -17,6 +17,7 @@ jobs: name: WASM build test timeout-minutes: 5 runs-on: ubuntu-latest + if: ${{ false }} # disable for now steps: - uses: actions/checkout@v2 diff --git a/build.sh b/build.sh index 700e6434f..f8b5001bf 100755 --- a/build.sh +++ b/build.sh @@ -21,4 +21,4 @@ mkdir -p bin CGO_ENABLED=1 go build -trimpath -ldflags "$FLAGS" -v -o "bin/" ./cmd/... -CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone +# CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone From 89cd0e8fc13b040470aebe2eb4d36a9235b1473d Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Fri, 1 Jul 2022 11:49:26 +0200 Subject: [PATCH 10/12] Try to fix backfilling (#2548) * Try to fix backfilling * Return start/end to not confuse clients * Update GMSL * Update GMSL --- go.mod | 2 +- go.sum | 6 +-- .../internal/perform/perform_backfill.go | 45 +++++++++++++------ syncapi/routing/context.go | 21 ++++++++- 4 files changed, 54 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index 1f2d5d9fd..4c432611e 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a + github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 diff --git a/go.sum b/go.sum index 3ae7977f3..777a31228 100644 --- a/go.sum +++ b/go.sum @@ -418,8 +418,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a h1:jOkrb6twViAGTHHadA51sQwdloHT0Vx1MCptk9InTHo= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f h1:XF2+J6sOq07yhK1I7ItwsgRwXorjj7gqiCvgZ4dn8W8= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= @@ -482,8 +482,6 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f h1:Fc+TjdV1mOy0oISSzfoxNWdTqjg7tN/Vdgf+B2cwvdo= github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4= -github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e h1:kNIzIzj2OvnlreA+sTJ12nWJzTP3OSLNKDL/Iq9mF6Y= -github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673 h1:TcKfa3Tf0qwUotv63PQVu2d1bBoLi2iEA4RHVMGDh5M= github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 h1:lrVQzBtkeQEGGYUHwSX1XPe1E5GL6U3KYCNe2G4bncQ= diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index 1bc4c75ce..9eddca733 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -18,6 +18,7 @@ import ( "context" "fmt" + "github.com/getsentry/sentry-go" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -206,8 +207,17 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom } logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result) for _, res := range result { - if res.Error != nil { - logger.WithError(res.Error).Warn("event failed PDU checks") + switch err := res.Error.(type) { + case nil: + case gomatrixserverlib.SignatureErr: + // The signature of the event might not be valid anymore, for example if + // the key ID was reused with a different signature. + logger.WithError(err).Errorf("event failed PDU checks, storing anyway") + case gomatrixserverlib.AuthChainErr, gomatrixserverlib.AuthRulesErr: + logger.WithError(err).Warn("event failed PDU checks") + continue + default: + logger.WithError(err).Warn("event failed PDU checks") continue } missingMap[id] = res.Event @@ -306,6 +316,7 @@ FederationHit: b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res return res, nil } + sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit return nil, lastErr } @@ -366,19 +377,25 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr } } - c := gomatrixserverlib.FederatedStateProvider{ - FedClient: b.fsAPI, - RememberAuthEvents: false, - Server: b.servers[0], + var lastErr error + for _, srv := range b.servers { + c := gomatrixserverlib.FederatedStateProvider{ + FedClient: b.fsAPI, + RememberAuthEvents: false, + Server: srv, + } + result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) + if err != nil { + lastErr = err + continue + } + for eventID, ev := range result { + b.eventIDMap[eventID] = ev + } + return result, nil } - result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) - if err != nil { - return nil, err - } - for eventID, ev := range result { - b.eventIDMap[eventID] = ev - } - return result, nil + sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit + return nil, lastErr } // ServersAtEvent is called when trying to determine which server to request from. diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index d021d365d..f6b4d15e0 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -15,6 +15,7 @@ package routing import ( + "context" "database/sql" "encoding/json" "fmt" @@ -25,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" roomserver "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -149,13 +151,30 @@ func Context( if len(response.State) > filter.Limit { response.State = response.State[len(response.State)-filter.Limit:] } - + start, end, err := getStartEnd(ctx, syncDB, eventsBefore, eventsAfter) + if err == nil { + response.End = end.String() + response.Start = start.String() + } return util.JSONResponse{ Code: http.StatusOK, JSON: response, } } +func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { + if len(startEvents) > 0 { + start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID()) + if err != nil { + return + } + } + if len(endEvents) > 0 { + end, err = syncDB.EventPositionInTopology(ctx, endEvents[0].EventID()) + } + return +} + func applyLazyLoadMembers( device *userapi.Device, filter *gomatrixserverlib.RoomEventFilter, From b50a24c666c4c45e1410dfc35d5ab2dc7e530a0f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 1 Jul 2022 10:54:07 +0100 Subject: [PATCH 11/12] Roomserver producers package (#2546) * Give the roomserver a producers package * Change init point * Populate ACLs API * Fix build issues * `RoomEventProducer` naming --- roomserver/internal/alias.go | 3 +- roomserver/internal/api.go | 46 +++++----- roomserver/internal/input/input.go | 87 +++--------------- roomserver/internal/input/input_events.go | 4 +- .../internal/input/input_latest_events.go | 2 +- roomserver/internal/perform/perform_admin.go | 2 +- .../internal/perform/perform_inbound_peek.go | 2 +- roomserver/internal/perform/perform_peek.go | 2 +- roomserver/internal/perform/perform_unpeek.go | 2 +- roomserver/producers/roomevent.go | 89 +++++++++++++++++++ roomserver/roomserver.go | 1 - 11 files changed, 137 insertions(+), 103 deletions(-) create mode 100644 roomserver/producers/roomevent.go diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go index f47ae47fe..175bb9310 100644 --- a/roomserver/internal/alias.go +++ b/roomserver/internal/alias.go @@ -216,11 +216,10 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( return err } - err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false) + err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false) if err != nil { return err } - } } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index acdaeef6f..d59b8be7a 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -12,8 +12,10 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/perform" "github.com/matrix-org/dendrite/roomserver/internal/query" + "github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -49,17 +51,21 @@ type RoomserverInternalAPI struct { JetStream nats.JetStreamContext Durable string InputRoomEventTopic string // JetStream topic for new input room events - OutputRoomEventTopic string // JetStream topic for new output room events + OutputProducer *producers.RoomEventProducer PerspectiveServerNames []gomatrixserverlib.ServerName } func NewRoomserverAPI( processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database, - consumer nats.JetStreamContext, nc *nats.Conn, - inputRoomEventTopic, outputRoomEventTopic string, + js nats.JetStreamContext, nc *nats.Conn, inputRoomEventTopic string, caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName, ) *RoomserverInternalAPI { serverACLs := acls.NewServerACLs(roomserverDB) + producer := &producers.RoomEventProducer{ + Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent)), + JetStream: js, + ACLs: serverACLs, + } a := &RoomserverInternalAPI{ ProcessContext: processCtx, DB: roomserverDB, @@ -68,8 +74,8 @@ func NewRoomserverAPI( ServerName: cfg.Matrix.ServerName, PerspectiveServerNames: perspectiveServerNames, InputRoomEventTopic: inputRoomEventTopic, - OutputRoomEventTopic: outputRoomEventTopic, - JetStream: consumer, + OutputProducer: producer, + JetStream: js, NATSClient: nc, Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"), ServerACLs: serverACLs, @@ -92,19 +98,19 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio r.KeyRing = keyRing r.Inputer = &input.Inputer{ - Cfg: r.Cfg, - ProcessContext: r.ProcessContext, - DB: r.DB, - InputRoomEventTopic: r.InputRoomEventTopic, - OutputRoomEventTopic: r.OutputRoomEventTopic, - JetStream: r.JetStream, - NATSClient: r.NATSClient, - Durable: nats.Durable(r.Durable), - ServerName: r.Cfg.Matrix.ServerName, - FSAPI: fsAPI, - KeyRing: keyRing, - ACLs: r.ServerACLs, - Queryer: r.Queryer, + Cfg: r.Cfg, + ProcessContext: r.ProcessContext, + DB: r.DB, + InputRoomEventTopic: r.InputRoomEventTopic, + OutputProducer: r.OutputProducer, + JetStream: r.JetStream, + NATSClient: r.NATSClient, + Durable: nats.Durable(r.Durable), + ServerName: r.Cfg.Matrix.ServerName, + FSAPI: fsAPI, + KeyRing: keyRing, + ACLs: r.ServerACLs, + Queryer: r.Queryer, } r.Inviter = &perform.Inviter{ DB: r.DB, @@ -199,7 +205,7 @@ func (r *RoomserverInternalAPI) PerformInvite( if len(outputEvents) == 0 { return nil } - return r.WriteOutputEvents(req.Event.RoomID(), outputEvents) + return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents) } func (r *RoomserverInternalAPI) PerformLeave( @@ -215,7 +221,7 @@ func (r *RoomserverInternalAPI) PerformLeave( if len(outputEvents) == 0 { return nil } - return r.WriteOutputEvents(req.RoomID, outputEvents) + return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents) } func (r *RoomserverInternalAPI) PerformForget( diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 600994c5a..fa07c1d2b 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -29,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/query" + "github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" @@ -37,16 +38,8 @@ import ( "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" - "github.com/tidwall/gjson" ) -var keyContentFields = map[string]string{ - "m.room.join_rules": "join_rule", - "m.room.history_visibility": "history_visibility", - "m.room.member": "membership", -} - // Inputer is responsible for consuming from the roomserver input // streams and processing the events. All input events are queued // into a single NATS stream and the order is preserved strictly. @@ -75,19 +68,19 @@ var keyContentFields = map[string]string{ // up, so they will do nothing until a new event comes in for B // or C. type Inputer struct { - Cfg *config.RoomServer - ProcessContext *process.ProcessContext - DB storage.Database - NATSClient *nats.Conn - JetStream nats.JetStreamContext - Durable nats.SubOpt - ServerName gomatrixserverlib.ServerName - FSAPI fedapi.RoomserverFederationAPI - KeyRing gomatrixserverlib.JSONVerifier - ACLs *acls.ServerACLs - InputRoomEventTopic string - OutputRoomEventTopic string - workers sync.Map // room ID -> *worker + Cfg *config.RoomServer + ProcessContext *process.ProcessContext + DB storage.Database + NATSClient *nats.Conn + JetStream nats.JetStreamContext + Durable nats.SubOpt + ServerName gomatrixserverlib.ServerName + FSAPI fedapi.RoomserverFederationAPI + KeyRing gomatrixserverlib.JSONVerifier + ACLs *acls.ServerACLs + InputRoomEventTopic string + OutputProducer *producers.RoomEventProducer + workers sync.Map // room ID -> *worker Queryer *query.Queryer } @@ -370,58 +363,6 @@ func (r *Inputer) InputRoomEvents( } } -// WriteOutputEvents implements OutputRoomEventWriter -func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { - var err error - for _, update := range updates { - msg := &nats.Msg{ - Subject: r.OutputRoomEventTopic, - Header: nats.Header{}, - } - msg.Header.Set(jetstream.RoomID, roomID) - msg.Data, err = json.Marshal(update) - if err != nil { - return err - } - logger := log.WithFields(log.Fields{ - "room_id": roomID, - "type": update.Type, - }) - if update.NewRoomEvent != nil { - eventType := update.NewRoomEvent.Event.Type() - logger = logger.WithFields(log.Fields{ - "event_type": eventType, - "event_id": update.NewRoomEvent.Event.EventID(), - "adds_state": len(update.NewRoomEvent.AddsStateEventIDs), - "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs), - "send_as_server": update.NewRoomEvent.SendAsServer, - "sender": update.NewRoomEvent.Event.Sender(), - }) - if update.NewRoomEvent.Event.StateKey() != nil { - logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey()) - } - contentKey := keyContentFields[eventType] - if contentKey != "" { - value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey) - if value.Exists() { - logger = logger.WithField("content_value", value.String()) - } - } - - if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") { - ev := update.NewRoomEvent.Event.Unwrap() - defer r.ACLs.OnServerACLUpdate(ev) - } - } - logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic) - if _, err := r.JetStream.PublishMsg(msg); err != nil { - logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err) - return err - } - } - return nil -} - var roomserverInputBackpressure = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dendrite", diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index ff05f798c..743b1efe6 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -381,7 +381,7 @@ func (r *Inputer) processRoomEvent( return fmt.Errorf("r.updateLatestEvents: %w", err) } case api.KindOld: - err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{ { Type: api.OutputTypeOldRoomEvent, OldRoomEvent: &api.OutputOldRoomEvent{ @@ -400,7 +400,7 @@ func (r *Inputer) processRoomEvent( // so notify downstream components to redact this event - they should have it if they've // been tracking our output log. if redactedEventID != "" { - err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{ { Type: api.OutputTypeRedactedEvent, RedactedEvent: &api.OutputRedactedEvent{ diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index e76f4ba8d..f7d15fdb5 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -192,7 +192,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // send the event asynchronously but we would need to ensure that 1) the events are written to the log in // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { + if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil { return fmt.Errorf("u.api.WriteOutputEvents: %w", err) } diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index d3fb71099..1cb52966a 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -219,7 +219,7 @@ func (r *Admin) PerformAdminEvacuateUser( if len(outputEvents) == 0 { continue } - if err := r.Inputer.WriteOutputEvents(roomID, outputEvents); err != nil { + if err := r.Inputer.OutputProducer.ProduceRoomEvents(roomID, outputEvents); err != nil { res.Error = &api.PerformError{ Code: api.PerformErrorBadRequest, Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err), diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go index d19fc8386..32c81e849 100644 --- a/roomserver/internal/perform/perform_inbound_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -113,7 +113,7 @@ func (r *InboundPeeker) PerformInboundPeek( response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion)) } - err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{ + err = r.Inputer.OutputProducer.ProduceRoomEvents(request.RoomID, []api.OutputEvent{ { Type: api.OutputTypeNewInboundPeek, NewInboundPeek: &api.OutputNewInboundPeek{ diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go index 45e63888d..5560916b2 100644 --- a/roomserver/internal/perform/perform_peek.go +++ b/roomserver/internal/perform/perform_peek.go @@ -207,7 +207,7 @@ func (r *Peeker) performPeekRoomByID( // TODO: handle federated peeks - err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{ + err = r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{ { Type: api.OutputTypeNewPeek, NewPeek: &api.OutputNewPeek{ diff --git a/roomserver/internal/perform/perform_unpeek.go b/roomserver/internal/perform/perform_unpeek.go index 1057499cb..1fe8d5a0f 100644 --- a/roomserver/internal/perform/perform_unpeek.go +++ b/roomserver/internal/perform/perform_unpeek.go @@ -96,7 +96,7 @@ func (r *Unpeeker) performUnpeekRoomByID( // TODO: handle federated peeks - err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{ + err = r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{ { Type: api.OutputTypeRetirePeek, RetirePeek: &api.OutputRetirePeek{ diff --git a/roomserver/producers/roomevent.go b/roomserver/producers/roomevent.go new file mode 100644 index 000000000..987e6c942 --- /dev/null +++ b/roomserver/producers/roomevent.go @@ -0,0 +1,89 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + "github.com/matrix-org/dendrite/roomserver/acls" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" +) + +var keyContentFields = map[string]string{ + "m.room.join_rules": "join_rule", + "m.room.history_visibility": "history_visibility", + "m.room.member": "membership", +} + +type RoomEventProducer struct { + Topic string + ACLs *acls.ServerACLs + JetStream nats.JetStreamContext +} + +func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error { + var err error + for _, update := range updates { + msg := &nats.Msg{ + Subject: r.Topic, + Header: nats.Header{}, + } + msg.Header.Set(jetstream.RoomID, roomID) + msg.Data, err = json.Marshal(update) + if err != nil { + return err + } + logger := log.WithFields(log.Fields{ + "room_id": roomID, + "type": update.Type, + }) + if update.NewRoomEvent != nil { + eventType := update.NewRoomEvent.Event.Type() + logger = logger.WithFields(log.Fields{ + "event_type": eventType, + "event_id": update.NewRoomEvent.Event.EventID(), + "adds_state": len(update.NewRoomEvent.AddsStateEventIDs), + "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs), + "send_as_server": update.NewRoomEvent.SendAsServer, + "sender": update.NewRoomEvent.Event.Sender(), + }) + if update.NewRoomEvent.Event.StateKey() != nil { + logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey()) + } + contentKey := keyContentFields[eventType] + if contentKey != "" { + value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey) + if value.Exists() { + logger = logger.WithField("content_value", value.String()) + } + } + + if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") { + ev := update.NewRoomEvent.Event.Unwrap() + defer r.ACLs.OnServerACLUpdate(ev) + } + } + logger.Tracef("Producing to topic '%s'", r.Topic) + if _, err := r.JetStream.PublishMsg(msg); err != nil { + logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.Topic, err) + return err + } + } + return nil +} diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 1480e8942..eb68100fe 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -55,7 +55,6 @@ func NewInternalAPI( return internal.NewRoomserverAPI( base.ProcessContext, cfg, roomserverDB, js, nc, cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), - cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), base.Caches, perspectiveServerNames, ) } From b5c55faf9886bd66a33e5555ad0bb20465bf08f7 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 1 Jul 2022 12:00:32 +0100 Subject: [PATCH 12/12] Version 0.8.9 (#2549) * Version 0.8.9 * Update changelog --- CHANGES.md | 17 +++++++++++++++++ internal/version.go | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 0db25f05a..3df03b2f6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,22 @@ # Changelog +## Dendrite 0.8.9 (2022-07-01) + +### Features + +* Incoming device list updates over federation are now queued in JetStream for processing so that they will no longer block incoming federation transactions and should never end up dropped, which will hopefully help E2EE reliability +* The `/context` endpoint now returns `"start"` and `"end"` parameters to allow pagination from a context call +* The `/messages` endpoint will no longer return `"end"` when there are no more messages remaining +* Deactivated user accounts will now leave all rooms automatically +* New admin endpoint `/_dendrite/admin/evacuateUser/{userID}` has been added for forcing a local user to leave all joined rooms +* Dendrite will now automatically attempt to raise the file descriptor limit at startup if it is too low + +### Fixes + +* A rare crash when retrieving remote device lists has been fixed +* Fixes a bug where events were not redacted properly over federation +* The `/invite` endpoints will now return an error instead of silently proceeding if the user ID is obviously malformed + ## Dendrite 0.8.8 (2022-06-09) ### Features diff --git a/internal/version.go b/internal/version.go index e29996f36..9568f08cb 100644 --- a/internal/version.go +++ b/internal/version.go @@ -17,7 +17,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 8 - VersionPatch = 8 + VersionPatch = 9 VersionTag = "" // example: "rc1" )