mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-26 08:13:09 -06:00
Use process context for outbound federation requests in destination queues
This commit is contained in:
parent
35297bb2be
commit
7c6238674d
|
|
@ -74,5 +74,6 @@ func main() {
|
||||||
base := setup.NewBaseDendrite(cfg, component, false) // TODO
|
base := setup.NewBaseDendrite(cfg, component, false) // TODO
|
||||||
defer base.Close() // nolint: errcheck
|
defer base.Close() // nolint: errcheck
|
||||||
|
|
||||||
start(base, cfg)
|
go start(base, cfg)
|
||||||
|
base.WaitForShutdown()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,8 @@ func NewInternalAPI(
|
||||||
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||||
|
|
||||||
queues := queue.NewOutgoingQueues(
|
queues := queue.NewOutgoingQueues(
|
||||||
federationSenderDB, cfg.Matrix.DisableFederation,
|
federationSenderDB, base.ProcessContext,
|
||||||
|
cfg.Matrix.DisableFederation,
|
||||||
cfg.Matrix.ServerName, federation, rsAPI, stats,
|
cfg.Matrix.ServerName, federation, rsAPI, stats,
|
||||||
&queue.SigningInfo{
|
&queue.SigningInfo{
|
||||||
KeyID: cfg.Matrix.KeyID,
|
KeyID: cfg.Matrix.KeyID,
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
@ -46,6 +47,7 @@ const (
|
||||||
// at a time.
|
// at a time.
|
||||||
type destinationQueue struct {
|
type destinationQueue struct {
|
||||||
db storage.Database
|
db storage.Database
|
||||||
|
process *process.ProcessContext
|
||||||
signing *SigningInfo
|
signing *SigningInfo
|
||||||
rsAPI api.RoomserverInternalAPI
|
rsAPI api.RoomserverInternalAPI
|
||||||
client *gomatrixserverlib.FederationClient // federation client
|
client *gomatrixserverlib.FederationClient // federation client
|
||||||
|
|
@ -411,7 +413,7 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
// TODO: we should check for 500-ish fails vs 400-ish here,
|
// TODO: we should check for 500-ish fails vs 400-ish here,
|
||||||
// since we shouldn't queue things indefinitely in response
|
// since we shouldn't queue things indefinitely in response
|
||||||
// to a 400-ish error
|
// to a 400-ish error
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
|
ctx, cancel := context.WithTimeout(oq.process.Context(), time.Minute*5)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_, err := oq.client.SendTransaction(ctx, t)
|
_, err := oq.client.SendTransaction(ctx, t)
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
@ -36,6 +37,7 @@ import (
|
||||||
// matrix servers
|
// matrix servers
|
||||||
type OutgoingQueues struct {
|
type OutgoingQueues struct {
|
||||||
db storage.Database
|
db storage.Database
|
||||||
|
process *process.ProcessContext
|
||||||
disabled bool
|
disabled bool
|
||||||
rsAPI api.RoomserverInternalAPI
|
rsAPI api.RoomserverInternalAPI
|
||||||
origin gomatrixserverlib.ServerName
|
origin gomatrixserverlib.ServerName
|
||||||
|
|
@ -80,6 +82,7 @@ var destinationQueueBackingOff = prometheus.NewGauge(
|
||||||
// NewOutgoingQueues makes a new OutgoingQueues
|
// NewOutgoingQueues makes a new OutgoingQueues
|
||||||
func NewOutgoingQueues(
|
func NewOutgoingQueues(
|
||||||
db storage.Database,
|
db storage.Database,
|
||||||
|
process *process.ProcessContext,
|
||||||
disabled bool,
|
disabled bool,
|
||||||
origin gomatrixserverlib.ServerName,
|
origin gomatrixserverlib.ServerName,
|
||||||
client *gomatrixserverlib.FederationClient,
|
client *gomatrixserverlib.FederationClient,
|
||||||
|
|
@ -89,6 +92,7 @@ func NewOutgoingQueues(
|
||||||
) *OutgoingQueues {
|
) *OutgoingQueues {
|
||||||
queues := &OutgoingQueues{
|
queues := &OutgoingQueues{
|
||||||
disabled: disabled,
|
disabled: disabled,
|
||||||
|
process: process,
|
||||||
db: db,
|
db: db,
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
origin: origin,
|
origin: origin,
|
||||||
|
|
@ -151,6 +155,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
|
||||||
destinationQueueTotal.Inc()
|
destinationQueueTotal.Inc()
|
||||||
oq = &destinationQueue{
|
oq = &destinationQueue{
|
||||||
db: oqs.db,
|
db: oqs.db,
|
||||||
|
process: oqs.process,
|
||||||
rsAPI: oqs.rsAPI,
|
rsAPI: oqs.rsAPI,
|
||||||
origin: oqs.origin,
|
origin: oqs.origin,
|
||||||
destination: destination,
|
destination: destination,
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,10 @@ func NewProcessContext() *ProcessContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *ProcessContext) Context() context.Context {
|
||||||
|
return context.WithValue(b.ctx, "scope", "process") // nolint:staticcheck
|
||||||
|
}
|
||||||
|
|
||||||
func (b *ProcessContext) ComponentStarted() {
|
func (b *ProcessContext) ComponentStarted() {
|
||||||
b.wg.Add(1)
|
b.wg.Add(1)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue