From 07a4ca968cccc06cd7b1a99e1ce8c30db60acc13 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 6 Dec 2022 13:59:20 -0700 Subject: [PATCH] Hook in mailserver retrieval to pinecone demos --- build/gobind-pinecone/monolith.go | 109 ++++-- cmd/dendrite-demo-pinecone/main.go | 87 ++++- federationapi/api/api.go | 56 ++- federationapi/internal/perform.go | 193 +++++++--- federationapi/internal/transactionrequest.go | 361 +++++++++++++++++++ federationapi/inthttp/client.go | 74 ++-- federationapi/queue/destinationqueue.go | 3 +- federationapi/routing/asyncevents.go | 2 + federationapi/routing/routing.go | 2 +- federationapi/routing/send.go | 341 +----------------- federationapi/routing/send_test.go | 33 +- 11 files changed, 805 insertions(+), 456 deletions(-) create mode 100644 federationapi/internal/transactionrequest.go diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index e8ed8fe85..d49ad20af 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -67,24 +67,27 @@ import ( ) const ( - PeerTypeRemote = pineconeRouter.PeerTypeRemote - PeerTypeMulticast = pineconeRouter.PeerTypeMulticast - PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth - PeerTypeBonjour = pineconeRouter.PeerTypeBonjour + PeerTypeRemote = pineconeRouter.PeerTypeRemote + PeerTypeMulticast = pineconeRouter.PeerTypeMulticast + PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth + PeerTypeBonjour = pineconeRouter.PeerTypeBonjour + mailserverRetryInterval = time.Second * 30 ) type DendriteMonolith struct { - logger logrus.Logger - PineconeRouter *pineconeRouter.Router - PineconeMulticast *pineconeMulticast.Multicast - PineconeQUIC *pineconeSessions.Sessions - PineconeManager *pineconeConnections.ConnectionManager - StorageDirectory string - CacheDirectory string - listener net.Listener - httpServer *http.Server - processContext *process.ProcessContext - userAPI userapiAPI.UserInternalAPI + logger logrus.Logger + PineconeRouter *pineconeRouter.Router + PineconeMulticast *pineconeMulticast.Multicast + PineconeQUIC *pineconeSessions.Sessions + PineconeManager *pineconeConnections.ConnectionManager + StorageDirectory string + CacheDirectory string + listener net.Listener + httpServer *http.Server + processContext *process.ProcessContext + userAPI userapiAPI.UserInternalAPI + federationAPI api.FederationInternalAPI + mailserversQueried map[gomatrixserverlib.ServerName]bool } func (m *DendriteMonolith) PublicKey() string { @@ -346,11 +349,11 @@ func (m *DendriteMonolith) Start() { rsAPI := roomserver.NewInternalAPI(base) - fsAPI := federationapi.NewInternalAPI( + m.federationAPI = federationapi.NewInternalAPI( base, federation, rsAPI, base.Caches, keyRing, true, ) - keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, m.federationAPI) m.userAPI = userapi.NewInternalAPI(base, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(m.userAPI) @@ -358,10 +361,10 @@ func (m *DendriteMonolith) Start() { // The underlying roomserver implementation needs to be able to call the fedsender. // This is different to rsAPI which can be the http client which doesn't need this dependency - rsAPI.SetFederationAPI(fsAPI, keyRing) + rsAPI.SetFederationAPI(m.federationAPI, keyRing) userProvider := users.NewPineconeUserProvider(m.PineconeRouter, m.PineconeQUIC, m.userAPI, federation) - roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, fsAPI, federation) + roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, m.federationAPI, federation) monolith := setup.Monolith{ Config: base.Cfg, @@ -370,7 +373,7 @@ func (m *DendriteMonolith) Start() { KeyRing: keyRing, AppserviceAPI: asAPI, - FederationAPI: fsAPI, + FederationAPI: m.federationAPI, RoomserverAPI: rsAPI, UserAPI: m.userAPI, KeyAPI: keyAPI, @@ -436,25 +439,45 @@ func (m *DendriteMonolith) Start() { go func(ch <-chan pineconeEvents.Event) { eLog := logrus.WithField("pinecone", "events") + mailserverSyncRunning := atomic.NewBool(false) + stopMailserverSync := make(chan bool) + + // Setup mailserver info + request := api.QueryMailserversRequest{Server: gomatrixserverlib.ServerName(m.PublicKey())} + response := api.QueryMailserversResponse{} + err := m.federationAPI.QueryMailservers(m.processContext.Context(), &request, &response) + if err != nil { + // TODO + } + m.mailserversQueried = make(map[gomatrixserverlib.ServerName]bool) + for _, server := range response.Mailservers { + m.mailserversQueried[server] = false + } for event := range ch { switch e := event.(type) { case pineconeEvents.PeerAdded: + if !mailserverSyncRunning.Load() { + go m.syncMailservers(stopMailserverSync, *mailserverSyncRunning) + } case pineconeEvents.PeerRemoved: + if mailserverSyncRunning.Load() && m.PineconeRouter.PeerCount(-1) == 0 { + stopMailserverSync <- true + } case pineconeEvents.TreeParentUpdate: case pineconeEvents.SnakeDescUpdate: case pineconeEvents.TreeRootAnnUpdate: case pineconeEvents.SnakeEntryAdded: case pineconeEvents.SnakeEntryRemoved: case pineconeEvents.BroadcastReceived: - eLog.Info("Broadcast received from: ", e.PeerID) + // eLog.Info("Broadcast received from: ", e.PeerID) req := &api.PerformWakeupServersRequest{ ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, } res := &api.PerformWakeupServersResponse{} - if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil { - logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID) + if err := m.federationAPI.PerformWakeupServers(base.Context(), req, res); err != nil { + eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) } case pineconeEvents.BandwidthReport: default: @@ -463,6 +486,46 @@ func (m *DendriteMonolith) Start() { }(pineconeEventChannel) } +func (m *DendriteMonolith) syncMailservers(stop <-chan bool, running atomic.Bool) { + defer running.Store(false) + + t := time.NewTimer(mailserverRetryInterval) + for { + mailserversToQuery := []gomatrixserverlib.ServerName{} + for server, complete := range m.mailserversQueried { + if !complete { + mailserversToQuery = append(mailserversToQuery, server) + } + } + if len(mailserversToQuery) == 0 { + // All mailservers have been synced. + return + } + m.queryMailservers(mailserversToQuery) + t.Reset(mailserverRetryInterval) + + select { + case <-stop: + if !t.Stop() { + <-t.C + } + return + case <-t.C: + } + } +} + +func (m *DendriteMonolith) queryMailservers(mailservers []gomatrixserverlib.ServerName) { + for _, server := range mailservers { + request := api.PerformMailserverSyncRequest{Mailserver: server} + response := api.PerformMailserverSyncResponse{} + err := m.federationAPI.PerformMailserverSync(m.processContext.Context(), &request, &response) + if err == nil { + m.mailserversQueried[server] = true + } + } +} + func (m *DendriteMonolith) Stop() { m.processContext.ShutdownDendrite() _ = m.listener.Close() diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 2f647a41b..89c19c63d 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -48,6 +48,7 @@ import ( "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" @@ -66,6 +67,8 @@ var ( instanceDir = flag.String("dir", ".", "the directory to store the databases in (if --config not specified)") ) +const mailserverRetryInterval = time.Second * 30 + // nolint:gocyclo func main() { flag.Parse() @@ -305,25 +308,41 @@ func main() { go func(ch <-chan pineconeEvents.Event) { eLog := logrus.WithField("pinecone", "events") + mailserverSyncRunning := atomic.NewBool(false) + stopMailserverSync := make(chan bool) + + m := MailserverRetriever{ + Context: context.Background(), + ServerName: gomatrixserverlib.ServerName(pRouter.PublicKey().String()), + FederationAPI: fsAPI, + MailserversQueried: make(map[gomatrixserverlib.ServerName]bool), + } + m.InitializeMailservers(eLog) for event := range ch { switch e := event.(type) { case pineconeEvents.PeerAdded: + if !mailserverSyncRunning.Load() { + go m.syncMailservers(stopMailserverSync, *mailserverSyncRunning) + } case pineconeEvents.PeerRemoved: + if mailserverSyncRunning.Load() && pRouter.PeerCount(-1) == 0 { + stopMailserverSync <- true + } case pineconeEvents.TreeParentUpdate: case pineconeEvents.SnakeDescUpdate: case pineconeEvents.TreeRootAnnUpdate: case pineconeEvents.SnakeEntryAdded: case pineconeEvents.SnakeEntryRemoved: case pineconeEvents.BroadcastReceived: - eLog.Info("Broadcast received from: ", e.PeerID) + // eLog.Info("Broadcast received from: ", e.PeerID) req := &api.PerformWakeupServersRequest{ ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, } res := &api.PerformWakeupServersResponse{} if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil { - logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID) + eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) } case pineconeEvents.BandwidthReport: default: @@ -333,3 +352,67 @@ func main() { base.WaitForShutdown() } + +type MailserverRetriever struct { + Context context.Context + ServerName gomatrixserverlib.ServerName + FederationAPI api.FederationInternalAPI + MailserversQueried map[gomatrixserverlib.ServerName]bool +} + +func (m *MailserverRetriever) InitializeMailservers(eLog *logrus.Entry) { + request := api.QueryMailserversRequest{Server: gomatrixserverlib.ServerName(m.ServerName)} + response := api.QueryMailserversResponse{} + err := m.FederationAPI.QueryMailservers(m.Context, &request, &response) + if err != nil { + // TODO + } + for _, server := range response.Mailservers { + m.MailserversQueried[server] = false + } + + eLog.Infof("Registered mailservers: %v", response.Mailservers) +} + +func (m *MailserverRetriever) syncMailservers(stop <-chan bool, running atomic.Bool) { + defer running.Store(false) + + t := time.NewTimer(mailserverRetryInterval) + for { + mailserversToQuery := []gomatrixserverlib.ServerName{} + for server, complete := range m.MailserversQueried { + if !complete { + mailserversToQuery = append(mailserversToQuery, server) + } + } + if len(mailserversToQuery) == 0 { + // All mailservers have been synced. + return + } + m.queryMailservers(mailserversToQuery) + t.Reset(mailserverRetryInterval) + + select { + case <-stop: + if !t.Stop() { + <-t.C + } + return + case <-t.C: + } + } +} + +func (m *MailserverRetriever) queryMailservers(mailservers []gomatrixserverlib.ServerName) { + logrus.Info("querying mailservers for async_events") + for _, server := range mailservers { + request := api.PerformMailserverSyncRequest{Mailserver: server} + response := api.PerformMailserverSyncResponse{} + err := m.FederationAPI.PerformMailserverSync(m.Context, &request, &response) + if err == nil { + m.MailserversQueried[server] = true + } else { + logrus.Errorf("Failed querying mailserver: %s", err.Error()) + } + } +} diff --git a/federationapi/api/api.go b/federationapi/api/api.go index 85fb09875..ef5b38b86 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -36,6 +36,18 @@ type FederationInternalAPI interface { request *PerformWakeupServersRequest, response *PerformWakeupServersResponse, ) error + + // Mailserver sync api used in the pinecone demos. + QueryMailservers( + ctx context.Context, + request *QueryMailserversRequest, + response *QueryMailserversResponse, + ) error + PerformMailserverSync( + ctx context.Context, + request *PerformMailserverSyncRequest, + response *PerformMailserverSyncResponse, + ) error } type MailserverAPI interface { @@ -239,6 +251,36 @@ type PerformBroadcastEDURequest struct { type PerformBroadcastEDUResponse struct { } +type PerformWakeupServersRequest struct { + ServerNames []gomatrixserverlib.ServerName `json:"server_names"` +} + +type PerformWakeupServersResponse struct { +} + +type InputPublicKeysRequest struct { + Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"` +} + +type InputPublicKeysResponse struct { +} + +type QueryMailserversRequest struct { + Server gomatrixserverlib.ServerName +} + +type QueryMailserversResponse struct { + Mailservers []gomatrixserverlib.ServerName +} + +type PerformMailserverSyncRequest struct { + Mailserver gomatrixserverlib.ServerName +} + +type PerformMailserverSyncResponse struct { + SyncComplete bool +} + type PerformStoreAsyncRequest struct { Txn gomatrixserverlib.Transaction `json:"transaction"` UserID gomatrixserverlib.UserID `json:"user_id"` @@ -255,17 +297,3 @@ type QueryAsyncTransactionsResponse struct { Txn gomatrixserverlib.Transaction `json:"transaction"` RemainingCount uint32 `json:"remaining"` } - -type PerformWakeupServersRequest struct { - ServerNames []gomatrixserverlib.ServerName `json:"server_names"` -} - -type PerformWakeupServersResponse struct { -} - -type InputPublicKeysRequest struct { - Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"` -} - -type InputPublicKeysResponse struct { -} diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index cf375f848..b36d5ce5d 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -15,6 +15,7 @@ import ( "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/consumers" "github.com/matrix-org/dendrite/federationapi/storage/shared" + "github.com/matrix-org/dendrite/internal" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/version" ) @@ -696,60 +697,6 @@ func (r *FederationInternalAPI) PerformBroadcastEDU( return nil } -// PerformStoreAsync implements api.FederationInternalAPI -func (r *FederationInternalAPI) PerformStoreAsync( - ctx context.Context, - request *api.PerformStoreAsyncRequest, - response *api.PerformStoreAsyncResponse, -) error { - receipt, err := r.db.StoreAsyncTransaction(ctx, request.Txn) - if err != nil { - return err - } - err = r.db.AssociateAsyncTransactionWithDestinations( - ctx, - map[gomatrixserverlib.UserID]struct{}{ - request.UserID: {}, - }, - request.Txn.TransactionID, - receipt) - - return err -} - -// QueryAsyncTransactions implements api.FederationInternalAPI -func (r *FederationInternalAPI) QueryAsyncTransactions( - ctx context.Context, - request *api.QueryAsyncTransactionsRequest, - response *api.QueryAsyncTransactionsResponse, -) error { - transaction, receipt, err := r.db.GetAsyncTransaction(ctx, request.UserID) - if err != nil { - return err - } - - // TODO : Shouldn't be deleting unless the transaction was successfully returned... - // TODO : Should delete transaction json from table if no more associations - if transaction != nil && receipt != nil { - err = r.db.CleanAsyncTransactions(ctx, request.UserID, []*shared.Receipt{receipt}) - if err != nil { - return err - } - } - - // TODO : These db calls should happen at the same time right? - count, err := r.db.GetAsyncTransactionCount(ctx, request.UserID) - if err != nil { - return err - } - - response.RemainingCount = uint32(count) - if transaction != nil { - response.Txn = *transaction - } - return nil -} - // PerformWakeupServers implements api.FederationInternalAPI func (r *FederationInternalAPI) PerformWakeupServers( ctx context.Context, @@ -885,3 +832,141 @@ func federatedAuthProvider( return returning, nil } } + +// QueryMailservers implements api.FederationInternalAPI +func (r *FederationInternalAPI) QueryMailservers( + ctx context.Context, + request *api.QueryMailserversRequest, + response *api.QueryMailserversResponse, +) error { + logrus.Infof("Getting mailservers for: %s", request.Server) + mailservers, err := r.db.GetMailserversForServer(request.Server) + if err != nil { + return err + } + + response.Mailservers = mailservers + return nil +} + +// PerformMailserverSync implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformMailserverSync( + ctx context.Context, + request *api.PerformMailserverSyncRequest, + response *api.PerformMailserverSyncResponse, +) error { + userID, err := gomatrixserverlib.NewUserID("@user:"+string(r.cfg.Matrix.ServerName), false) + if err != nil { + return err + } + + asyncResponse, err := r.federation.GetAsyncEvents(ctx, *userID, request.Mailserver) + if err != nil { + logrus.Errorf("GetAsyncEvents: %s", err.Error()) + return err + } + r.processTransaction(&asyncResponse.Transaction) + + for asyncResponse.Remaining > 0 { + asyncResponse, err := r.federation.GetAsyncEvents(ctx, *userID, request.Mailserver) + if err != nil { + logrus.Errorf("GetAsyncEvents: %s", err.Error()) + return err + } + r.processTransaction(&asyncResponse.Transaction) + } + + return nil +} + +func (r *FederationInternalAPI) processTransaction(txn *gomatrixserverlib.Transaction) { + logrus.Warn("Processing transaction from mailserver") + mu := internal.NewMutexByRoom() + // js, _ := base.NATS.Prepare(base.ProcessContext, &r.cfg.Matrix.JetStream) + // producer := &producers.SyncAPIProducer{ + // JetStream: js, + // TopicReceiptEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + // TopicSendToDeviceEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + // TopicTypingEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + // TopicPresenceEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + // TopicDeviceListUpdate: r.cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), + // TopicSigningKeyUpdate: r.cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate), + // Config: r.cfg, + // UserAPI: r.userAPI, + // } + t := NewTxnReq( + r.rsAPI, + nil, + r.cfg.Matrix.ServerName, + r.keyRing, + mu, + nil, + nil, // TODO : assign producer to process EDUs + r.cfg.Matrix.Presence.EnableInbound, + txn.PDUs, + txn.EDUs, + txn.Origin, + txn.TransactionID, + txn.Destination) + + t.ProcessTransaction(context.TODO()) +} + +// PerformStoreAsync implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformStoreAsync( + ctx context.Context, + request *api.PerformStoreAsyncRequest, + response *api.PerformStoreAsyncResponse, +) error { + logrus.Warnf("Storing transaction for %v", request.UserID) + receipt, err := r.db.StoreAsyncTransaction(ctx, request.Txn) + if err != nil { + return err + } + err = r.db.AssociateAsyncTransactionWithDestinations( + ctx, + map[gomatrixserverlib.UserID]struct{}{ + request.UserID: {}, + }, + request.Txn.TransactionID, + receipt) + + return err +} + +// QueryAsyncTransactions implements api.FederationInternalAPI +func (r *FederationInternalAPI) QueryAsyncTransactions( + ctx context.Context, + request *api.QueryAsyncTransactionsRequest, + response *api.QueryAsyncTransactionsResponse, +) error { + logrus.Warnf("Obtaining transaction for %v", request.UserID) + transaction, receipt, err := r.db.GetAsyncTransaction(ctx, request.UserID) + if err != nil { + return err + } + + // TODO : Shouldn't be deleting unless the transaction was successfully returned... + // TODO : Should delete transaction json from table if no more associations + if transaction != nil && receipt != nil { + err = r.db.CleanAsyncTransactions(ctx, request.UserID, []*shared.Receipt{receipt}) + if err != nil { + return err + } + + // TODO : Clean async transactions json + } + + // TODO : These db calls should happen at the same time right? + count, err := r.db.GetAsyncTransactionCount(ctx, request.UserID) + if err != nil { + return err + } + + response.RemainingCount = uint32(count) + if transaction != nil { + response.Txn = *transaction + logrus.Warnf("Obtained transaction: %v", transaction.TransactionID) + } + return nil +} diff --git a/federationapi/internal/transactionrequest.go b/federationapi/internal/transactionrequest.go new file mode 100644 index 000000000..3659ecccd --- /dev/null +++ b/federationapi/internal/transactionrequest.go @@ -0,0 +1,361 @@ +// Copyright 2022 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/getsentry/sentry-go" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/federationapi/producers" + "github.com/matrix-org/dendrite/federationapi/types" + "github.com/matrix-org/dendrite/internal" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver/api" + syncTypes "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" +) + +var ( + PDUCountTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "dendrite", + Subsystem: "federationapi", + Name: "recv_pdus", + Help: "Number of incoming PDUs from remote servers with labels for success", + }, + []string{"status"}, // 'success' or 'total' + ) + EDUCountTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "dendrite", + Subsystem: "federationapi", + Name: "recv_edus", + Help: "Number of incoming EDUs from remote servers", + }, + ) +) + +type TxnReq struct { + gomatrixserverlib.Transaction + rsAPI api.FederationRoomserverAPI + keyAPI keyapi.FederationKeyAPI + ourServerName gomatrixserverlib.ServerName + keys gomatrixserverlib.JSONVerifier + roomsMu *internal.MutexByRoom + servers federationAPI.ServersInRoomProvider + producer *producers.SyncAPIProducer + inboundPresenceEnabled bool +} + +func NewTxnReq( + rsAPI api.FederationRoomserverAPI, + keyAPI keyapi.FederationKeyAPI, + ourServerName gomatrixserverlib.ServerName, + keys gomatrixserverlib.JSONVerifier, + roomsMu *internal.MutexByRoom, + servers federationAPI.ServersInRoomProvider, + producer *producers.SyncAPIProducer, + inboundPresenceEnabled bool, + pdus []json.RawMessage, + edus []gomatrixserverlib.EDU, + origin gomatrixserverlib.ServerName, + transactionID gomatrixserverlib.TransactionID, + destination gomatrixserverlib.ServerName, +) TxnReq { + t := TxnReq{ + rsAPI: rsAPI, + keyAPI: keyAPI, + ourServerName: ourServerName, + keys: keys, + roomsMu: roomsMu, + servers: servers, + producer: producer, + inboundPresenceEnabled: inboundPresenceEnabled, + } + + t.PDUs = pdus + t.EDUs = edus + t.Origin = origin + t.TransactionID = transactionID + t.Destination = destination + + return t +} + +func (t *TxnReq) ProcessTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if t.producer != nil { + t.processEDUs(ctx) + } + }() + + results := make(map[string]gomatrixserverlib.PDUResult) + roomVersions := make(map[string]gomatrixserverlib.RoomVersion) + getRoomVersion := func(roomID string) gomatrixserverlib.RoomVersion { + if v, ok := roomVersions[roomID]; ok { + return v + } + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := t.rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil { + util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to query room version for room", verReq.RoomID) + return "" + } + roomVersions[roomID] = verRes.RoomVersion + return verRes.RoomVersion + } + + for _, pdu := range t.PDUs { + PDUCountTotal.WithLabelValues("total").Inc() + var header struct { + RoomID string `json:"room_id"` + } + if err := json.Unmarshal(pdu, &header); err != nil { + util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to extract room ID from event") + // We don't know the event ID at this point so we can't return the + // failure in the PDU results + continue + } + roomVersion := getRoomVersion(header.RoomID) + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion) + if err != nil { + if _, ok := err.(gomatrixserverlib.BadJSONError); ok { + // Room version 6 states that homeservers should strictly enforce canonical JSON + // on PDUs. + // + // This enforces that the entire transaction is rejected if a single bad PDU is + // sent. It is unclear if this is the correct behaviour or not. + // + // See https://github.com/matrix-org/synapse/issues/7543 + return nil, &util.JSONResponse{ + Code: 400, + JSON: jsonerror.BadJSON("PDU contains bad JSON"), + } + } + util.GetLogger(ctx).WithError(err).Debugf("Transaction: Failed to parse event JSON of event %s", string(pdu)) + continue + } + if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") { + continue + } + if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) { + results[event.EventID()] = gomatrixserverlib.PDUResult{ + Error: "Forbidden by server ACLs", + } + continue + } + if err = event.VerifyEventSignatures(ctx, t.keys); err != nil { + util.GetLogger(ctx).WithError(err).Debugf("Transaction: Couldn't validate signature of event %q", event.EventID()) + results[event.EventID()] = gomatrixserverlib.PDUResult{ + Error: err.Error(), + } + continue + } + + // pass the event to the roomserver which will do auth checks + // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently + // discarded by the caller of this function + if err = api.SendEvents( + ctx, + t.rsAPI, + api.KindNew, + []*gomatrixserverlib.HeaderedEvent{ + event.Headered(roomVersion), + }, + t.Destination, + t.Origin, + api.DoNotSendToOtherServers, + nil, + true, + ); err != nil { + util.GetLogger(ctx).WithError(err).Errorf("Transaction: Couldn't submit event %q to input queue: %s", event.EventID(), err) + results[event.EventID()] = gomatrixserverlib.PDUResult{ + Error: err.Error(), + } + continue + } + + results[event.EventID()] = gomatrixserverlib.PDUResult{} + PDUCountTotal.WithLabelValues("success").Inc() + } + + wg.Wait() + return &gomatrixserverlib.RespSend{PDUs: results}, nil +} + +// nolint:gocyclo +func (t *TxnReq) processEDUs(ctx context.Context) { + for _, e := range t.EDUs { + EDUCountTotal.Inc() + switch e.Type { + case gomatrixserverlib.MTyping: + // https://matrix.org/docs/spec/server_server/latest#typing-notifications + var typingPayload struct { + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + Typing bool `json:"typing"` + } + if err := json.Unmarshal(e.Content, &typingPayload); err != nil { + util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal typing event") + continue + } + if _, serverName, err := gomatrixserverlib.SplitID('@', typingPayload.UserID); err != nil { + continue + } else if serverName == t.ourServerName { + continue + } else if serverName != t.Origin { + continue + } + if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream") + } + case gomatrixserverlib.MDirectToDevice: + // https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema + var directPayload gomatrixserverlib.ToDeviceMessage + if err := json.Unmarshal(e.Content, &directPayload); err != nil { + util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal send-to-device events") + continue + } + if _, serverName, err := gomatrixserverlib.SplitID('@', directPayload.Sender); err != nil { + continue + } else if serverName == t.ourServerName { + continue + } else if serverName != t.Origin { + continue + } + for userID, byUser := range directPayload.Messages { + for deviceID, message := range byUser { + // TODO: check that the user and the device actually exist here + if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { + sentry.CaptureException(err) + util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ + "sender": directPayload.Sender, + "user_id": userID, + "device_id": deviceID, + }).Error("Failed to send send-to-device event to JetStream") + } + } + } + case gomatrixserverlib.MDeviceListUpdate: + if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil { + sentry.CaptureException(err) + util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate") + } + case gomatrixserverlib.MReceipt: + // https://matrix.org/docs/spec/server_server/r0.1.4#receipts + payload := map[string]types.FederationReceiptMRead{} + + if err := json.Unmarshal(e.Content, &payload); err != nil { + util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event") + continue + } + + for roomID, receipt := range payload { + for userID, mread := range receipt.User { + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + util.GetLogger(ctx).WithError(err).Debug("Failed to split domain from receipt event sender") + continue + } + if t.Origin != domain { + util.GetLogger(ctx).Debugf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin) + continue + } + if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil { + util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ + "sender": t.Origin, + "user_id": userID, + "room_id": roomID, + "events": mread.EventIDs, + }).Error("Failed to send receipt event to JetStream") + continue + } + } + } + case types.MSigningKeyUpdate: + if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil { + sentry.CaptureException(err) + logrus.WithError(err).Errorf("Failed to process signing key update") + } + case gomatrixserverlib.MPresence: + if t.inboundPresenceEnabled { + if err := t.processPresence(ctx, e); err != nil { + logrus.WithError(err).Errorf("Failed to process presence update") + } + } + default: + util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") + } + } +} + +// processPresence handles m.receipt events +func (t *TxnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error { + payload := types.Presence{} + if err := json.Unmarshal(e.Content, &payload); err != nil { + return err + } + for _, content := range payload.Push { + if _, serverName, err := gomatrixserverlib.SplitID('@', content.UserID); err != nil { + continue + } else if serverName == t.ourServerName { + continue + } else if serverName != t.Origin { + continue + } + presence, ok := syncTypes.PresenceFromString(content.Presence) + if !ok { + continue + } + if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil { + return err + } + } + return nil +} + +// processReceiptEvent sends receipt events to JetStream +func (t *TxnReq) processReceiptEvent(ctx context.Context, + userID, roomID, receiptType string, + timestamp gomatrixserverlib.Timestamp, + eventIDs []string, +) error { + if _, serverName, err := gomatrixserverlib.SplitID('@', userID); err != nil { + return nil + } else if serverName == t.ourServerName { + return nil + } else if serverName != t.Origin { + return nil + } + // store every event + for _, eventID := range eventIDs { + if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil { + return fmt.Errorf("unable to set receipt event: %w", err) + } + } + + return nil +} diff --git a/federationapi/inthttp/client.go b/federationapi/inthttp/client.go index 81a74e7df..6eaf344b2 100644 --- a/federationapi/inthttp/client.go +++ b/federationapi/inthttp/client.go @@ -23,9 +23,12 @@ const ( FederationAPIPerformInviteRequestPath = "/federationapi/performInviteRequest" FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest" FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU" - FederationAPIPerformStoreAsyncPath = "/federationapi/performStoreAsync" - FederationAPIQueryAsyncTransactionsPath = "/federationapi/queryAsyncTransactions" FederationAPIPerformWakeupServers = "/federationapi/performWakeupServers" + FederationAPIQueryMailservers = "/federationapi/queryMailservers" + FederationAPIPerformMailserverSync = "/federationapi/performMailserverSync" + + FederationAPIPerformStoreAsyncPath = "/federationapi/performStoreAsync" + FederationAPIQueryAsyncTransactionsPath = "/federationapi/queryAsyncTransactions" FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices" FederationAPIClaimKeysPath = "/federationapi/client/claimKeys" @@ -153,28 +156,6 @@ func (h *httpFederationInternalAPI) PerformBroadcastEDU( ) } -func (h *httpFederationInternalAPI) PerformStoreAsync( - ctx context.Context, - request *api.PerformStoreAsyncRequest, - response *api.PerformStoreAsyncResponse, -) error { - return httputil.CallInternalRPCAPI( - "PerformStoreAsync", h.federationAPIURL+FederationAPIPerformStoreAsyncPath, - h.httpClient, ctx, request, response, - ) -} - -func (h *httpFederationInternalAPI) QueryAsyncTransactions( - ctx context.Context, - request *api.QueryAsyncTransactionsRequest, - response *api.QueryAsyncTransactionsResponse, -) error { - return httputil.CallInternalRPCAPI( - "QueryAsyncTransactions", h.federationAPIURL+FederationAPIQueryAsyncTransactionsPath, - h.httpClient, ctx, request, response, - ) -} - // Handle an instruction to remove the respective servers from being blacklisted. func (h *httpFederationInternalAPI) PerformWakeupServers( ctx context.Context, @@ -534,3 +515,48 @@ func (h *httpFederationInternalAPI) QueryPublicKeys( h.httpClient, ctx, request, response, ) } + +func (h *httpFederationInternalAPI) QueryMailservers( + ctx context.Context, + request *api.QueryMailserversRequest, + response *api.QueryMailserversResponse, +) error { + return httputil.CallInternalRPCAPI( + "QueryMailservers", h.federationAPIURL+FederationAPIQueryMailservers, + h.httpClient, ctx, request, response, + ) +} + +// PerformMailserverSync implements api.FederationInternalAPI +func (h *httpFederationInternalAPI) PerformMailserverSync( + ctx context.Context, + request *api.PerformMailserverSyncRequest, + response *api.PerformMailserverSyncResponse, +) error { + return httputil.CallInternalRPCAPI( + "PerformMailserverSync", h.federationAPIURL+FederationAPIPerformMailserverSync, + h.httpClient, ctx, request, response, + ) +} + +func (h *httpFederationInternalAPI) PerformStoreAsync( + ctx context.Context, + request *api.PerformStoreAsyncRequest, + response *api.PerformStoreAsyncResponse, +) error { + return httputil.CallInternalRPCAPI( + "PerformStoreAsync", h.federationAPIURL+FederationAPIPerformStoreAsyncPath, + h.httpClient, ctx, request, response, + ) +} + +func (h *httpFederationInternalAPI) QueryAsyncTransactions( + ctx context.Context, + request *api.QueryAsyncTransactionsRequest, + response *api.QueryAsyncTransactionsResponse, +) error { + return httputil.CallInternalRPCAPI( + "QueryAsyncTransactions", h.federationAPIURL+FederationAPIQueryAsyncTransactionsPath, + h.httpClient, ctx, request, response, + ) +} diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index f39d97fa1..72fdeb662 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -411,8 +411,9 @@ func (oq *destinationQueue) nextTransaction( mailservers := oq.statistics.KnownMailservers() if oq.statistics.AssumedOffline() && len(mailservers) > 0 { + logrus.Infof("Sending to mailservers: %v", mailservers) // TODO : how to pass through actual userID here?!?!?!?! - userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.origin), false) + userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false) if userErr != nil { return userErr, false } diff --git a/federationapi/routing/asyncevents.go b/federationapi/routing/asyncevents.go index c8cbf1dd9..549c7321c 100644 --- a/federationapi/routing/asyncevents.go +++ b/federationapi/routing/asyncevents.go @@ -6,6 +6,7 @@ import ( "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) type AsyncEventsResponse struct { @@ -21,6 +22,7 @@ func GetAsyncEvents( fedAPI api.FederationInternalAPI, userID gomatrixserverlib.UserID, ) util.JSONResponse { + logrus.Infof("Handling async_events for %v", userID) var response api.QueryAsyncTransactionsResponse err := fedAPI.QueryAsyncTransactions(httpReq.Context(), &api.QueryAsyncTransactionsRequest{UserID: userID}, &response) if err != nil { diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 828230747..04bf28e08 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -62,7 +62,7 @@ func Setup( producer *producers.SyncAPIProducer, ) { prometheus.MustRegister( - pduCountTotal, eduCountTotal, + fedInternal.PDUCountTotal, fedInternal.EDUCountTotal, ) v2keysmux := keyMux.PathPrefix("/v2").Subrouter() diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index a146d85bd..31d5462bf 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -17,26 +17,21 @@ package routing import ( "context" "encoding/json" - "fmt" "net/http" "sync" "time" - "github.com/getsentry/sentry-go" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/jsonerror" federationAPI "github.com/matrix-org/dendrite/federationapi/api" + fedInternal "github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/producers" - "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - syncTypes "github.com/matrix-org/dendrite/syncapi/types" ) const ( @@ -56,26 +51,6 @@ const ( MetricsWorkMissingPrevEvents = "missing_prev_events" ) -var ( - pduCountTotal = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "dendrite", - Subsystem: "federationapi", - Name: "recv_pdus", - Help: "Number of incoming PDUs from remote servers with labels for success", - }, - []string{"status"}, // 'success' or 'total' - ) - eduCountTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: "dendrite", - Subsystem: "federationapi", - Name: "recv_edus", - Help: "Number of incoming EDUs from remote servers", - }, - ) -) - var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse // Send implements /_matrix/federation/v1/send/{txnID} @@ -123,18 +98,6 @@ func Send( defer close(ch) defer inFlightTxnsPerOrigin.Delete(index) - t := txnReq{ - rsAPI: rsAPI, - keys: keys, - ourServerName: cfg.Matrix.ServerName, - federation: federation, - servers: servers, - keyAPI: keyAPI, - roomsMu: mu, - producer: producer, - inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound, - } - var txnEvents struct { PDUs []json.RawMessage `json:"pdus"` EDUs []gomatrixserverlib.EDU `json:"edus"` @@ -155,16 +118,24 @@ func Send( } } - // TODO: Really we should have a function to convert FederationRequest to txnReq - t.PDUs = txnEvents.PDUs - t.EDUs = txnEvents.EDUs - t.Origin = request.Origin() - t.TransactionID = txnID - t.Destination = cfg.Matrix.ServerName + t := fedInternal.NewTxnReq( + rsAPI, + keyAPI, + cfg.Matrix.ServerName, + keys, + mu, + servers, + producer, + cfg.Matrix.Presence.EnableInbound, + txnEvents.PDUs, + txnEvents.EDUs, + request.Origin(), + txnID, + cfg.Matrix.ServerName) util.GetLogger(httpReq.Context()).Debugf("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) - resp, jsonErr := t.processTransaction(httpReq.Context()) + resp, jsonErr := t.ProcessTransaction(httpReq.Context()) if jsonErr != nil { util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") return *jsonErr @@ -181,283 +152,3 @@ func Send( ch <- res return res } - -type txnReq struct { - gomatrixserverlib.Transaction - rsAPI api.FederationRoomserverAPI - keyAPI keyapi.FederationKeyAPI - ourServerName gomatrixserverlib.ServerName - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient - roomsMu *internal.MutexByRoom - servers federationAPI.ServersInRoomProvider - producer *producers.SyncAPIProducer - inboundPresenceEnabled bool -} - -// A subset of FederationClient functionality that txn requires. Useful for testing. -type txnFederationClient interface { - LookupState(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( - res gomatrixserverlib.RespState, err error, - ) - LookupStateIDs(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error) - GetEvent(ctx context.Context, origin, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error) - LookupMissingEvents(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, - roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) -} - -func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - t.processEDUs(ctx) - }() - - results := make(map[string]gomatrixserverlib.PDUResult) - roomVersions := make(map[string]gomatrixserverlib.RoomVersion) - getRoomVersion := func(roomID string) gomatrixserverlib.RoomVersion { - if v, ok := roomVersions[roomID]; ok { - return v - } - verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} - verRes := api.QueryRoomVersionForRoomResponse{} - if err := t.rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil { - util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to query room version for room", verReq.RoomID) - return "" - } - roomVersions[roomID] = verRes.RoomVersion - return verRes.RoomVersion - } - - for _, pdu := range t.PDUs { - pduCountTotal.WithLabelValues("total").Inc() - var header struct { - RoomID string `json:"room_id"` - } - if err := json.Unmarshal(pdu, &header); err != nil { - util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to extract room ID from event") - // We don't know the event ID at this point so we can't return the - // failure in the PDU results - continue - } - roomVersion := getRoomVersion(header.RoomID) - event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion) - if err != nil { - if _, ok := err.(gomatrixserverlib.BadJSONError); ok { - // Room version 6 states that homeservers should strictly enforce canonical JSON - // on PDUs. - // - // This enforces that the entire transaction is rejected if a single bad PDU is - // sent. It is unclear if this is the correct behaviour or not. - // - // See https://github.com/matrix-org/synapse/issues/7543 - return nil, &util.JSONResponse{ - Code: 400, - JSON: jsonerror.BadJSON("PDU contains bad JSON"), - } - } - util.GetLogger(ctx).WithError(err).Debugf("Transaction: Failed to parse event JSON of event %s", string(pdu)) - continue - } - if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") { - continue - } - if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) { - results[event.EventID()] = gomatrixserverlib.PDUResult{ - Error: "Forbidden by server ACLs", - } - continue - } - if err = event.VerifyEventSignatures(ctx, t.keys); err != nil { - util.GetLogger(ctx).WithError(err).Debugf("Transaction: Couldn't validate signature of event %q", event.EventID()) - results[event.EventID()] = gomatrixserverlib.PDUResult{ - Error: err.Error(), - } - continue - } - - // pass the event to the roomserver which will do auth checks - // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently - // discarded by the caller of this function - if err = api.SendEvents( - ctx, - t.rsAPI, - api.KindNew, - []*gomatrixserverlib.HeaderedEvent{ - event.Headered(roomVersion), - }, - t.Destination, - t.Origin, - api.DoNotSendToOtherServers, - nil, - true, - ); err != nil { - util.GetLogger(ctx).WithError(err).Errorf("Transaction: Couldn't submit event %q to input queue: %s", event.EventID(), err) - results[event.EventID()] = gomatrixserverlib.PDUResult{ - Error: err.Error(), - } - continue - } - - results[event.EventID()] = gomatrixserverlib.PDUResult{} - pduCountTotal.WithLabelValues("success").Inc() - } - - wg.Wait() - return &gomatrixserverlib.RespSend{PDUs: results}, nil -} - -// nolint:gocyclo -func (t *txnReq) processEDUs(ctx context.Context) { - for _, e := range t.EDUs { - eduCountTotal.Inc() - switch e.Type { - case gomatrixserverlib.MTyping: - // https://matrix.org/docs/spec/server_server/latest#typing-notifications - var typingPayload struct { - RoomID string `json:"room_id"` - UserID string `json:"user_id"` - Typing bool `json:"typing"` - } - if err := json.Unmarshal(e.Content, &typingPayload); err != nil { - util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal typing event") - continue - } - if _, serverName, err := gomatrixserverlib.SplitID('@', typingPayload.UserID); err != nil { - continue - } else if serverName == t.ourServerName { - continue - } else if serverName != t.Origin { - continue - } - if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { - util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream") - } - case gomatrixserverlib.MDirectToDevice: - // https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema - var directPayload gomatrixserverlib.ToDeviceMessage - if err := json.Unmarshal(e.Content, &directPayload); err != nil { - util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal send-to-device events") - continue - } - if _, serverName, err := gomatrixserverlib.SplitID('@', directPayload.Sender); err != nil { - continue - } else if serverName == t.ourServerName { - continue - } else if serverName != t.Origin { - continue - } - for userID, byUser := range directPayload.Messages { - for deviceID, message := range byUser { - // TODO: check that the user and the device actually exist here - if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { - sentry.CaptureException(err) - util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ - "sender": directPayload.Sender, - "user_id": userID, - "device_id": deviceID, - }).Error("Failed to send send-to-device event to JetStream") - } - } - } - case gomatrixserverlib.MDeviceListUpdate: - if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil { - sentry.CaptureException(err) - util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate") - } - case gomatrixserverlib.MReceipt: - // https://matrix.org/docs/spec/server_server/r0.1.4#receipts - payload := map[string]types.FederationReceiptMRead{} - - if err := json.Unmarshal(e.Content, &payload); err != nil { - util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event") - continue - } - - for roomID, receipt := range payload { - for userID, mread := range receipt.User { - _, domain, err := gomatrixserverlib.SplitID('@', userID) - if err != nil { - util.GetLogger(ctx).WithError(err).Debug("Failed to split domain from receipt event sender") - continue - } - if t.Origin != domain { - util.GetLogger(ctx).Debugf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin) - continue - } - if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil { - util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ - "sender": t.Origin, - "user_id": userID, - "room_id": roomID, - "events": mread.EventIDs, - }).Error("Failed to send receipt event to JetStream") - continue - } - } - } - case types.MSigningKeyUpdate: - if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil { - sentry.CaptureException(err) - logrus.WithError(err).Errorf("Failed to process signing key update") - } - case gomatrixserverlib.MPresence: - if t.inboundPresenceEnabled { - if err := t.processPresence(ctx, e); err != nil { - logrus.WithError(err).Errorf("Failed to process presence update") - } - } - default: - util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") - } - } -} - -// processPresence handles m.receipt events -func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error { - payload := types.Presence{} - if err := json.Unmarshal(e.Content, &payload); err != nil { - return err - } - for _, content := range payload.Push { - if _, serverName, err := gomatrixserverlib.SplitID('@', content.UserID); err != nil { - continue - } else if serverName == t.ourServerName { - continue - } else if serverName != t.Origin { - continue - } - presence, ok := syncTypes.PresenceFromString(content.Presence) - if !ok { - continue - } - if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil { - return err - } - } - return nil -} - -// processReceiptEvent sends receipt events to JetStream -func (t *txnReq) processReceiptEvent(ctx context.Context, - userID, roomID, receiptType string, - timestamp gomatrixserverlib.Timestamp, - eventIDs []string, -) error { - if _, serverName, err := gomatrixserverlib.SplitID('@', userID); err != nil { - return nil - } else if serverName == t.ourServerName { - return nil - } else if serverName != t.Origin { - return nil - } - // store every event - for _, eventID := range eventIDs { - if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil { - return fmt.Errorf("unable to set receipt event: %w", err) - } - } - - return nil -} diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index b8bfe0221..aa4923087 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + fedInternal "github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/test" @@ -184,22 +185,30 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, origin, s gomatr return c.getMissingEvents(missing) } -func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq { - t := &txnReq{ - rsAPI: rsAPI, - keys: &test.NopJSONVerifier{}, - federation: fedClient, - roomsMu: internal.NewMutexByRoom(), - } +func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, pdus []json.RawMessage) *fedInternal.TxnReq { + t := fedInternal.NewTxnReq( + rsAPI, + nil, + "", + &test.NopJSONVerifier{}, + internal.NewMutexByRoom(), + nil, + nil, + false, + pdus, + nil, + testOrigin, + gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano())), + testDestination) t.PDUs = pdus t.Origin = testOrigin t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano())) t.Destination = testDestination - return t + return &t } -func mustProcessTransaction(t *testing.T, txn *txnReq, pdusWithErrors []string) { - res, err := txn.processTransaction(context.Background()) +func mustProcessTransaction(t *testing.T, txn *fedInternal.TxnReq, pdusWithErrors []string) { + res, err := txn.ProcessTransaction(context.Background()) if err != nil { t.Errorf("txn.processTransaction returned an error: %v", err) return @@ -262,7 +271,7 @@ func TestBasicTransaction(t *testing.T) { pdus := []json.RawMessage{ testData[len(testData)-1], // a message event } - txn := mustCreateTransaction(rsAPI, &txnFedClient{}, pdus) + txn := mustCreateTransaction(rsAPI, pdus) mustProcessTransaction(t, txn, nil) assertInputRoomEvents(t, rsAPI.inputRoomEvents, []*gomatrixserverlib.HeaderedEvent{testEvents[len(testEvents)-1]}) } @@ -274,7 +283,7 @@ func TestTransactionFailAuthChecks(t *testing.T) { pdus := []json.RawMessage{ testData[len(testData)-1], // a message event } - txn := mustCreateTransaction(rsAPI, &txnFedClient{}, pdus) + txn := mustCreateTransaction(rsAPI, pdus) mustProcessTransaction(t, txn, []string{}) // expect message to be sent to the roomserver assertInputRoomEvents(t, rsAPI.inputRoomEvents, []*gomatrixserverlib.HeaderedEvent{testEvents[len(testEvents)-1]})