Hook in mailserver retrieval to pinecone demos

This commit is contained in:
Devon Hudson 2022-12-06 13:59:20 -07:00
parent 5d45b5baad
commit 07a4ca968c
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
11 changed files with 805 additions and 456 deletions

View file

@ -67,24 +67,27 @@ import (
)
const (
PeerTypeRemote = pineconeRouter.PeerTypeRemote
PeerTypeMulticast = pineconeRouter.PeerTypeMulticast
PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth
PeerTypeBonjour = pineconeRouter.PeerTypeBonjour
PeerTypeRemote = pineconeRouter.PeerTypeRemote
PeerTypeMulticast = pineconeRouter.PeerTypeMulticast
PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth
PeerTypeBonjour = pineconeRouter.PeerTypeBonjour
mailserverRetryInterval = time.Second * 30
)
type DendriteMonolith struct {
logger logrus.Logger
PineconeRouter *pineconeRouter.Router
PineconeMulticast *pineconeMulticast.Multicast
PineconeQUIC *pineconeSessions.Sessions
PineconeManager *pineconeConnections.ConnectionManager
StorageDirectory string
CacheDirectory string
listener net.Listener
httpServer *http.Server
processContext *process.ProcessContext
userAPI userapiAPI.UserInternalAPI
logger logrus.Logger
PineconeRouter *pineconeRouter.Router
PineconeMulticast *pineconeMulticast.Multicast
PineconeQUIC *pineconeSessions.Sessions
PineconeManager *pineconeConnections.ConnectionManager
StorageDirectory string
CacheDirectory string
listener net.Listener
httpServer *http.Server
processContext *process.ProcessContext
userAPI userapiAPI.UserInternalAPI
federationAPI api.FederationInternalAPI
mailserversQueried map[gomatrixserverlib.ServerName]bool
}
func (m *DendriteMonolith) PublicKey() string {
@ -346,11 +349,11 @@ func (m *DendriteMonolith) Start() {
rsAPI := roomserver.NewInternalAPI(base)
fsAPI := federationapi.NewInternalAPI(
m.federationAPI = federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, m.federationAPI)
m.userAPI = userapi.NewInternalAPI(base, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(m.userAPI)
@ -358,10 +361,10 @@ func (m *DendriteMonolith) Start() {
// The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this dependency
rsAPI.SetFederationAPI(fsAPI, keyRing)
rsAPI.SetFederationAPI(m.federationAPI, keyRing)
userProvider := users.NewPineconeUserProvider(m.PineconeRouter, m.PineconeQUIC, m.userAPI, federation)
roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, fsAPI, federation)
roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, m.federationAPI, federation)
monolith := setup.Monolith{
Config: base.Cfg,
@ -370,7 +373,7 @@ func (m *DendriteMonolith) Start() {
KeyRing: keyRing,
AppserviceAPI: asAPI,
FederationAPI: fsAPI,
FederationAPI: m.federationAPI,
RoomserverAPI: rsAPI,
UserAPI: m.userAPI,
KeyAPI: keyAPI,
@ -436,25 +439,45 @@ func (m *DendriteMonolith) Start() {
go func(ch <-chan pineconeEvents.Event) {
eLog := logrus.WithField("pinecone", "events")
mailserverSyncRunning := atomic.NewBool(false)
stopMailserverSync := make(chan bool)
// Setup mailserver info
request := api.QueryMailserversRequest{Server: gomatrixserverlib.ServerName(m.PublicKey())}
response := api.QueryMailserversResponse{}
err := m.federationAPI.QueryMailservers(m.processContext.Context(), &request, &response)
if err != nil {
// TODO
}
m.mailserversQueried = make(map[gomatrixserverlib.ServerName]bool)
for _, server := range response.Mailservers {
m.mailserversQueried[server] = false
}
for event := range ch {
switch e := event.(type) {
case pineconeEvents.PeerAdded:
if !mailserverSyncRunning.Load() {
go m.syncMailservers(stopMailserverSync, *mailserverSyncRunning)
}
case pineconeEvents.PeerRemoved:
if mailserverSyncRunning.Load() && m.PineconeRouter.PeerCount(-1) == 0 {
stopMailserverSync <- true
}
case pineconeEvents.TreeParentUpdate:
case pineconeEvents.SnakeDescUpdate:
case pineconeEvents.TreeRootAnnUpdate:
case pineconeEvents.SnakeEntryAdded:
case pineconeEvents.SnakeEntryRemoved:
case pineconeEvents.BroadcastReceived:
eLog.Info("Broadcast received from: ", e.PeerID)
// eLog.Info("Broadcast received from: ", e.PeerID)
req := &api.PerformWakeupServersRequest{
ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
}
res := &api.PerformWakeupServersResponse{}
if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID)
if err := m.federationAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
}
case pineconeEvents.BandwidthReport:
default:
@ -463,6 +486,46 @@ func (m *DendriteMonolith) Start() {
}(pineconeEventChannel)
}
func (m *DendriteMonolith) syncMailservers(stop <-chan bool, running atomic.Bool) {
defer running.Store(false)
t := time.NewTimer(mailserverRetryInterval)
for {
mailserversToQuery := []gomatrixserverlib.ServerName{}
for server, complete := range m.mailserversQueried {
if !complete {
mailserversToQuery = append(mailserversToQuery, server)
}
}
if len(mailserversToQuery) == 0 {
// All mailservers have been synced.
return
}
m.queryMailservers(mailserversToQuery)
t.Reset(mailserverRetryInterval)
select {
case <-stop:
if !t.Stop() {
<-t.C
}
return
case <-t.C:
}
}
}
func (m *DendriteMonolith) queryMailservers(mailservers []gomatrixserverlib.ServerName) {
for _, server := range mailservers {
request := api.PerformMailserverSyncRequest{Mailserver: server}
response := api.PerformMailserverSyncResponse{}
err := m.federationAPI.PerformMailserverSync(m.processContext.Context(), &request, &response)
if err == nil {
m.mailserversQueried[server] = true
}
}
}
func (m *DendriteMonolith) Stop() {
m.processContext.ShutdownDendrite()
_ = m.listener.Close()

View file

@ -48,6 +48,7 @@ import (
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
"go.uber.org/atomic"
pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
@ -66,6 +67,8 @@ var (
instanceDir = flag.String("dir", ".", "the directory to store the databases in (if --config not specified)")
)
const mailserverRetryInterval = time.Second * 30
// nolint:gocyclo
func main() {
flag.Parse()
@ -305,25 +308,41 @@ func main() {
go func(ch <-chan pineconeEvents.Event) {
eLog := logrus.WithField("pinecone", "events")
mailserverSyncRunning := atomic.NewBool(false)
stopMailserverSync := make(chan bool)
m := MailserverRetriever{
Context: context.Background(),
ServerName: gomatrixserverlib.ServerName(pRouter.PublicKey().String()),
FederationAPI: fsAPI,
MailserversQueried: make(map[gomatrixserverlib.ServerName]bool),
}
m.InitializeMailservers(eLog)
for event := range ch {
switch e := event.(type) {
case pineconeEvents.PeerAdded:
if !mailserverSyncRunning.Load() {
go m.syncMailservers(stopMailserverSync, *mailserverSyncRunning)
}
case pineconeEvents.PeerRemoved:
if mailserverSyncRunning.Load() && pRouter.PeerCount(-1) == 0 {
stopMailserverSync <- true
}
case pineconeEvents.TreeParentUpdate:
case pineconeEvents.SnakeDescUpdate:
case pineconeEvents.TreeRootAnnUpdate:
case pineconeEvents.SnakeEntryAdded:
case pineconeEvents.SnakeEntryRemoved:
case pineconeEvents.BroadcastReceived:
eLog.Info("Broadcast received from: ", e.PeerID)
// eLog.Info("Broadcast received from: ", e.PeerID)
req := &api.PerformWakeupServersRequest{
ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
}
res := &api.PerformWakeupServersResponse{}
if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID)
eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
}
case pineconeEvents.BandwidthReport:
default:
@ -333,3 +352,67 @@ func main() {
base.WaitForShutdown()
}
type MailserverRetriever struct {
Context context.Context
ServerName gomatrixserverlib.ServerName
FederationAPI api.FederationInternalAPI
MailserversQueried map[gomatrixserverlib.ServerName]bool
}
func (m *MailserverRetriever) InitializeMailservers(eLog *logrus.Entry) {
request := api.QueryMailserversRequest{Server: gomatrixserverlib.ServerName(m.ServerName)}
response := api.QueryMailserversResponse{}
err := m.FederationAPI.QueryMailservers(m.Context, &request, &response)
if err != nil {
// TODO
}
for _, server := range response.Mailservers {
m.MailserversQueried[server] = false
}
eLog.Infof("Registered mailservers: %v", response.Mailservers)
}
func (m *MailserverRetriever) syncMailservers(stop <-chan bool, running atomic.Bool) {
defer running.Store(false)
t := time.NewTimer(mailserverRetryInterval)
for {
mailserversToQuery := []gomatrixserverlib.ServerName{}
for server, complete := range m.MailserversQueried {
if !complete {
mailserversToQuery = append(mailserversToQuery, server)
}
}
if len(mailserversToQuery) == 0 {
// All mailservers have been synced.
return
}
m.queryMailservers(mailserversToQuery)
t.Reset(mailserverRetryInterval)
select {
case <-stop:
if !t.Stop() {
<-t.C
}
return
case <-t.C:
}
}
}
func (m *MailserverRetriever) queryMailservers(mailservers []gomatrixserverlib.ServerName) {
logrus.Info("querying mailservers for async_events")
for _, server := range mailservers {
request := api.PerformMailserverSyncRequest{Mailserver: server}
response := api.PerformMailserverSyncResponse{}
err := m.FederationAPI.PerformMailserverSync(m.Context, &request, &response)
if err == nil {
m.MailserversQueried[server] = true
} else {
logrus.Errorf("Failed querying mailserver: %s", err.Error())
}
}
}

View file

@ -36,6 +36,18 @@ type FederationInternalAPI interface {
request *PerformWakeupServersRequest,
response *PerformWakeupServersResponse,
) error
// Mailserver sync api used in the pinecone demos.
QueryMailservers(
ctx context.Context,
request *QueryMailserversRequest,
response *QueryMailserversResponse,
) error
PerformMailserverSync(
ctx context.Context,
request *PerformMailserverSyncRequest,
response *PerformMailserverSyncResponse,
) error
}
type MailserverAPI interface {
@ -239,6 +251,36 @@ type PerformBroadcastEDURequest struct {
type PerformBroadcastEDUResponse struct {
}
type PerformWakeupServersRequest struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
type PerformWakeupServersResponse struct {
}
type InputPublicKeysRequest struct {
Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"`
}
type InputPublicKeysResponse struct {
}
type QueryMailserversRequest struct {
Server gomatrixserverlib.ServerName
}
type QueryMailserversResponse struct {
Mailservers []gomatrixserverlib.ServerName
}
type PerformMailserverSyncRequest struct {
Mailserver gomatrixserverlib.ServerName
}
type PerformMailserverSyncResponse struct {
SyncComplete bool
}
type PerformStoreAsyncRequest struct {
Txn gomatrixserverlib.Transaction `json:"transaction"`
UserID gomatrixserverlib.UserID `json:"user_id"`
@ -255,17 +297,3 @@ type QueryAsyncTransactionsResponse struct {
Txn gomatrixserverlib.Transaction `json:"transaction"`
RemainingCount uint32 `json:"remaining"`
}
type PerformWakeupServersRequest struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
type PerformWakeupServersResponse struct {
}
type InputPublicKeysRequest struct {
Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"`
}
type InputPublicKeysResponse struct {
}

View file

@ -15,6 +15,7 @@ import (
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers"
"github.com/matrix-org/dendrite/federationapi/storage/shared"
"github.com/matrix-org/dendrite/internal"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
)
@ -696,60 +697,6 @@ func (r *FederationInternalAPI) PerformBroadcastEDU(
return nil
}
// PerformStoreAsync implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformStoreAsync(
ctx context.Context,
request *api.PerformStoreAsyncRequest,
response *api.PerformStoreAsyncResponse,
) error {
receipt, err := r.db.StoreAsyncTransaction(ctx, request.Txn)
if err != nil {
return err
}
err = r.db.AssociateAsyncTransactionWithDestinations(
ctx,
map[gomatrixserverlib.UserID]struct{}{
request.UserID: {},
},
request.Txn.TransactionID,
receipt)
return err
}
// QueryAsyncTransactions implements api.FederationInternalAPI
func (r *FederationInternalAPI) QueryAsyncTransactions(
ctx context.Context,
request *api.QueryAsyncTransactionsRequest,
response *api.QueryAsyncTransactionsResponse,
) error {
transaction, receipt, err := r.db.GetAsyncTransaction(ctx, request.UserID)
if err != nil {
return err
}
// TODO : Shouldn't be deleting unless the transaction was successfully returned...
// TODO : Should delete transaction json from table if no more associations
if transaction != nil && receipt != nil {
err = r.db.CleanAsyncTransactions(ctx, request.UserID, []*shared.Receipt{receipt})
if err != nil {
return err
}
}
// TODO : These db calls should happen at the same time right?
count, err := r.db.GetAsyncTransactionCount(ctx, request.UserID)
if err != nil {
return err
}
response.RemainingCount = uint32(count)
if transaction != nil {
response.Txn = *transaction
}
return nil
}
// PerformWakeupServers implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformWakeupServers(
ctx context.Context,
@ -885,3 +832,141 @@ func federatedAuthProvider(
return returning, nil
}
}
// QueryMailservers implements api.FederationInternalAPI
func (r *FederationInternalAPI) QueryMailservers(
ctx context.Context,
request *api.QueryMailserversRequest,
response *api.QueryMailserversResponse,
) error {
logrus.Infof("Getting mailservers for: %s", request.Server)
mailservers, err := r.db.GetMailserversForServer(request.Server)
if err != nil {
return err
}
response.Mailservers = mailservers
return nil
}
// PerformMailserverSync implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformMailserverSync(
ctx context.Context,
request *api.PerformMailserverSyncRequest,
response *api.PerformMailserverSyncResponse,
) error {
userID, err := gomatrixserverlib.NewUserID("@user:"+string(r.cfg.Matrix.ServerName), false)
if err != nil {
return err
}
asyncResponse, err := r.federation.GetAsyncEvents(ctx, *userID, request.Mailserver)
if err != nil {
logrus.Errorf("GetAsyncEvents: %s", err.Error())
return err
}
r.processTransaction(&asyncResponse.Transaction)
for asyncResponse.Remaining > 0 {
asyncResponse, err := r.federation.GetAsyncEvents(ctx, *userID, request.Mailserver)
if err != nil {
logrus.Errorf("GetAsyncEvents: %s", err.Error())
return err
}
r.processTransaction(&asyncResponse.Transaction)
}
return nil
}
func (r *FederationInternalAPI) processTransaction(txn *gomatrixserverlib.Transaction) {
logrus.Warn("Processing transaction from mailserver")
mu := internal.NewMutexByRoom()
// js, _ := base.NATS.Prepare(base.ProcessContext, &r.cfg.Matrix.JetStream)
// producer := &producers.SyncAPIProducer{
// JetStream: js,
// TopicReceiptEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
// TopicSendToDeviceEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
// TopicTypingEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
// TopicPresenceEvent: r.cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
// TopicDeviceListUpdate: r.cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
// TopicSigningKeyUpdate: r.cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
// Config: r.cfg,
// UserAPI: r.userAPI,
// }
t := NewTxnReq(
r.rsAPI,
nil,
r.cfg.Matrix.ServerName,
r.keyRing,
mu,
nil,
nil, // TODO : assign producer to process EDUs
r.cfg.Matrix.Presence.EnableInbound,
txn.PDUs,
txn.EDUs,
txn.Origin,
txn.TransactionID,
txn.Destination)
t.ProcessTransaction(context.TODO())
}
// PerformStoreAsync implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformStoreAsync(
ctx context.Context,
request *api.PerformStoreAsyncRequest,
response *api.PerformStoreAsyncResponse,
) error {
logrus.Warnf("Storing transaction for %v", request.UserID)
receipt, err := r.db.StoreAsyncTransaction(ctx, request.Txn)
if err != nil {
return err
}
err = r.db.AssociateAsyncTransactionWithDestinations(
ctx,
map[gomatrixserverlib.UserID]struct{}{
request.UserID: {},
},
request.Txn.TransactionID,
receipt)
return err
}
// QueryAsyncTransactions implements api.FederationInternalAPI
func (r *FederationInternalAPI) QueryAsyncTransactions(
ctx context.Context,
request *api.QueryAsyncTransactionsRequest,
response *api.QueryAsyncTransactionsResponse,
) error {
logrus.Warnf("Obtaining transaction for %v", request.UserID)
transaction, receipt, err := r.db.GetAsyncTransaction(ctx, request.UserID)
if err != nil {
return err
}
// TODO : Shouldn't be deleting unless the transaction was successfully returned...
// TODO : Should delete transaction json from table if no more associations
if transaction != nil && receipt != nil {
err = r.db.CleanAsyncTransactions(ctx, request.UserID, []*shared.Receipt{receipt})
if err != nil {
return err
}
// TODO : Clean async transactions json
}
// TODO : These db calls should happen at the same time right?
count, err := r.db.GetAsyncTransactionCount(ctx, request.UserID)
if err != nil {
return err
}
response.RemainingCount = uint32(count)
if transaction != nil {
response.Txn = *transaction
logrus.Warnf("Obtained transaction: %v", transaction.TransactionID)
}
return nil
}

View file

@ -0,0 +1,361 @@
// Copyright 2022 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
var (
PDUCountTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dendrite",
Subsystem: "federationapi",
Name: "recv_pdus",
Help: "Number of incoming PDUs from remote servers with labels for success",
},
[]string{"status"}, // 'success' or 'total'
)
EDUCountTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "dendrite",
Subsystem: "federationapi",
Name: "recv_edus",
Help: "Number of incoming EDUs from remote servers",
},
)
)
type TxnReq struct {
gomatrixserverlib.Transaction
rsAPI api.FederationRoomserverAPI
keyAPI keyapi.FederationKeyAPI
ourServerName gomatrixserverlib.ServerName
keys gomatrixserverlib.JSONVerifier
roomsMu *internal.MutexByRoom
servers federationAPI.ServersInRoomProvider
producer *producers.SyncAPIProducer
inboundPresenceEnabled bool
}
func NewTxnReq(
rsAPI api.FederationRoomserverAPI,
keyAPI keyapi.FederationKeyAPI,
ourServerName gomatrixserverlib.ServerName,
keys gomatrixserverlib.JSONVerifier,
roomsMu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider,
producer *producers.SyncAPIProducer,
inboundPresenceEnabled bool,
pdus []json.RawMessage,
edus []gomatrixserverlib.EDU,
origin gomatrixserverlib.ServerName,
transactionID gomatrixserverlib.TransactionID,
destination gomatrixserverlib.ServerName,
) TxnReq {
t := TxnReq{
rsAPI: rsAPI,
keyAPI: keyAPI,
ourServerName: ourServerName,
keys: keys,
roomsMu: roomsMu,
servers: servers,
producer: producer,
inboundPresenceEnabled: inboundPresenceEnabled,
}
t.PDUs = pdus
t.EDUs = edus
t.Origin = origin
t.TransactionID = transactionID
t.Destination = destination
return t
}
func (t *TxnReq) ProcessTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if t.producer != nil {
t.processEDUs(ctx)
}
}()
results := make(map[string]gomatrixserverlib.PDUResult)
roomVersions := make(map[string]gomatrixserverlib.RoomVersion)
getRoomVersion := func(roomID string) gomatrixserverlib.RoomVersion {
if v, ok := roomVersions[roomID]; ok {
return v
}
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := t.rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to query room version for room", verReq.RoomID)
return ""
}
roomVersions[roomID] = verRes.RoomVersion
return verRes.RoomVersion
}
for _, pdu := range t.PDUs {
PDUCountTotal.WithLabelValues("total").Inc()
var header struct {
RoomID string `json:"room_id"`
}
if err := json.Unmarshal(pdu, &header); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to extract room ID from event")
// We don't know the event ID at this point so we can't return the
// failure in the PDU results
continue
}
roomVersion := getRoomVersion(header.RoomID)
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
if err != nil {
if _, ok := err.(gomatrixserverlib.BadJSONError); ok {
// Room version 6 states that homeservers should strictly enforce canonical JSON
// on PDUs.
//
// This enforces that the entire transaction is rejected if a single bad PDU is
// sent. It is unclear if this is the correct behaviour or not.
//
// See https://github.com/matrix-org/synapse/issues/7543
return nil, &util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON("PDU contains bad JSON"),
}
}
util.GetLogger(ctx).WithError(err).Debugf("Transaction: Failed to parse event JSON of event %s", string(pdu))
continue
}
if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") {
continue
}
if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) {
results[event.EventID()] = gomatrixserverlib.PDUResult{
Error: "Forbidden by server ACLs",
}
continue
}
if err = event.VerifyEventSignatures(ctx, t.keys); err != nil {
util.GetLogger(ctx).WithError(err).Debugf("Transaction: Couldn't validate signature of event %q", event.EventID())
results[event.EventID()] = gomatrixserverlib.PDUResult{
Error: err.Error(),
}
continue
}
// pass the event to the roomserver which will do auth checks
// If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
// discarded by the caller of this function
if err = api.SendEvents(
ctx,
t.rsAPI,
api.KindNew,
[]*gomatrixserverlib.HeaderedEvent{
event.Headered(roomVersion),
},
t.Destination,
t.Origin,
api.DoNotSendToOtherServers,
nil,
true,
); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("Transaction: Couldn't submit event %q to input queue: %s", event.EventID(), err)
results[event.EventID()] = gomatrixserverlib.PDUResult{
Error: err.Error(),
}
continue
}
results[event.EventID()] = gomatrixserverlib.PDUResult{}
PDUCountTotal.WithLabelValues("success").Inc()
}
wg.Wait()
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
// nolint:gocyclo
func (t *TxnReq) processEDUs(ctx context.Context) {
for _, e := range t.EDUs {
EDUCountTotal.Inc()
switch e.Type {
case gomatrixserverlib.MTyping:
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
var typingPayload struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal typing event")
continue
}
if _, serverName, err := gomatrixserverlib.SplitID('@', typingPayload.UserID); err != nil {
continue
} else if serverName == t.ourServerName {
continue
} else if serverName != t.Origin {
continue
}
if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream")
}
case gomatrixserverlib.MDirectToDevice:
// https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
var directPayload gomatrixserverlib.ToDeviceMessage
if err := json.Unmarshal(e.Content, &directPayload); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal send-to-device events")
continue
}
if _, serverName, err := gomatrixserverlib.SplitID('@', directPayload.Sender); err != nil {
continue
} else if serverName == t.ourServerName {
continue
} else if serverName != t.Origin {
continue
}
for userID, byUser := range directPayload.Messages {
for deviceID, message := range byUser {
// TODO: check that the user and the device actually exist here
if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
sentry.CaptureException(err)
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"sender": directPayload.Sender,
"user_id": userID,
"device_id": deviceID,
}).Error("Failed to send send-to-device event to JetStream")
}
}
}
case gomatrixserverlib.MDeviceListUpdate:
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
sentry.CaptureException(err)
util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
}
case gomatrixserverlib.MReceipt:
// https://matrix.org/docs/spec/server_server/r0.1.4#receipts
payload := map[string]types.FederationReceiptMRead{}
if err := json.Unmarshal(e.Content, &payload); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event")
continue
}
for roomID, receipt := range payload {
for userID, mread := range receipt.User {
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to split domain from receipt event sender")
continue
}
if t.Origin != domain {
util.GetLogger(ctx).Debugf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
continue
}
if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"sender": t.Origin,
"user_id": userID,
"room_id": roomID,
"events": mread.EventIDs,
}).Error("Failed to send receipt event to JetStream")
continue
}
}
}
case types.MSigningKeyUpdate:
if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil {
sentry.CaptureException(err)
logrus.WithError(err).Errorf("Failed to process signing key update")
}
case gomatrixserverlib.MPresence:
if t.inboundPresenceEnabled {
if err := t.processPresence(ctx, e); err != nil {
logrus.WithError(err).Errorf("Failed to process presence update")
}
}
default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
}
}
}
// processPresence handles m.receipt events
func (t *TxnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error {
payload := types.Presence{}
if err := json.Unmarshal(e.Content, &payload); err != nil {
return err
}
for _, content := range payload.Push {
if _, serverName, err := gomatrixserverlib.SplitID('@', content.UserID); err != nil {
continue
} else if serverName == t.ourServerName {
continue
} else if serverName != t.Origin {
continue
}
presence, ok := syncTypes.PresenceFromString(content.Presence)
if !ok {
continue
}
if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
return err
}
}
return nil
}
// processReceiptEvent sends receipt events to JetStream
func (t *TxnReq) processReceiptEvent(ctx context.Context,
userID, roomID, receiptType string,
timestamp gomatrixserverlib.Timestamp,
eventIDs []string,
) error {
if _, serverName, err := gomatrixserverlib.SplitID('@', userID); err != nil {
return nil
} else if serverName == t.ourServerName {
return nil
} else if serverName != t.Origin {
return nil
}
// store every event
for _, eventID := range eventIDs {
if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil {
return fmt.Errorf("unable to set receipt event: %w", err)
}
}
return nil
}

