diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 78791140e..7b75cb372 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -59,8 +59,8 @@ func NewInternalAPI( consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) queues := queue.NewOutgoingQueues( - federationSenderDB, cfg.Matrix.ServerName, federation, - rsAPI, stats, + federationSenderDB, cfg.Matrix.DisableFederation, + cfg.Matrix.ServerName, federation, rsAPI, stats, &queue.SigningInfo{ KeyID: cfg.Matrix.KeyID, PrivateKey: cfg.Matrix.PrivateKey, diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 04cb57e70..95d37b2b8 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -34,6 +34,7 @@ import ( // matrix servers type OutgoingQueues struct { db storage.Database + disabled bool rsAPI api.RoomserverInternalAPI origin gomatrixserverlib.ServerName client *gomatrixserverlib.FederationClient @@ -46,6 +47,7 @@ type OutgoingQueues struct { // NewOutgoingQueues makes a new OutgoingQueues func NewOutgoingQueues( db storage.Database, + disabled bool, origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient, rsAPI api.RoomserverInternalAPI, @@ -53,6 +55,7 @@ func NewOutgoingQueues( signing *SigningInfo, ) *OutgoingQueues { queues := &OutgoingQueues{ + disabled: disabled, db: db, rsAPI: rsAPI, origin: origin, @@ -62,28 +65,30 @@ func NewOutgoingQueues( queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } // Look up which servers we have pending items for and then rehydrate those queues. - time.AfterFunc(time.Second*5, func() { - serverNames := map[gomatrixserverlib.ServerName]struct{}{} - if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { - for _, serverName := range names { - serverNames[serverName] = struct{}{} + if !disabled { + time.AfterFunc(time.Second*5, func() { + serverNames := map[gomatrixserverlib.ServerName]struct{}{} + if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} + } + } else { + log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") } - } else { - log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") - } - if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { - for _, serverName := range names { - serverNames[serverName] = struct{}{} + if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} + } + } else { + log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") } - } else { - log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") - } - for serverName := range serverNames { - if !queues.getQueue(serverName).statistics.Blacklisted() { - queues.getQueue(serverName).wakeQueueIfNeeded() + for serverName := range serverNames { + if !queues.getQueue(serverName).statistics.Blacklisted() { + queues.getQueue(serverName).wakeQueueIfNeeded() + } } - } - }) + }) + } return queues } @@ -122,6 +127,9 @@ func (oqs *OutgoingQueues) SendEvent( ev *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName, ) error { + if oqs.disabled { + return fmt.Errorf("federation is disabled") + } if origin != oqs.origin { // TODO: Support virtual hosting; gh issue #577. return fmt.Errorf( @@ -181,6 +189,9 @@ func (oqs *OutgoingQueues) SendEDU( e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName, ) error { + if oqs.disabled { + return fmt.Errorf("federation is disabled") + } if origin != oqs.origin { // TODO: Support virtual hosting; gh issue #577. return fmt.Errorf( @@ -243,6 +254,9 @@ func (oqs *OutgoingQueues) SendEDU( // RetryServer attempts to resend events to the given server if we had given up. func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { + if oqs.disabled { + return + } q := oqs.getQueue(srv) if q == nil { return