From f300a4d0e9c8dde7e8d58f1fc45212fd8da98677 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 14 Dec 2022 10:30:30 -0700 Subject: [PATCH] Refactor relay querying --- cmd/dendrite-demo-pinecone/main.go | 44 ++++++++- federationapi/api/api.go | 13 --- federationapi/internal/perform.go | 64 ------------- federationapi/inthttp/client.go | 13 --- federationapi/routing/routing.go | 2 +- federationapi/routing/send.go | 4 +- federationapi/routing/send_test.go | 8 +- .../transactionrequest.go | 9 +- relayapi/relayapi.go | 93 +++++++++++++++++++ 9 files changed, 141 insertions(+), 109 deletions(-) rename {federationapi/internal => internal}/transactionrequest.go (97%) create mode 100644 relayapi/relayapi.go diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 8ed133a0e..d594cb863 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -38,13 +38,16 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/relayapi" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" @@ -216,6 +219,11 @@ func main() { base, federation, rsAPI, base.Caches, keyRing, true, ) + // TODO : What if I had another dendrite function, where I just passed in the fed_client, + // rsAPI, userAPI, keyRing & any other info. Then that thing is what handles doing the + // async_events querying? + // ie. don't tie it into the federationInternalAPI & get rid of the associated mess. + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) userAPI := userapi.NewInternalAPI(base, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) keyAPI.SetUserAPI(userAPI) @@ -311,11 +319,32 @@ func main() { relayServerSyncRunning := atomic.NewBool(false) stopRelayServerSync := make(chan bool) + js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.FederationAPI.Matrix.JetStream) + producer := &producers.SyncAPIProducer{ + JetStream: js, + TopicReceiptEvent: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + TopicSendToDeviceEvent: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + TopicTypingEvent: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + TopicPresenceEvent: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + TopicDeviceListUpdate: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), + TopicSigningKeyUpdate: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate), + Config: &base.Cfg.FederationAPI, + UserAPI: userAPI, + } + m := RelayServerRetriever{ Context: context.Background(), ServerName: gomatrixserverlib.ServerName(pRouter.PublicKey().String()), FederationAPI: fsAPI, RelayServersQueried: make(map[gomatrixserverlib.ServerName]bool), + RelayAPI: relayapi.NewRelayAPI( + federation, + rsAPI, + keyRing, + producer, + cfg.Global.Presence.EnableInbound, + cfg.Global.ServerName, + ), } m.InitializeRelayServers(eLog) @@ -358,6 +387,7 @@ type RelayServerRetriever struct { ServerName gomatrixserverlib.ServerName FederationAPI api.FederationInternalAPI RelayServersQueried map[gomatrixserverlib.ServerName]bool + RelayAPI relayapi.RelayAPI } func (m *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) { @@ -406,11 +436,19 @@ func (m *RelayServerRetriever) syncRelayServers(stop <-chan bool, running atomic func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) { logrus.Info("querying relay servers for async_events") for _, server := range relayServers { - request := api.PerformRelayServerSyncRequest{RelayServer: server} - response := api.PerformRelayServerSyncResponse{} - err := m.FederationAPI.PerformRelayServerSync(m.Context, &request, &response) + userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false) + if err != nil { + return + } + err = m.RelayAPI.PerformRelayServerSync(*userID, server) if err == nil { m.RelayServersQueried[server] = true + // TODO : What happens if your relay receives new messages after this point? + // Should you continue to check with them, or should they try and contact you? + // They could send a "new_async_events" message your way maybe? + // Then you could mark them as needing to be queried again. + // What if you miss this message? + // Maybe you should try querying them again after a certain period of time as a backup? } else { logrus.Errorf("Failed querying relay server: %s", err.Error()) } diff --git a/federationapi/api/api.go b/federationapi/api/api.go index 69e0d1e43..3b234d5eb 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -43,11 +43,6 @@ type FederationInternalAPI interface { request *QueryRelayServersRequest, response *QueryRelayServersResponse, ) error - PerformRelayServerSync( - ctx context.Context, - request *PerformRelayServerSyncRequest, - response *PerformRelayServerSyncResponse, - ) error } type RelayServerAPI interface { @@ -273,14 +268,6 @@ type QueryRelayServersResponse struct { RelayServers []gomatrixserverlib.ServerName } -type PerformRelayServerSyncRequest struct { - RelayServer gomatrixserverlib.ServerName -} - -type PerformRelayServerSyncResponse struct { - SyncComplete bool -} - type PerformStoreAsyncRequest struct { Txn gomatrixserverlib.Transaction `json:"transaction"` UserID gomatrixserverlib.UserID `json:"user_id"` diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index 5e5086bcf..edb943526 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -15,7 +15,6 @@ import ( "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/consumers" "github.com/matrix-org/dendrite/federationapi/storage/shared" - "github.com/matrix-org/dendrite/internal" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/version" ) @@ -849,69 +848,6 @@ func (r *FederationInternalAPI) QueryRelayServers( return nil } -// PerformRelayServerSync implements api.FederationInternalAPI -func (r *FederationInternalAPI) PerformRelayServerSync( - ctx context.Context, - request *api.PerformRelayServerSyncRequest, - response *api.PerformRelayServerSyncResponse, -) error { - userID, err := gomatrixserverlib.NewUserID("@user:"+string(r.cfg.Matrix.ServerName), false) - if err != nil { - return err - } - - asyncResponse, err := r.federation.GetAsyncEvents(ctx, *userID, request.RelayServer) - if err != nil { - logrus.Errorf("GetAsyncEvents: %s", err.Error()) - return err - } - r.processTransaction(&asyncResponse.Transaction) - - for asyncResponse.Remaining > 0 { - asyncResponse, err := r.federation.GetAsyncEvents(ctx, *userID, request.RelayServer) - if err != nil { - logrus.Errorf("GetAsyncEvents: %s", err.Error()) - return err - } - r.processTransaction(&asyncResponse.Transaction) - } - - return nil -} - -func (r *FederationInternalAPI) processTransaction(txn *gomatrixserverlib.Transaction) { - logrus.Warn("Processing transaction from relay server") - mu := internal.NewMutexByRoom() - // js, _ := base.NATS.Prepare(base.ProcessContext, &r.cfg.Matrix.JetStream) - // producer := &producers.SyncAPIProducer{ - // JetStream: js, - // TopicReceiptEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), - // TopicSendToDeviceEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), - // TopicTypingEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), - // TopicPresenceEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), - // TopicDeviceListUpdate: r.cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), - // TopicSigningKeyUpdate: r.cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate), - // Config: r.cfg, - // UserAPI: r.userAPI, - // } - t := NewTxnReq( - r.rsAPI, - nil, - r.cfg.Matrix.ServerName, - r.keyRing, - mu, - nil, - nil, // TODO : assign producer to process EDUs - r.cfg.Matrix.Presence.EnableInbound, - txn.PDUs, - txn.EDUs, - txn.Origin, - txn.TransactionID, - txn.Destination) - - t.ProcessTransaction(context.TODO()) -} - // PerformStoreAsync implements api.FederationInternalAPI func (r *FederationInternalAPI) PerformStoreAsync( ctx context.Context, diff --git a/federationapi/inthttp/client.go b/federationapi/inthttp/client.go index d1537c0b1..cc63cd96a 100644 --- a/federationapi/inthttp/client.go +++ b/federationapi/inthttp/client.go @@ -25,7 +25,6 @@ const ( FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU" FederationAPIPerformWakeupServers = "/federationapi/performWakeupServers" FederationAPIQueryRelayServers = "/federationapi/queryRelayServers" - FederationAPIPerformRelayServerSync = "/federationapi/performRelayServerSync" FederationAPIPerformStoreAsyncPath = "/federationapi/performStoreAsync" FederationAPIQueryAsyncTransactionsPath = "/federationapi/queryAsyncTransactions" @@ -527,18 +526,6 @@ func (h *httpFederationInternalAPI) QueryRelayServers( ) } -// PerformRelayServerSync implements api.FederationInternalAPI -func (h *httpFederationInternalAPI) PerformRelayServerSync( - ctx context.Context, - request *api.PerformRelayServerSyncRequest, - response *api.PerformRelayServerSyncResponse, -) error { - return httputil.CallInternalRPCAPI( - "PerformRelayServerSync", h.federationAPIURL+FederationAPIPerformRelayServerSync, - h.httpClient, ctx, request, response, - ) -} - func (h *httpFederationInternalAPI) PerformStoreAsync( ctx context.Context, request *api.PerformStoreAsyncRequest, diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 04bf28e08..beabf894a 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -62,7 +62,7 @@ func Setup( producer *producers.SyncAPIProducer, ) { prometheus.MustRegister( - fedInternal.PDUCountTotal, fedInternal.EDUCountTotal, + internal.PDUCountTotal, internal.EDUCountTotal, ) v2keysmux := keyMux.PathPrefix("/v2").Subrouter() diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 31d5462bf..67b513c90 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -26,7 +26,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" federationAPI "github.com/matrix-org/dendrite/federationapi/api" - fedInternal "github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/internal" keyapi "github.com/matrix-org/dendrite/keyserver/api" @@ -118,13 +117,12 @@ func Send( } } - t := fedInternal.NewTxnReq( + t := internal.NewTxnReq( rsAPI, keyAPI, cfg.Matrix.ServerName, keys, mu, - servers, producer, cfg.Matrix.Presence.EnableInbound, txnEvents.PDUs, diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index aa4923087..76e5c750c 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - fedInternal "github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/test" @@ -185,15 +184,14 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, origin, s gomatr return c.getMissingEvents(missing) } -func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, pdus []json.RawMessage) *fedInternal.TxnReq { - t := fedInternal.NewTxnReq( +func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, pdus []json.RawMessage) *internal.TxnReq { + t := internal.NewTxnReq( rsAPI, nil, "", &test.NopJSONVerifier{}, internal.NewMutexByRoom(), nil, - nil, false, pdus, nil, @@ -207,7 +205,7 @@ func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, pdus []json.RawMes return &t } -func mustProcessTransaction(t *testing.T, txn *fedInternal.TxnReq, pdusWithErrors []string) { +func mustProcessTransaction(t *testing.T, txn *internal.TxnReq, pdusWithErrors []string) { res, err := txn.ProcessTransaction(context.Background()) if err != nil { t.Errorf("txn.processTransaction returned an error: %v", err) diff --git a/federationapi/internal/transactionrequest.go b/internal/transactionrequest.go similarity index 97% rename from federationapi/internal/transactionrequest.go rename to internal/transactionrequest.go index 3659ecccd..77f388966 100644 --- a/federationapi/internal/transactionrequest.go +++ b/internal/transactionrequest.go @@ -22,10 +22,8 @@ import ( "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/clientapi/jsonerror" - federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/federationapi/types" - "github.com/matrix-org/dendrite/internal" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" syncTypes "github.com/matrix-org/dendrite/syncapi/types" @@ -61,8 +59,7 @@ type TxnReq struct { keyAPI keyapi.FederationKeyAPI ourServerName gomatrixserverlib.ServerName keys gomatrixserverlib.JSONVerifier - roomsMu *internal.MutexByRoom - servers federationAPI.ServersInRoomProvider + roomsMu *MutexByRoom producer *producers.SyncAPIProducer inboundPresenceEnabled bool } @@ -72,8 +69,7 @@ func NewTxnReq( keyAPI keyapi.FederationKeyAPI, ourServerName gomatrixserverlib.ServerName, keys gomatrixserverlib.JSONVerifier, - roomsMu *internal.MutexByRoom, - servers federationAPI.ServersInRoomProvider, + roomsMu *MutexByRoom, producer *producers.SyncAPIProducer, inboundPresenceEnabled bool, pdus []json.RawMessage, @@ -88,7 +84,6 @@ func NewTxnReq( ourServerName: ourServerName, keys: keys, roomsMu: roomsMu, - servers: servers, producer: producer, inboundPresenceEnabled: inboundPresenceEnabled, } diff --git a/relayapi/relayapi.go b/relayapi/relayapi.go new file mode 100644 index 000000000..2237b0431 --- /dev/null +++ b/relayapi/relayapi.go @@ -0,0 +1,93 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package relayapi + +import ( + "context" + + "github.com/matrix-org/dendrite/federationapi/producers" + "github.com/matrix-org/dendrite/internal" + rsAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +type RelayAPI struct { + fedClient *gomatrixserverlib.FederationClient + rsAPI rsAPI.RoomserverInternalAPI + keyRing *gomatrixserverlib.KeyRing + producer *producers.SyncAPIProducer + presenceEnabledInbound bool + serverName gomatrixserverlib.ServerName +} + +func NewRelayAPI( + fedClient *gomatrixserverlib.FederationClient, + rsAPI rsAPI.RoomserverInternalAPI, + keyRing *gomatrixserverlib.KeyRing, + producer *producers.SyncAPIProducer, + presenceEnabledInbound bool, + serverName gomatrixserverlib.ServerName, +) RelayAPI { + return RelayAPI{ + fedClient: fedClient, + rsAPI: rsAPI, + keyRing: keyRing, + producer: producer, + presenceEnabledInbound: presenceEnabledInbound, + serverName: serverName, + } +} + +// PerformRelayServerSync implements api.FederationInternalAPI +func (r *RelayAPI) PerformRelayServerSync(userID gomatrixserverlib.UserID, relayServer gomatrixserverlib.ServerName) error { + asyncResponse, err := r.fedClient.GetAsyncEvents(context.Background(), userID, relayServer) + if err != nil { + logrus.Errorf("GetAsyncEvents: %s", err.Error()) + return err + } + r.processTransaction(&asyncResponse.Transaction) + + for asyncResponse.Remaining > 0 { + asyncResponse, err := r.fedClient.GetAsyncEvents(context.Background(), userID, relayServer) + if err != nil { + logrus.Errorf("GetAsyncEvents: %s", err.Error()) + return err + } + r.processTransaction(&asyncResponse.Transaction) + } + + return nil +} + +func (r *RelayAPI) processTransaction(txn *gomatrixserverlib.Transaction) { + logrus.Warn("Processing transaction from relay server") + mu := internal.NewMutexByRoom() + t := internal.NewTxnReq( + r.rsAPI, + nil, + r.serverName, + r.keyRing, + mu, + r.producer, + r.presenceEnabledInbound, + txn.PDUs, + txn.EDUs, + txn.Origin, + txn.TransactionID, + txn.Destination) + + t.ProcessTransaction(context.TODO()) +}