View file

@ -23,9 +23,12 @@ const (
FederationAPIPerformInviteRequestPath = "/federationapi/performInviteRequest"
FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest"
FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU"
FederationAPIPerformStoreAsyncPath = "/federationapi/performStoreAsync"
FederationAPIQueryAsyncTransactionsPath = "/federationapi/queryAsyncTransactions"
FederationAPIPerformWakeupServers = "/federationapi/performWakeupServers"
FederationAPIQueryMailservers = "/federationapi/queryMailservers"
FederationAPIPerformMailserverSync = "/federationapi/performMailserverSync"
FederationAPIPerformStoreAsyncPath = "/federationapi/performStoreAsync"
FederationAPIQueryAsyncTransactionsPath = "/federationapi/queryAsyncTransactions"
FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices"
FederationAPIClaimKeysPath = "/federationapi/client/claimKeys"
@ -153,28 +156,6 @@ func (h *httpFederationInternalAPI) PerformBroadcastEDU(
)
}
func (h *httpFederationInternalAPI) PerformStoreAsync(
ctx context.Context,
request *api.PerformStoreAsyncRequest,
response *api.PerformStoreAsyncResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformStoreAsync", h.federationAPIURL+FederationAPIPerformStoreAsyncPath,
h.httpClient, ctx, request, response,
)
}
func (h *httpFederationInternalAPI) QueryAsyncTransactions(
ctx context.Context,
request *api.QueryAsyncTransactionsRequest,
response *api.QueryAsyncTransactionsResponse,
) error {
return httputil.CallInternalRPCAPI(
"QueryAsyncTransactions", h.federationAPIURL+FederationAPIQueryAsyncTransactionsPath,
h.httpClient, ctx, request, response,
)
}
// Handle an instruction to remove the respective servers from being blacklisted.
func (h *httpFederationInternalAPI) PerformWakeupServers(
ctx context.Context,
@ -534,3 +515,48 @@ func (h *httpFederationInternalAPI) QueryPublicKeys(
h.httpClient, ctx, request, response,
)
}
func (h *httpFederationInternalAPI) QueryMailservers(
ctx context.Context,
request *api.QueryMailserversRequest,
response *api.QueryMailserversResponse,
) error {
return httputil.CallInternalRPCAPI(
"QueryMailservers", h.federationAPIURL+FederationAPIQueryMailservers,
h.httpClient, ctx, request, response,
)
}
// PerformMailserverSync implements api.FederationInternalAPI
func (h *httpFederationInternalAPI) PerformMailserverSync(
ctx context.Context,
request *api.PerformMailserverSyncRequest,
response *api.PerformMailserverSyncResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformMailserverSync", h.federationAPIURL+FederationAPIPerformMailserverSync,
h.httpClient, ctx, request, response,
)
}
func (h *httpFederationInternalAPI) PerformStoreAsync(
ctx context.Context,
request *api.PerformStoreAsyncRequest,
response *api.PerformStoreAsyncResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformStoreAsync", h.federationAPIURL+FederationAPIPerformStoreAsyncPath,
h.httpClient, ctx, request, response,
)
}
func (h *httpFederationInternalAPI) QueryAsyncTransactions(
ctx context.Context,
request *api.QueryAsyncTransactionsRequest,
response *api.QueryAsyncTransactionsResponse,
) error {
return httputil.CallInternalRPCAPI(
"QueryAsyncTransactions", h.federationAPIURL+FederationAPIQueryAsyncTransactionsPath,
h.httpClient, ctx, request, response,
)
}

