From 96d5467e031c185cf415f78f9f94e4fc7b7531fe Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Mon, 19 Dec 2022 14:09:16 -0700 Subject: [PATCH] Cleanup relayapi usage in p2p demos --- build/gobind-pinecone/monolith.go | 117 ++++++++++++++++-- cmd/dendrite-demo-pinecone/main.go | 2 +- relayapi/internal/api.go | 4 +- relayapi/relayapi.go | 2 +- relayapi/relayapi_test.go | 7 +- relayapi/routing/asyncevents_test.go | 12 +- relayapi/routing/forwardasync_test.go | 13 +- relayapi/routing/routing_test.go | 25 +++- .../tables/relay_queue_json_table_test.go | 3 +- 9 files changed, 147 insertions(+), 38 deletions(-) diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index ba771f3b4..e96167197 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -41,12 +41,16 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/relayapi" + relayServerAPI "github.com/matrix-org/dendrite/relayapi/api" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/userapi" @@ -329,6 +333,7 @@ func (m *DendriteMonolith) Start() { cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", filepath.Join(m.StorageDirectory, prefix))) cfg.MediaAPI.BasePath = config.Path(filepath.Join(m.CacheDirectory, "media")) cfg.MediaAPI.AbsBasePath = config.Path(filepath.Join(m.CacheDirectory, "media")) + cfg.RelayAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-relayapi.db", filepath.Join(m.StorageDirectory, prefix))) cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} cfg.ClientAPI.RegistrationDisabled = false cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true @@ -366,6 +371,20 @@ func (m *DendriteMonolith) Start() { userProvider := users.NewPineconeUserProvider(m.PineconeRouter, m.PineconeQUIC, m.userAPI, federation) roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, m.federationAPI, federation) + js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + producer := &producers.SyncAPIProducer{ + JetStream: js, + TopicReceiptEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent), + TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + TopicTypingEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent), + TopicPresenceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent), + TopicDeviceListUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate), + TopicSigningKeyUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate), + Config: &base.Cfg.FederationAPI, + UserAPI: m.userAPI, + } + relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer) + monolith := setup.Monolith{ Config: base.Cfg, Client: conn.CreateClient(base, m.PineconeQUIC), @@ -377,6 +396,7 @@ func (m *DendriteMonolith) Start() { RoomserverAPI: rsAPI, UserAPI: m.userAPI, KeyAPI: keyAPI, + RelayAPI: relayAPI, ExtPublicRoomsProvider: roomProvider, ExtUserDirectoryProvider: userProvider, } @@ -442,23 +462,20 @@ func (m *DendriteMonolith) Start() { relayServerSyncRunning := atomic.NewBool(false) stopRelayServerSync := make(chan bool) - // Setup relay server info - request := api.QueryRelayServersRequest{Server: gomatrixserverlib.ServerName(m.PublicKey())} - response := api.QueryRelayServersResponse{} - err := m.federationAPI.QueryRelayServers(m.processContext.Context(), &request, &response) - if err != nil { - // TODO - } - m.relayServersQueried = make(map[gomatrixserverlib.ServerName]bool) - for _, server := range response.RelayServers { - m.relayServersQueried[server] = false + relayRetriever := RelayServerRetriever{ + Context: context.Background(), + ServerName: gomatrixserverlib.ServerName(m.PineconeRouter.PublicKey().String()), + FederationAPI: m.federationAPI, + RelayServersQueried: make(map[gomatrixserverlib.ServerName]bool), + RelayAPI: monolith.RelayAPI, } + relayRetriever.InitializeRelayServers(eLog) for event := range ch { switch e := event.(type) { case pineconeEvents.PeerAdded: if !relayServerSyncRunning.Load() { - // go m.syncRelayServers(stopRelayServerSync, *relayServerSyncRunning) + go relayRetriever.syncRelayServers(stopRelayServerSync, *relayServerSyncRunning) } case pineconeEvents.PeerRemoved: if relayServerSyncRunning.Load() && m.PineconeRouter.PeerCount(-1) == 0 { @@ -495,6 +512,84 @@ func (m *DendriteMonolith) Stop() { m.processContext.WaitForComponentsToFinish() } +type RelayServerRetriever struct { + Context context.Context + ServerName gomatrixserverlib.ServerName + FederationAPI api.FederationInternalAPI + RelayServersQueried map[gomatrixserverlib.ServerName]bool + RelayAPI relayServerAPI.RelayInternalAPI +} + +func (m *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) { + request := api.QueryRelayServersRequest{Server: gomatrixserverlib.ServerName(m.ServerName)} + response := api.QueryRelayServersResponse{} + err := m.FederationAPI.QueryRelayServers(m.Context, &request, &response) + if err != nil { + // TODO + } + for _, server := range response.RelayServers { + m.RelayServersQueried[server] = false + } + + eLog.Infof("Registered relay servers: %v", response.RelayServers) +} + +func (m *RelayServerRetriever) syncRelayServers(stop <-chan bool, running atomic.Bool) { + defer running.Store(false) + + t := time.NewTimer(relayServerRetryInterval) + for { + relayServersToQuery := []gomatrixserverlib.ServerName{} + for server, complete := range m.RelayServersQueried { + if !complete { + relayServersToQuery = append(relayServersToQuery, server) + } + } + if len(relayServersToQuery) == 0 { + // All relay servers have been synced. + return + } + m.queryRelayServers(relayServersToQuery) + t.Reset(relayServerRetryInterval) + + select { + case <-stop: + if !t.Stop() { + <-t.C + } + return + case <-t.C: + } + } +} + +func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) { + logrus.Info("querying relay servers for async_events") + for _, server := range relayServers { + userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false) + if err != nil { + return + } + request := relayServerAPI.PerformRelayServerSyncRequest{ + UserID: *userID, + RelayServer: server, + } + response := relayServerAPI.PerformRelayServerSyncResponse{} + err = m.RelayAPI.PerformRelayServerSync(context.Background(), &request, &response) + if err == nil { + m.RelayServersQueried[server] = true + // TODO : What happens if your relay receives new messages after this point? + // Should you continue to check with them, or should they try and contact you? + // They could send a "new_async_events" message your way maybe? + // Then you could mark them as needing to be queried again. + // What if you miss this message? + // Maybe you should try querying them again after a certain period of time as a backup? + } else { + logrus.Errorf("Failed querying relay server: %s", err.Error()) + } + } +} + const MaxFrameSize = types.MaxFrameSize type Conduit struct { diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index edd808d68..fe2405dd9 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -257,7 +257,7 @@ func main() { RoomserverAPI: rsAPI, UserAPI: userAPI, KeyAPI: keyAPI, - RelayAPI: &relayAPI, + RelayAPI: relayAPI, ExtPublicRoomsProvider: roomProvider, ExtUserDirectoryProvider: userProvider, } diff --git a/relayapi/internal/api.go b/relayapi/internal/api.go index a0cb892be..3ff8c2add 100644 --- a/relayapi/internal/api.go +++ b/relayapi/internal/api.go @@ -40,8 +40,8 @@ func NewRelayInternalAPI( producer *producers.SyncAPIProducer, presenceEnabledInbound bool, serverName gomatrixserverlib.ServerName, -) RelayInternalAPI { - return RelayInternalAPI{ +) *RelayInternalAPI { + return &RelayInternalAPI{ db: db, fedClient: fedClient, rsAPI: rsAPI, diff --git a/relayapi/relayapi.go b/relayapi/relayapi.go index 6be7249af..df282294f 100644 --- a/relayapi/relayapi.go +++ b/relayapi/relayapi.go @@ -62,7 +62,7 @@ func NewRelayInternalAPI( rsAPI rsAPI.RoomserverInternalAPI, keyRing *gomatrixserverlib.KeyRing, producer *producers.SyncAPIProducer, -) internal.RelayInternalAPI { +) api.RelayInternalAPI { cfg := &base.Cfg.RelayAPI relayDB, err := storage.NewDatabase(base, &cfg.Database, base.Caches, base.Cfg.Global.IsLocalServerName) diff --git a/relayapi/relayapi_test.go b/relayapi/relayapi_test.go index 41c6abc50..0572b8d7e 100644 --- a/relayapi/relayapi_test.go +++ b/relayapi/relayapi_test.go @@ -1,5 +1,4 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// +// Copyright 2022 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -58,7 +57,7 @@ func TestCreateRelayInternalRoutes(t *testing.T) { relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) assert.NotNil(t, relayAPI) - relayapi.AddInternalRoutes(base.InternalAPIMux, &relayAPI, false) + relayapi.AddInternalRoutes(base.InternalAPIMux, relayAPI, false) } func TestCreateInvalidRelayPublicRoutesPanics(t *testing.T) { @@ -79,5 +78,5 @@ func TestCreateRelayPublicRoutes(t *testing.T) { relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) assert.NotNil(t, relayAPI) - relayapi.AddPublicRoutes(base, nil, &relayAPI) + relayapi.AddPublicRoutes(base, nil, relayAPI) } diff --git a/relayapi/routing/asyncevents_test.go b/relayapi/routing/asyncevents_test.go index 9e6e6145f..01b24d58e 100644 --- a/relayapi/routing/asyncevents_test.go +++ b/relayapi/routing/asyncevents_test.go @@ -47,7 +47,7 @@ func TestGetAsyncEmptyDatabaseReturnsNothing(t *testing.T) { ) request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) - response := routing.GetAsyncEvents(httpReq, &request, &relayAPI, *userID) + response := routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse := response.JSON.(routing.AsyncEventsResponse) @@ -88,7 +88,7 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) { ) request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) - response := routing.GetAsyncEvents(httpReq, &request, &relayAPI, *userID) + response := routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse := response.JSON.(routing.AsyncEventsResponse) @@ -97,7 +97,7 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) { // And once more to clear the queue request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) - response = routing.GetAsyncEvents(httpReq, &request, &relayAPI, *userID) + response = routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse = response.JSON.(routing.AsyncEventsResponse) @@ -151,7 +151,7 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) { ) request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) - response := routing.GetAsyncEvents(httpReq, &request, &relayAPI, *userID) + response := routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse := response.JSON.(routing.AsyncEventsResponse) @@ -159,7 +159,7 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) { assert.Equal(t, transaction, jsonResponse.Txn) request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) - response = routing.GetAsyncEvents(httpReq, &request, &relayAPI, *userID) + response = routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse = response.JSON.(routing.AsyncEventsResponse) @@ -168,7 +168,7 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) { // And once more to clear the queue request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) - response = routing.GetAsyncEvents(httpReq, &request, &relayAPI, *userID) + response = routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse = response.JSON.(routing.AsyncEventsResponse) diff --git a/relayapi/routing/forwardasync_test.go b/relayapi/routing/forwardasync_test.go index 0b9da822b..e65463c2f 100644 --- a/relayapi/routing/forwardasync_test.go +++ b/relayapi/routing/forwardasync_test.go @@ -16,8 +16,7 @@ import ( ) const ( - testOrigin = gomatrixserverlib.ServerName("kaer.morhen") - testDestination = gomatrixserverlib.ServerName("white.orchard") + testOrigin = gomatrixserverlib.ServerName("kaer.morhen") ) func createTransaction() gomatrixserverlib.Transaction { @@ -62,7 +61,7 @@ func TestForwardEmptyReturnsOk(t *testing.T) { &db, nil, nil, nil, nil, false, "", ) - response := routing.ForwardAsync(httpReq, &request, &relayAPI, "1", *userID) + response := routing.ForwardAsync(httpReq, &request, relayAPI, "1", *userID) assert.Equal(t, 200, response.Code) } @@ -91,7 +90,7 @@ func TestForwardBadJSONReturnsError(t *testing.T) { &db, nil, nil, nil, nil, false, "", ) - response := routing.ForwardAsync(httpReq, &request, &relayAPI, "1", *userID) + response := routing.ForwardAsync(httpReq, &request, relayAPI, "1", *userID) assert.NotEqual(t, 200, response.Code) } @@ -125,7 +124,7 @@ func TestForwardTooManyPDUsReturnsError(t *testing.T) { &db, nil, nil, nil, nil, false, "", ) - response := routing.ForwardAsync(httpReq, &request, &relayAPI, "1", *userID) + response := routing.ForwardAsync(httpReq, &request, relayAPI, "1", *userID) assert.NotEqual(t, 200, response.Code) } @@ -159,7 +158,7 @@ func TestForwardTooManyEDUsReturnsError(t *testing.T) { &db, nil, nil, nil, nil, false, "", ) - response := routing.ForwardAsync(httpReq, &request, &relayAPI, "1", *userID) + response := routing.ForwardAsync(httpReq, &request, relayAPI, "1", *userID) assert.NotEqual(t, 200, response.Code) } @@ -183,7 +182,7 @@ func TestUniqueTransactionStoredInDatabase(t *testing.T) { ) response := routing.ForwardAsync( - httpReq, &request, &relayAPI, txn.TransactionID, *userID) + httpReq, &request, relayAPI, txn.TransactionID, *userID) transaction, _, err := db.GetAsyncTransaction(context.TODO(), *userID) assert.NoError(t, err, "Failed retrieving transaction") diff --git a/relayapi/routing/routing_test.go b/relayapi/routing/routing_test.go index f77ec2525..c36a4e031 100644 --- a/relayapi/routing/routing_test.go +++ b/relayapi/routing/routing_test.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/relayapi" + "github.com/matrix-org/dendrite/relayapi/internal" "github.com/matrix-org/dendrite/relayapi/routing" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" @@ -53,7 +54,11 @@ func TestHandleForwardAsync(t *testing.T) { relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - routing.Setup(fedMux, &cfg, &relayAPI, keyRing) + r, ok := relayAPI.(*internal.RelayInternalAPI) + if !ok { + panic("This is a programming error.") + } + routing.Setup(fedMux, &cfg, r, keyRing) handler := fedMux.Get(routing.ForwardAsyncRouteName).GetHandler().ServeHTTP _, sk, _ := ed25519.GenerateKey(nil) @@ -95,7 +100,11 @@ func TestHandleForwardAsyncBadUserID(t *testing.T) { relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - routing.Setup(fedMux, &cfg, &relayAPI, keyRing) + r, ok := relayAPI.(*internal.RelayInternalAPI) + if !ok { + panic("This is a programming error.") + } + routing.Setup(fedMux, &cfg, r, keyRing) handler := fedMux.Get(routing.ForwardAsyncRouteName).GetHandler().ServeHTTP _, sk, _ := ed25519.GenerateKey(nil) @@ -137,7 +146,11 @@ func TestHandleAsyncEvents(t *testing.T) { relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - routing.Setup(fedMux, &cfg, &relayAPI, keyRing) + r, ok := relayAPI.(*internal.RelayInternalAPI) + if !ok { + panic("This is a programming error.") + } + routing.Setup(fedMux, &cfg, r, keyRing) handler := fedMux.Get(routing.AsyncEventsRouteName).GetHandler().ServeHTTP _, sk, _ := ed25519.GenerateKey(nil) @@ -179,7 +192,11 @@ func TestHandleAsyncEventsBadUserID(t *testing.T) { relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - routing.Setup(fedMux, &cfg, &relayAPI, keyRing) + r, ok := relayAPI.(*internal.RelayInternalAPI) + if !ok { + panic("This is a programming error.") + } + routing.Setup(fedMux, &cfg, r, keyRing) handler := fedMux.Get(routing.AsyncEventsRouteName).GetHandler().ServeHTTP _, sk, _ := ed25519.GenerateKey(nil) diff --git a/relayapi/storage/tables/relay_queue_json_table_test.go b/relayapi/storage/tables/relay_queue_json_table_test.go index 99c4867da..04aadb3dd 100644 --- a/relayapi/storage/tables/relay_queue_json_table_test.go +++ b/relayapi/storage/tables/relay_queue_json_table_test.go @@ -17,8 +17,7 @@ import ( ) const ( - testOrigin = gomatrixserverlib.ServerName("kaer.morhen") - testDestination = gomatrixserverlib.ServerName("white.orchard") + testOrigin = gomatrixserverlib.ServerName("kaer.morhen") ) func mustCreateTransaction() gomatrixserverlib.Transaction {