Don't start federation queues if disabled

This commit is contained in:
Neil Alexander 2020-12-02 14:41:33 +00:00
parent a9f0477b90
commit b6dfdef80d
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 35 additions and 21 deletions

View file

@ -59,8 +59,8 @@ func NewInternalAPI(
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
federationSenderDB, cfg.Matrix.ServerName, federation, federationSenderDB, cfg.Matrix.DisableFederation,
rsAPI, stats, cfg.Matrix.ServerName, federation, rsAPI, stats,
&queue.SigningInfo{ &queue.SigningInfo{
KeyID: cfg.Matrix.KeyID, KeyID: cfg.Matrix.KeyID,
PrivateKey: cfg.Matrix.PrivateKey, PrivateKey: cfg.Matrix.PrivateKey,

View file

@ -34,6 +34,7 @@ import (
// matrix servers // matrix servers
type OutgoingQueues struct { type OutgoingQueues struct {
db storage.Database db storage.Database
disabled bool
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient client *gomatrixserverlib.FederationClient
@ -46,6 +47,7 @@ type OutgoingQueues struct {
// NewOutgoingQueues makes a new OutgoingQueues // NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues( func NewOutgoingQueues(
db storage.Database, db storage.Database,
disabled bool,
origin gomatrixserverlib.ServerName, origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient, client *gomatrixserverlib.FederationClient,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
@ -53,6 +55,7 @@ func NewOutgoingQueues(
signing *SigningInfo, signing *SigningInfo,
) *OutgoingQueues { ) *OutgoingQueues {
queues := &OutgoingQueues{ queues := &OutgoingQueues{
disabled: disabled,
db: db, db: db,
rsAPI: rsAPI, rsAPI: rsAPI,
origin: origin, origin: origin,
@ -62,6 +65,7 @@ func NewOutgoingQueues(
queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
} }
// Look up which servers we have pending items for and then rehydrate those queues. // Look up which servers we have pending items for and then rehydrate those queues.
if !disabled {
time.AfterFunc(time.Second*5, func() { time.AfterFunc(time.Second*5, func() {
serverNames := map[gomatrixserverlib.ServerName]struct{}{} serverNames := map[gomatrixserverlib.ServerName]struct{}{}
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
@ -84,6 +88,7 @@ func NewOutgoingQueues(
} }
} }
}) })
}
return queues return queues
} }
@ -122,6 +127,9 @@ func (oqs *OutgoingQueues) SendEvent(
ev *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName, ev *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName,
destinations []gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName,
) error { ) error {
if oqs.disabled {
return fmt.Errorf("federation is disabled")
}
if origin != oqs.origin { if origin != oqs.origin {
// TODO: Support virtual hosting; gh issue #577. // TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf( return fmt.Errorf(
@ -181,6 +189,9 @@ func (oqs *OutgoingQueues) SendEDU(
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,
destinations []gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName,
) error { ) error {
if oqs.disabled {
return fmt.Errorf("federation is disabled")
}
if origin != oqs.origin { if origin != oqs.origin {
// TODO: Support virtual hosting; gh issue #577. // TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf( return fmt.Errorf(
@ -243,6 +254,9 @@ func (oqs *OutgoingQueues) SendEDU(
// RetryServer attempts to resend events to the given server if we had given up. // RetryServer attempts to resend events to the given server if we had given up.
func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
if oqs.disabled {
return
}
q := oqs.getQueue(srv) q := oqs.getQueue(srv)
if q == nil { if q == nil {
return return