View file

@ -411,8 +411,9 @@ func (oq *destinationQueue) nextTransaction(
mailservers := oq.statistics.KnownMailservers()
if oq.statistics.AssumedOffline() && len(mailservers) > 0 {
logrus.Infof("Sending to mailservers: %v", mailservers)
// TODO : how to pass through actual userID here?!?!?!?!
userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.origin), false)
userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false)
if userErr != nil {
return userErr, false
}

View file

@ -6,6 +6,7 @@ import (
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
type AsyncEventsResponse struct {
@ -21,6 +22,7 @@ func GetAsyncEvents(
fedAPI api.FederationInternalAPI,
userID gomatrixserverlib.UserID,
) util.JSONResponse {
logrus.Infof("Handling async_events for %v", userID)
var response api.QueryAsyncTransactionsResponse
err := fedAPI.QueryAsyncTransactions(httpReq.Context(), &api.QueryAsyncTransactionsRequest{UserID: userID}, &response)
if err != nil {

View file

@ -62,7 +62,7 @@ func Setup(
producer *producers.SyncAPIProducer,
) {
prometheus.MustRegister(
pduCountTotal, eduCountTotal,
fedInternal.PDUCountTotal, fedInternal.EDUCountTotal,
)
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()

View file

@ -17,26 +17,21 @@ package routing
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
)
const (
@ -56,26 +51,6 @@ const (
MetricsWorkMissingPrevEvents = "missing_prev_events"
)
var (
pduCountTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dendrite",
Subsystem: "federationapi",
Name: "recv_pdus",
Help: "Number of incoming PDUs from remote servers with labels for success",
},
[]string{"status"}, // 'success' or 'total'
)
eduCountTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "dendrite",
Subsystem: "federationapi",
Name: "recv_edus",
Help: "Number of incoming EDUs from remote servers",
},
)
)
var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse
// Send implements /_matrix/federation/v1/send/{txnID}
@ -123,18 +98,6 @@ func Send(
defer close(ch)
defer inFlightTxnsPerOrigin.Delete(index)
t := txnReq{
rsAPI: rsAPI,
keys: keys,
ourServerName: cfg.Matrix.ServerName,
federation: federation,
servers: servers,
keyAPI: keyAPI,
roomsMu: mu,
producer: producer,
inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound,
}
var txnEvents struct {
PDUs []json.RawMessage `json:"pdus"`
EDUs []gomatrixserverlib.EDU `json:"edus"`
@ -155,16 +118,24 @@ func Send(
}
}
// TODO: Really we should have a function to convert FederationRequest to txnReq
t.PDUs = txnEvents.PDUs
t.EDUs = txnEvents.EDUs
t.Origin = request.Origin()
t.TransactionID = txnID
t.Destination = cfg.Matrix.ServerName
t := fedInternal.NewTxnReq(
rsAPI,
keyAPI,
cfg.Matrix.ServerName,
keys,
mu,
servers,
producer,
cfg.Matrix.Presence.EnableInbound,
txnEvents.PDUs,
txnEvents.EDUs,
request.Origin(),
txnID,
cfg.Matrix.ServerName)
util.GetLogger(httpReq.Context()).Debugf("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
resp, jsonErr := t.processTransaction(httpReq.Context())
resp, jsonErr := t.ProcessTransaction(httpReq.Context())
if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr
@ -181,283 +152,3 @@ func Send(
ch <- res
return res
}
type txnReq struct {
gomatrixserverlib.Transaction
rsAPI api.FederationRoomserverAPI
keyAPI keyapi.FederationKeyAPI
ourServerName gomatrixserverlib.ServerName
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
roomsMu *internal.MutexByRoom
servers federationAPI.ServersInRoomProvider
producer *producers.SyncAPIProducer
inboundPresenceEnabled bool
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
type txnFederationClient interface {
LookupState(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
res gomatrixserverlib.RespState, err error,
)
LookupStateIDs(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error)
GetEvent(ctx context.Context, origin, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error)
LookupMissingEvents(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents,
roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
}
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
t.processEDUs(ctx)
}()
results := make(map[string]gomatrixserverlib.PDUResult)
roomVersions := make(map[string]gomatrixserverlib.RoomVersion)
getRoomVersion := func(roomID string) gomatrixserverlib.RoomVersion {
if v, ok := roomVersions[roomID]; ok {
return v
}
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := t.rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to query room version for room", verReq.RoomID)
return ""
}
roomVersions[roomID] = verRes.RoomVersion
return verRes.RoomVersion
}
for _, pdu := range t.PDUs {
pduCountTotal.WithLabelValues("total").Inc()
var header struct {
RoomID string `json:"room_id"`
}
if err := json.Unmarshal(pdu, &header); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to extract room ID from event")
// We don't know the event ID at this point so we can't return the
// failure in the PDU results
continue
}
roomVersion := getRoomVersion(header.RoomID)
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
if err != nil {
if _, ok := err.(gomatrixserverlib.BadJSONError); ok {
// Room version 6 states that homeservers should strictly enforce canonical JSON
// on PDUs.
//
// This enforces that the entire transaction is rejected if a single bad PDU is
// sent. It is unclear if this is the correct behaviour or not.
//
// See https://github.com/matrix-org/synapse/issues/7543
return nil, &util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON("PDU contains bad JSON"),
}
}
util.GetLogger(ctx).WithError(err).Debugf("Transaction: Failed to parse event JSON of event %s", string(pdu))
continue
}
if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") {
continue
}
if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) {
results[event.EventID()] = gomatrixserverlib.PDUResult{
Error: "Forbidden by server ACLs",
}
continue
}
if err = event.VerifyEventSignatures(ctx, t.keys); err != nil {
util.GetLogger(ctx).WithError(err).Debugf("Transaction: Couldn't validate signature of event %q", event.EventID())
results[event.EventID()] = gomatrixserverlib.PDUResult{
Error: err.Error(),
}
continue
}
// pass the event to the roomserver which will do auth checks
// If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
// discarded by the caller of this function
if err = api.SendEvents(
ctx,
t.rsAPI,
api.KindNew,
[]*gomatrixserverlib.HeaderedEvent{
event.Headered(roomVersion),
},
t.Destination,
t.Origin,
api.DoNotSendToOtherServers,
nil,
true,
); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("Transaction: Couldn't submit event %q to input queue: %s", event.EventID(), err)
results[event.EventID()] = gomatrixserverlib.PDUResult{
Error: err.Error(),
}
continue
}
results[event.EventID()] = gomatrixserverlib.PDUResult{}
pduCountTotal.WithLabelValues("success").Inc()
}
wg.Wait()
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
// nolint:gocyclo
func (t *txnReq) processEDUs(ctx context.Context) {
for _, e := range t.EDUs {
eduCountTotal.Inc()
switch e.Type {
case gomatrixserverlib.MTyping:
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
var typingPayload struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal typing event")
continue
}
if _, serverName, err := gomatrixserverlib.SplitID('@', typingPayload.UserID); err != nil {
continue
} else if serverName == t.ourServerName {
continue
} else if serverName != t.Origin {
continue
}
if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream")
}
case gomatrixserverlib.MDirectToDevice:
// https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
var directPayload gomatrixserverlib.ToDeviceMessage
if err := json.Unmarshal(e.Content, &directPayload); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal send-to-device events")
continue
}
if _, serverName, err := gomatrixserverlib.SplitID('@', directPayload.Sender); err != nil {
continue
} else if serverName == t.ourServerName {
continue
} else if serverName != t.Origin {
continue
}
for userID, byUser := range directPayload.Messages {
for deviceID, message := range byUser {
// TODO: check that the user and the device actually exist here
if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
sentry.CaptureException(err)
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"sender": directPayload.Sender,
"user_id": userID,
"device_id": deviceID,
}).Error("Failed to send send-to-device event to JetStream")
}
}
}
case gomatrixserverlib.MDeviceListUpdate:
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
sentry.CaptureException(err)
util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
}
case gomatrixserverlib.MReceipt:
// https://matrix.org/docs/spec/server_server/r0.1.4#receipts
payload := map[string]types.FederationReceiptMRead{}
if err := json.Unmarshal(e.Content, &payload); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event")
continue
}
for roomID, receipt := range payload {
for userID, mread := range receipt.User {
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to split domain from receipt event sender")
continue
}
if t.Origin != domain {
util.GetLogger(ctx).Debugf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
continue
}
if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"sender": t.Origin,
"user_id": userID,
"room_id": roomID,
"events": mread.EventIDs,
}).Error("Failed to send receipt event to JetStream")
continue
}
}
}
case types.MSigningKeyUpdate:
if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil {
sentry.CaptureException(err)
logrus.WithError(err).Errorf("Failed to process signing key update")
}
case gomatrixserverlib.MPresence:
if t.inboundPresenceEnabled {
if err := t.processPresence(ctx, e); err != nil {
logrus.WithError(err).Errorf("Failed to process presence update")
}
}
default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
}
}
}
// processPresence handles m.receipt events
func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error {
payload := types.Presence{}
if err := json.Unmarshal(e.Content, &payload); err != nil {
return err
}
for _, content := range payload.Push {
if _, serverName, err := gomatrixserverlib.SplitID('@', content.UserID); err != nil {
continue
} else if serverName == t.ourServerName {
continue
} else if serverName != t.Origin {
continue
}
presence, ok := syncTypes.PresenceFromString(content.Presence)
if !ok {
continue
}
if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
return err
}
}
return nil
}
// processReceiptEvent sends receipt events to JetStream
func (t *txnReq) processReceiptEvent(ctx context.Context,
userID, roomID, receiptType string,
timestamp gomatrixserverlib.Timestamp,
eventIDs []string,
) error {
if _, serverName, err := gomatrixserverlib.SplitID('@', userID); err != nil {
return nil
} else if serverName == t.ourServerName {
return nil
} else if serverName != t.Origin {
return nil
}
// store every event
for _, eventID := range eventIDs {
if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil {
return fmt.Errorf("unable to set receipt event: %w", err)
}
}
return nil
}

