Fix federation API

This commit is contained in:
Neil Alexander 2022-11-10 11:42:20 +00:00
parent 1d28a10c03
commit 0013459677
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 34 additions and 22 deletions

View file

@ -120,15 +120,23 @@ func NewInternalAPI(
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
signingInfo := map[gomatrixserverlib.ServerName]*queue.SigningInfo{}
for _, serverName := range append(
[]gomatrixserverlib.ServerName{base.Cfg.Global.ServerName},
base.Cfg.Global.SecondaryServerNames...,
) {
signingInfo[serverName] = &queue.SigningInfo{
KeyID: cfg.Matrix.KeyID,
PrivateKey: cfg.Matrix.PrivateKey,
ServerName: serverName,
}
}
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext, federationDB, base.ProcessContext,
cfg.Matrix.DisableFederation, cfg.Matrix.DisableFederation,
cfg.Matrix.ServerName, federation, rsAPI, &stats, cfg.Matrix.ServerName, federation, rsAPI, &stats,
&queue.SigningInfo{ signingInfo,
KeyID: cfg.Matrix.KeyID,
PrivateKey: cfg.Matrix.PrivateKey,
ServerName: cfg.Matrix.ServerName,
},
) )
rsConsumer := consumers.NewOutputRoomEventConsumer( rsConsumer := consumers.NewOutputRoomEventConsumer(

View file

@ -50,7 +50,7 @@ type destinationQueue struct {
queues *OutgoingQueues queues *OutgoingQueues
db storage.Database db storage.Database
process *process.ProcessContext process *process.ProcessContext
signing *SigningInfo signing map[gomatrixserverlib.ServerName]*SigningInfo
rsAPI api.FederationRoomserverAPI rsAPI api.FederationRoomserverAPI
client fedapi.FederationClient // federation client client fedapi.FederationClient // federation client
origin gomatrixserverlib.ServerName // origin of requests origin gomatrixserverlib.ServerName // origin of requests

View file

@ -46,7 +46,7 @@ type OutgoingQueues struct {
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
client fedapi.FederationClient client fedapi.FederationClient
statistics *statistics.Statistics statistics *statistics.Statistics
signing *SigningInfo signing map[gomatrixserverlib.ServerName]*SigningInfo
queuesMutex sync.Mutex // protects the below queuesMutex sync.Mutex // protects the below
queues map[gomatrixserverlib.ServerName]*destinationQueue queues map[gomatrixserverlib.ServerName]*destinationQueue
} }
@ -91,7 +91,7 @@ func NewOutgoingQueues(
client fedapi.FederationClient, client fedapi.FederationClient,
rsAPI api.FederationRoomserverAPI, rsAPI api.FederationRoomserverAPI,
statistics *statistics.Statistics, statistics *statistics.Statistics,
signing *SigningInfo, signing map[gomatrixserverlib.ServerName]*SigningInfo,
) *OutgoingQueues { ) *OutgoingQueues {
queues := &OutgoingQueues{ queues := &OutgoingQueues{
disabled: disabled, disabled: disabled,
@ -199,11 +199,10 @@ func (oqs *OutgoingQueues) SendEvent(
log.Trace("Federation is disabled, not sending event") log.Trace("Federation is disabled, not sending event")
return nil return nil
} }
if origin != oqs.origin { if _, ok := oqs.signing[origin]; !ok {
// TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf( return fmt.Errorf(
"sendevent: unexpected server to send as: got %q expected %q", "sendevent: unexpected server to send as %q",
origin, oqs.origin, origin,
) )
} }
@ -214,7 +213,9 @@ func (oqs *OutgoingQueues) SendEvent(
destmap[d] = struct{}{} destmap[d] = struct{}{}
} }
delete(destmap, oqs.origin) delete(destmap, oqs.origin)
delete(destmap, oqs.signing.ServerName) for local := range oqs.signing {
delete(destmap, local)
}
// Check if any of the destinations are prohibited by server ACLs. // Check if any of the destinations are prohibited by server ACLs.
for destination := range destmap { for destination := range destmap {
@ -288,11 +289,10 @@ func (oqs *OutgoingQueues) SendEDU(
log.Trace("Federation is disabled, not sending EDU") log.Trace("Federation is disabled, not sending EDU")
return nil return nil
} }
if origin != oqs.origin { if _, ok := oqs.signing[origin]; !ok {
// TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf( return fmt.Errorf(
"sendevent: unexpected server to send as: got %q expected %q", "sendevent: unexpected server to send as %q",
origin, oqs.origin, origin,
) )
} }
@ -303,7 +303,9 @@ func (oqs *OutgoingQueues) SendEDU(
destmap[d] = struct{}{} destmap[d] = struct{}{}
} }
delete(destmap, oqs.origin) delete(destmap, oqs.origin)
delete(destmap, oqs.signing.ServerName) for local := range oqs.signing {
delete(destmap, local)
}
// There is absolutely no guarantee that the EDU will have a room_id // There is absolutely no guarantee that the EDU will have a room_id
// field, as it is not required by the spec. However, if it *does* // field, as it is not required by the spec. However, if it *does*

View file

@ -350,10 +350,12 @@ func testSetup(failuresUntilBlacklist uint32, shouldTxSucceed bool, t *testing.T
} }
rs := &stubFederationRoomServerAPI{} rs := &stubFederationRoomServerAPI{}
stats := statistics.NewStatistics(db, failuresUntilBlacklist) stats := statistics.NewStatistics(db, failuresUntilBlacklist)
signingInfo := &SigningInfo{ signingInfo := map[gomatrixserverlib.ServerName]*SigningInfo{
"localhost": {
KeyID: "ed21019:auto", KeyID: "ed21019:auto",
PrivateKey: test.PrivateKeyA, PrivateKey: test.PrivateKeyA,
ServerName: "localhost", ServerName: "localhost",
},
} }
queues := NewOutgoingQueues(db, processContext, false, "localhost", fc, rs, &stats, signingInfo) queues := NewOutgoingQueues(db, processContext, false, "localhost", fc, rs, &stats, signingInfo)