Refactor relay querying

This commit is contained in:
Devon Hudson 2022-12-14 10:30:30 -07:00
parent d1121643f4
commit f300a4d0e9
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
9 changed files with 141 additions and 109 deletions

View file

@ -38,13 +38,16 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/relayapi"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
@ -216,6 +219,11 @@ func main() {
base, federation, rsAPI, base.Caches, keyRing, true,
)
// TODO : What if I had another dendrite function, where I just passed in the fed_client,
// rsAPI, userAPI, keyRing & any other info. Then that thing is what handles doing the
// async_events querying?
// ie. don't tie it into the federationInternalAPI & get rid of the associated mess.
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
userAPI := userapi.NewInternalAPI(base, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(userAPI)
@ -311,11 +319,32 @@ func main() {
relayServerSyncRunning := atomic.NewBool(false)
stopRelayServerSync := make(chan bool)
js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.FederationAPI.Matrix.JetStream)
producer := &producers.SyncAPIProducer{
JetStream: js,
TopicReceiptEvent: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
TopicPresenceEvent: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
TopicDeviceListUpdate: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
TopicSigningKeyUpdate: base.Cfg.FederationAPI.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
Config: &base.Cfg.FederationAPI,
UserAPI: userAPI,
}
m := RelayServerRetriever{
Context: context.Background(),
ServerName: gomatrixserverlib.ServerName(pRouter.PublicKey().String()),
FederationAPI: fsAPI,
RelayServersQueried: make(map[gomatrixserverlib.ServerName]bool),
RelayAPI: relayapi.NewRelayAPI(
federation,
rsAPI,
keyRing,
producer,
cfg.Global.Presence.EnableInbound,
cfg.Global.ServerName,
),
}
m.InitializeRelayServers(eLog)
@ -358,6 +387,7 @@ type RelayServerRetriever struct {
ServerName gomatrixserverlib.ServerName
FederationAPI api.FederationInternalAPI
RelayServersQueried map[gomatrixserverlib.ServerName]bool
RelayAPI relayapi.RelayAPI
}
func (m *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) {
@ -406,11 +436,19 @@ func (m *RelayServerRetriever) syncRelayServers(stop <-chan bool, running atomic
func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) {
logrus.Info("querying relay servers for async_events")
for _, server := range relayServers {
request := api.PerformRelayServerSyncRequest{RelayServer: server}
response := api.PerformRelayServerSyncResponse{}
err := m.FederationAPI.PerformRelayServerSync(m.Context, &request, &response)
userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false)
if err != nil {
return
}
err = m.RelayAPI.PerformRelayServerSync(*userID, server)
if err == nil {
m.RelayServersQueried[server] = true
// TODO : What happens if your relay receives new messages after this point?
// Should you continue to check with them, or should they try and contact you?
// They could send a "new_async_events" message your way maybe?
// Then you could mark them as needing to be queried again.
// What if you miss this message?
// Maybe you should try querying them again after a certain period of time as a backup?
} else {
logrus.Errorf("Failed querying relay server: %s", err.Error())
}

View file

@ -43,11 +43,6 @@ type FederationInternalAPI interface {
request *QueryRelayServersRequest,
response *QueryRelayServersResponse,
) error
PerformRelayServerSync(
ctx context.Context,
request *PerformRelayServerSyncRequest,
response *PerformRelayServerSyncResponse,
) error
}
type RelayServerAPI interface {
@ -273,14 +268,6 @@ type QueryRelayServersResponse struct {
RelayServers []gomatrixserverlib.ServerName
}
type PerformRelayServerSyncRequest struct {
RelayServer gomatrixserverlib.ServerName
}
type PerformRelayServerSyncResponse struct {
SyncComplete bool
}
type PerformStoreAsyncRequest struct {
Txn gomatrixserverlib.Transaction `json:"transaction"`
UserID gomatrixserverlib.UserID `json:"user_id"`

View file

@ -15,7 +15,6 @@ import (
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers"
"github.com/matrix-org/dendrite/federationapi/storage/shared"
"github.com/matrix-org/dendrite/internal"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
)
@ -849,69 +848,6 @@ func (r *FederationInternalAPI) QueryRelayServers(
return nil
}
// PerformRelayServerSync implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformRelayServerSync(
ctx context.Context,
request *api.PerformRelayServerSyncRequest,
response *api.PerformRelayServerSyncResponse,
) error {
userID, err := gomatrixserverlib.NewUserID("@user:"+string(r.cfg.Matrix.ServerName), false)
if err != nil {
return err
}
asyncResponse, err := r.federation.GetAsyncEvents(ctx, *userID, request.RelayServer)
if err != nil {
logrus.Errorf("GetAsyncEvents: %s", err.Error())
return err
}
r.processTransaction(&asyncResponse.Transaction)
for asyncResponse.Remaining > 0 {
asyncResponse, err := r.federation.GetAsyncEvents(ctx, *userID, request.RelayServer)
if err != nil {
logrus.Errorf("GetAsyncEvents: %s", err.Error())
return err
}
r.processTransaction(&asyncResponse.Transaction)
}
return nil
}
func (r *FederationInternalAPI) processTransaction(txn *gomatrixserverlib.Transaction) {
logrus.Warn("Processing transaction from relay server")
mu := internal.NewMutexByRoom()
// js, _ := base.NATS.Prepare(base.ProcessContext, &r.cfg.Matrix.JetStream)
// producer := &producers.SyncAPIProducer{
// JetStream: js,
// TopicReceiptEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
// TopicSendToDeviceEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
// TopicTypingEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
// TopicPresenceEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
// TopicDeviceListUpdate: r.cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
// TopicSigningKeyUpdate: r.cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
// Config: r.cfg,
// UserAPI: r.userAPI,
// }
t := NewTxnReq(
r.rsAPI,
nil,
r.cfg.Matrix.ServerName,
r.keyRing,
mu,
nil,
nil, // TODO : assign producer to process EDUs
r.cfg.Matrix.Presence.EnableInbound,
txn.PDUs,
txn.EDUs,
txn.Origin,
txn.TransactionID,
txn.Destination)
t.ProcessTransaction(context.TODO())
}
// PerformStoreAsync implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformStoreAsync(
ctx context.Context,

View file

@ -25,7 +25,6 @@ const (
FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU"
FederationAPIPerformWakeupServers = "/federationapi/performWakeupServers"
FederationAPIQueryRelayServers = "/federationapi/queryRelayServers"
FederationAPIPerformRelayServerSync = "/federationapi/performRelayServerSync"
FederationAPIPerformStoreAsyncPath = "/federationapi/performStoreAsync"
FederationAPIQueryAsyncTransactionsPath = "/federationapi/queryAsyncTransactions"
@ -527,18 +526,6 @@ func (h *httpFederationInternalAPI) QueryRelayServers(
)
}
// PerformRelayServerSync implements api.FederationInternalAPI
func (h *httpFederationInternalAPI) PerformRelayServerSync(
ctx context.Context,
request *api.PerformRelayServerSyncRequest,
response *api.PerformRelayServerSyncResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformRelayServerSync", h.federationAPIURL+FederationAPIPerformRelayServerSync,
h.httpClient, ctx, request, response,
)
}
func (h *httpFederationInternalAPI) PerformStoreAsync(
ctx context.Context,
request *api.PerformStoreAsyncRequest,

View file

@ -62,7 +62,7 @@ func Setup(
producer *producers.SyncAPIProducer,
) {
prometheus.MustRegister(
fedInternal.PDUCountTotal, fedInternal.EDUCountTotal,
internal.PDUCountTotal, internal.EDUCountTotal,
)
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()

View file

@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
@ -118,13 +117,12 @@ func Send(
}
}
t := fedInternal.NewTxnReq(
t := internal.NewTxnReq(
rsAPI,
keyAPI,
cfg.Matrix.ServerName,
keys,
mu,
servers,
producer,
cfg.Matrix.Presence.EnableInbound,
txnEvents.PDUs,

View file

@ -7,7 +7,6 @@ import (
"testing"
"time"
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/test"
@ -185,15 +184,14 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, origin, s gomatr
return c.getMissingEvents(missing)
}
func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, pdus []json.RawMessage) *fedInternal.TxnReq {
t := fedInternal.NewTxnReq(
func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, pdus []json.RawMessage) *internal.TxnReq {
t := internal.NewTxnReq(
rsAPI,
nil,
"",
&test.NopJSONVerifier{},
internal.NewMutexByRoom(),
nil,
nil,
false,
pdus,
nil,
@ -207,7 +205,7 @@ func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, pdus []json.RawMes
return &t
}
func mustProcessTransaction(t *testing.T, txn *fedInternal.TxnReq, pdusWithErrors []string) {
func mustProcessTransaction(t *testing.T, txn *internal.TxnReq, pdusWithErrors []string) {
res, err := txn.ProcessTransaction(context.Background())
if err != nil {
t.Errorf("txn.processTransaction returned an error: %v", err)

View file

@ -22,10 +22,8 @@ import (
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
@ -61,8 +59,7 @@ type TxnReq struct {
keyAPI keyapi.FederationKeyAPI
ourServerName gomatrixserverlib.ServerName
keys gomatrixserverlib.JSONVerifier
roomsMu *internal.MutexByRoom
servers federationAPI.ServersInRoomProvider
roomsMu *MutexByRoom
producer *producers.SyncAPIProducer
inboundPresenceEnabled bool
}
@ -72,8 +69,7 @@ func NewTxnReq(
keyAPI keyapi.FederationKeyAPI,
ourServerName gomatrixserverlib.ServerName,
keys gomatrixserverlib.JSONVerifier,
roomsMu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider,
roomsMu *MutexByRoom,
producer *producers.SyncAPIProducer,
inboundPresenceEnabled bool,
pdus []json.RawMessage,
@ -88,7 +84,6 @@ func NewTxnReq(
ourServerName: ourServerName,
keys: keys,
roomsMu: roomsMu,
servers: servers,
producer: producer,
inboundPresenceEnabled: inboundPresenceEnabled,
}

93
relayapi/relayapi.go Normal file
View file

@ -0,0 +1,93 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package relayapi
import (
"context"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal"
rsAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
type RelayAPI struct {
fedClient *gomatrixserverlib.FederationClient
rsAPI rsAPI.RoomserverInternalAPI
keyRing *gomatrixserverlib.KeyRing
producer *producers.SyncAPIProducer
presenceEnabledInbound bool
serverName gomatrixserverlib.ServerName
}
func NewRelayAPI(
fedClient *gomatrixserverlib.FederationClient,
rsAPI rsAPI.RoomserverInternalAPI,
keyRing *gomatrixserverlib.KeyRing,
producer *producers.SyncAPIProducer,
presenceEnabledInbound bool,
serverName gomatrixserverlib.ServerName,
) RelayAPI {
return RelayAPI{
fedClient: fedClient,
rsAPI: rsAPI,
keyRing: keyRing,
producer: producer,
presenceEnabledInbound: presenceEnabledInbound,
serverName: serverName,
}
}
// PerformRelayServerSync implements api.FederationInternalAPI
func (r *RelayAPI) PerformRelayServerSync(userID gomatrixserverlib.UserID, relayServer gomatrixserverlib.ServerName) error {
asyncResponse, err := r.fedClient.GetAsyncEvents(context.Background(), userID, relayServer)
if err != nil {
logrus.Errorf("GetAsyncEvents: %s", err.Error())
return err
}
r.processTransaction(&asyncResponse.Transaction)
for asyncResponse.Remaining > 0 {
asyncResponse, err := r.fedClient.GetAsyncEvents(context.Background(), userID, relayServer)
if err != nil {
logrus.Errorf("GetAsyncEvents: %s", err.Error())
return err
}
r.processTransaction(&asyncResponse.Transaction)
}
return nil
}
func (r *RelayAPI) processTransaction(txn *gomatrixserverlib.Transaction) {
logrus.Warn("Processing transaction from relay server")
mu := internal.NewMutexByRoom()
t := internal.NewTxnReq(
r.rsAPI,
nil,
r.serverName,
r.keyRing,
mu,
r.producer,
r.presenceEnabledInbound,
txn.PDUs,
txn.EDUs,
txn.Origin,
txn.TransactionID,
txn.Destination)
t.ProcessTransaction(context.TODO())
}