View file

@ -7,6 +7,7 @@ import (
"testing"
"time"
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/test"
@ -184,22 +185,30 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, origin, s gomatr
return c.getMissingEvents(missing)
}
func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq {
t := &txnReq{
rsAPI: rsAPI,
keys: &test.NopJSONVerifier{},
federation: fedClient,
roomsMu: internal.NewMutexByRoom(),
}
func mustCreateTransaction(rsAPI api.FederationRoomserverAPI, pdus []json.RawMessage) *fedInternal.TxnReq {
t := fedInternal.NewTxnReq(
rsAPI,
nil,
"",
&test.NopJSONVerifier{},
internal.NewMutexByRoom(),
nil,
nil,
false,
pdus,
nil,
testOrigin,
gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano())),
testDestination)
t.PDUs = pdus
t.Origin = testOrigin
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano()))
t.Destination = testDestination
return t
return &t
}
func mustProcessTransaction(t *testing.T, txn *txnReq, pdusWithErrors []string) {
res, err := txn.processTransaction(context.Background())
func mustProcessTransaction(t *testing.T, txn *fedInternal.TxnReq, pdusWithErrors []string) {
res, err := txn.ProcessTransaction(context.Background())
if err != nil {
t.Errorf("txn.processTransaction returned an error: %v", err)
return
@ -262,7 +271,7 @@ func TestBasicTransaction(t *testing.T) {
pdus := []json.RawMessage{
testData[len(testData)-1], // a message event
}
txn := mustCreateTransaction(rsAPI, &txnFedClient{}, pdus)
txn := mustCreateTransaction(rsAPI, pdus)
mustProcessTransaction(t, txn, nil)
assertInputRoomEvents(t, rsAPI.inputRoomEvents, []*gomatrixserverlib.HeaderedEvent{testEvents[len(testEvents)-1]})
}
@ -274,7 +283,7 @@ func TestTransactionFailAuthChecks(t *testing.T) {
pdus := []json.RawMessage{
testData[len(testData)-1], // a message event
}
txn := mustCreateTransaction(rsAPI, &txnFedClient{}, pdus)
txn := mustCreateTransaction(rsAPI, pdus)
mustProcessTransaction(t, txn, []string{})
// expect message to be sent to the roomserver
assertInputRoomEvents(t, rsAPI.inputRoomEvents, []*gomatrixserverlib.HeaderedEvent{testEvents[len(testEvents)-1]})