Graceful shutdowns (#1734)

* Initial graceful stop

* Fix dendritejs

* Use process context for outbound federation requests in destination queues

* Reduce logging

* Fix log level
This commit is contained in:
Neil Alexander 2021-01-26 12:56:20 +00:00 committed by GitHub
parent 64fb6de6d4
commit 9f443317bc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 187 additions and 24 deletions

View file

@ -89,7 +89,7 @@ func NewInternalAPI(
// We can't add ASes at runtime so this is safe to do.
if len(workerStates) > 0 {
consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, consumer, appserviceDB,
base.ProcessContext, base.Cfg, consumer, appserviceDB,
rsAPI, workerStates,
)
if err := consumer.Start(); err != nil {

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/Shopify/sarama"
@ -41,6 +42,7 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
appserviceDB storage.Database,
@ -48,6 +50,7 @@ func NewOutputRoomEventConsumer(
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "appservice/roomserver",
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
Consumer: kafkaConsumer,

View file

@ -166,6 +166,7 @@ func (m *DendriteMonolith) Start() {
),
}
monolith.AddAllPublicRoutes(
base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,

View file

@ -192,6 +192,7 @@ func main() {
ExtPublicRoomsProvider: provider,
}
monolith.AddAllPublicRoutes(
base.Base.ProcessContext,
base.Base.PublicClientAPIMux,
base.Base.PublicFederationAPIMux,
base.Base.PublicKeyAPIMux,
@ -234,5 +235,5 @@ func main() {
}
// We want to block forever to let the HTTP and HTTPS handler serve the APIs
select {}
base.Base.WaitForShutdown()
}

View file

@ -150,6 +150,7 @@ func main() {
),
}
monolith.AddAllPublicRoutes(
base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,
@ -200,5 +201,6 @@ func main() {
}
}()
select {}
// We want to block forever to let the HTTP and HTTPS handler serve the APIs
base.WaitForShutdown()
}

View file

@ -144,6 +144,7 @@ func main() {
KeyAPI: keyAPI,
}
monolith.AddAllPublicRoutes(
base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,
@ -176,5 +177,5 @@ func main() {
}
// We want to block forever to let the HTTP and HTTPS handler serve the APIs
select {}
base.WaitForShutdown()
}

View file

@ -74,5 +74,6 @@ func main() {
base := setup.NewBaseDendrite(cfg, component, false) // TODO
defer base.Close() // nolint: errcheck
start(base, cfg)
go start(base, cfg)
base.WaitForShutdown()
}

View file

@ -27,6 +27,7 @@ func SyncAPI(base *setup.BaseDendrite, cfg *config.Dendrite) {
rsAPI := base.RoomserverHTTPClient()
syncapi.AddPublicRoutes(
base.ProcessContext,
base.PublicClientAPIMux, userAPI, rsAPI,
base.KeyServerHTTPClient(),
federation, &cfg.SyncAPI,

View file

@ -231,6 +231,7 @@ func main() {
ExtPublicRoomsProvider: p2pPublicRoomProvider,
}
monolith.AddAllPublicRoutes(
base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
@ -44,6 +45,7 @@ type OutputEDUConsumer struct {
// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputEDUConsumer(
process *process.ProcessContext,
cfg *config.FederationSender,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
@ -51,18 +53,21 @@ func NewOutputEDUConsumer(
) *OutputEDUConsumer {
c := &OutputEDUConsumer{
typingConsumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "eduserver/typing",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
sendToDeviceConsumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "eduserver/sendtodevice",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
receiptConsumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "eduserver/receipt",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
Consumer: kafkaConsumer,

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
@ -41,6 +42,7 @@ type KeyChangeConsumer struct {
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
func NewKeyChangeConsumer(
process *process.ProcessContext,
cfg *config.KeyServer,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
@ -49,6 +51,7 @@ func NewKeyChangeConsumer(
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
consumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "federationsender/keychange",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
Consumer: kafkaConsumer,

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
@ -41,6 +42,7 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.FederationSender,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
@ -48,6 +50,7 @@ func NewOutputRoomEventConsumer(
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "federationsender/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,

View file

@ -59,7 +59,8 @@ func NewInternalAPI(
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
queues := queue.NewOutgoingQueues(
federationSenderDB, cfg.Matrix.DisableFederation,
federationSenderDB, base.ProcessContext,
cfg.Matrix.DisableFederation,
cfg.Matrix.ServerName, federation, rsAPI, stats,
&queue.SigningInfo{
KeyID: cfg.Matrix.KeyID,
@ -69,7 +70,7 @@ func NewInternalAPI(
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, queues,
base.ProcessContext, cfg, consumer, queues,
federationSenderDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
@ -77,13 +78,13 @@ func NewInternalAPI(
}
tsConsumer := consumers.NewOutputEDUConsumer(
cfg, consumer, queues, federationSenderDB,
base.ProcessContext, cfg, consumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
keyConsumer := consumers.NewKeyChangeConsumer(
&base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@ -46,6 +47,7 @@ const (
// at a time.
type destinationQueue struct {
db storage.Database
process *process.ProcessContext
signing *SigningInfo
rsAPI api.RoomserverInternalAPI
client *gomatrixserverlib.FederationClient // federation client
@ -411,7 +413,7 @@ func (oq *destinationQueue) nextTransaction(
// TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response
// to a 400-ish error
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
ctx, cancel := context.WithTimeout(oq.process.Context(), time.Minute*5)
defer cancel()
_, err := oq.client.SendTransaction(ctx, t)
switch err.(type) {

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
@ -36,6 +37,7 @@ import (
// matrix servers
type OutgoingQueues struct {
db storage.Database
process *process.ProcessContext
disabled bool
rsAPI api.RoomserverInternalAPI
origin gomatrixserverlib.ServerName
@ -80,6 +82,7 @@ var destinationQueueBackingOff = prometheus.NewGauge(
// NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues(
db storage.Database,
process *process.ProcessContext,
disabled bool,
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
@ -89,6 +92,7 @@ func NewOutgoingQueues(
) *OutgoingQueues {
queues := &OutgoingQueues{
disabled: disabled,
process: process,
db: db,
rsAPI: rsAPI,
origin: origin,
@ -151,6 +155,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
destinationQueueTotal.Inc()
oq = &destinationQueue{
db: oqs.db,
process: oqs.process,
rsAPI: oqs.rsAPI,
origin: oqs.origin,
destination: destination,

View file

@ -20,6 +20,8 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus"
)
// A PartitionStorer has the storage APIs needed by the consumer.
@ -33,6 +35,9 @@ type PartitionStorer interface {
// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
// remember the offset it reached.
type ContinualConsumer struct {
// The parent context for the listener, stop consuming when this context is done
Process *process.ProcessContext
// The component name
ComponentName string
// The kafkaesque topic to consume events from.
// This is the name used in kafka to identify the stream to consume events from.
@ -100,6 +105,15 @@ func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) {
}
for _, pc := range partitionConsumers {
go c.consumePartition(pc)
if c.Process != nil {
c.Process.ComponentStarted()
go func(pc sarama.PartitionConsumer) {
<-c.Process.WaitForShutdown()
_ = pc.Close()
c.Process.ComponentFinished()
logrus.Infof("Stopped consumer for %q topic %q", c.ComponentName, c.Topic)
}(pc)
}
}
return storedOffsets, nil

View file

@ -15,22 +15,28 @@
package setup
import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"syscall"
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/gorilla/mux"
@ -61,6 +67,7 @@ import (
// should only be used during start up.
// Must be closed when shutting down.
type BaseDendrite struct {
*process.ProcessContext
componentName string
tracerCloser io.Closer
PublicClientAPIMux *mux.Router
@ -161,7 +168,9 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
// We need to be careful with media APIs if they read from a filesystem to make sure they
// are not inadvertently reading paths without cleaning, else this could introduce a
// directory traversal attack e.g /../../../etc/passwd
return &BaseDendrite{
ProcessContext: process.NewProcessContext(),
componentName: componentName,
UseHTTPAPIs: useHTTPAPIs,
tracerCloser: closer,
@ -354,14 +363,26 @@ func (b *BaseDendrite) SetupAndServeHTTP(
if internalAddr != NoListener && internalAddr != externalAddr {
go func() {
var internalShutdown atomic.Bool // RegisterOnShutdown can be called more than once
logrus.Infof("Starting internal %s listener on %s", b.componentName, internalServ.Addr)
b.ProcessContext.ComponentStarted()
internalServ.RegisterOnShutdown(func() {
if internalShutdown.CAS(false, true) {
b.ProcessContext.ComponentFinished()
logrus.Infof("Stopped internal HTTP listener")
}
})
if certFile != nil && keyFile != nil {
if err := internalServ.ListenAndServeTLS(*certFile, *keyFile); err != nil {
logrus.WithError(err).Fatal("failed to serve HTTPS")
if err != http.ErrServerClosed {
logrus.WithError(err).Fatal("failed to serve HTTPS")
}
}
} else {
if err := internalServ.ListenAndServe(); err != nil {
logrus.WithError(err).Fatal("failed to serve HTTP")
if err != http.ErrServerClosed {
logrus.WithError(err).Fatal("failed to serve HTTP")
}
}
}
logrus.Infof("Stopped internal %s listener on %s", b.componentName, internalServ.Addr)
@ -370,19 +391,52 @@ func (b *BaseDendrite) SetupAndServeHTTP(
if externalAddr != NoListener {
go func() {
var externalShutdown atomic.Bool // RegisterOnShutdown can be called more than once
logrus.Infof("Starting external %s listener on %s", b.componentName, externalServ.Addr)
b.ProcessContext.ComponentStarted()
externalServ.RegisterOnShutdown(func() {
if externalShutdown.CAS(false, true) {
b.ProcessContext.ComponentFinished()
logrus.Infof("Stopped external HTTP listener")
}
})
if certFile != nil && keyFile != nil {
if err := externalServ.ListenAndServeTLS(*certFile, *keyFile); err != nil {
logrus.WithError(err).Fatal("failed to serve HTTPS")
if err != http.ErrServerClosed {
logrus.WithError(err).Fatal("failed to serve HTTPS")
}
}
} else {
if err := externalServ.ListenAndServe(); err != nil {
logrus.WithError(err).Fatal("failed to serve HTTP")
if err != http.ErrServerClosed {
logrus.WithError(err).Fatal("failed to serve HTTP")
}
}
}
logrus.Infof("Stopped external %s listener on %s", b.componentName, externalServ.Addr)
}()
}
select {}
<-b.ProcessContext.WaitForShutdown()
ctx, cancel := context.WithCancel(context.Background())
cancel()
_ = internalServ.Shutdown(ctx)
_ = externalServ.Shutdown(ctx)
logrus.Infof("Stopped HTTP listeners")
}
func (b *BaseDendrite) WaitForShutdown() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
signal.Reset(syscall.SIGINT, syscall.SIGTERM)
logrus.Warnf("Shutdown signal received")
b.ProcessContext.ShutdownDendrite()
b.ProcessContext.WaitForComponentsToFinish()
logrus.Warnf("Dendrite is exiting now")
}

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/mediaapi"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
serverKeyAPI "github.com/matrix-org/dendrite/signingkeyserver/api"
"github.com/matrix-org/dendrite/syncapi"
userapi "github.com/matrix-org/dendrite/userapi/api"
@ -56,7 +57,7 @@ type Monolith struct {
}
// AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, mediaMux *mux.Router) {
clientapi.AddPublicRoutes(
csMux, &m.Config.ClientAPI, m.AccountDB,
m.FedClient, m.RoomserverAPI,
@ -71,7 +72,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
csMux, m.UserAPI, m.RoomserverAPI,
process, csMux, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
)
}

45
setup/process/process.go Normal file
View file

@ -0,0 +1,45 @@
package process
import (
"context"
"sync"
)
type ProcessContext struct {
wg *sync.WaitGroup // used to wait for components to shutdown
ctx context.Context // cancelled when Stop is called
shutdown context.CancelFunc // shut down Dendrite
}
func NewProcessContext() *ProcessContext {
ctx, shutdown := context.WithCancel(context.Background())
return &ProcessContext{
ctx: ctx,
shutdown: shutdown,
wg: &sync.WaitGroup{},
}
}
func (b *ProcessContext) Context() context.Context {
return context.WithValue(b.ctx, "scope", "process") // nolint:staticcheck
}
func (b *ProcessContext) ComponentStarted() {
b.wg.Add(1)
}
func (b *ProcessContext) ComponentFinished() {
b.wg.Done()
}
func (b *ProcessContext) ShutdownDendrite() {
b.shutdown()
}
func (b *ProcessContext) WaitForShutdown() <-chan struct{} {
return b.ctx.Done()
}
func (b *ProcessContext) WaitForComponentsToFinish() {
b.wg.Wait()
}

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@ -38,14 +39,15 @@ type OutputClientDataConsumer struct {
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientDataConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/clientapi",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)),
Consumer: kafkaConsumer,

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@ -39,6 +40,7 @@ type OutputReceiptEventConsumer struct {
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputReceiptEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@ -47,6 +49,7 @@ func NewOutputReceiptEventConsumer(
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/receipt",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
Consumer: kafkaConsumer,

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@ -42,6 +43,7 @@ type OutputSendToDeviceEventConsumer struct {
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputSendToDeviceEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@ -50,6 +52,7 @@ func NewOutputSendToDeviceEventConsumer(
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/sendtodevice",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
Consumer: kafkaConsumer,

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@ -39,6 +40,7 @@ type OutputTypingEventConsumer struct {
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputTypingEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@ -48,6 +50,7 @@ func NewOutputTypingEventConsumer(
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/typing",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
Consumer: kafkaConsumer,

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@ -46,6 +47,7 @@ type OutputKeyChangeEventConsumer struct {
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
// Call Start() to begin consuming from the key server.
func NewOutputKeyChangeEventConsumer(
process *process.ProcessContext,
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
@ -57,6 +59,7 @@ func NewOutputKeyChangeEventConsumer(
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/keychange",
Topic: topic,
Consumer: kafkaConsumer,

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@ -43,6 +44,7 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
@ -53,6 +55,7 @@ func NewOutputRoomEventConsumer(
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/kafka"
"github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@ -39,6 +40,7 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI
// component.
func AddPublicRoutes(
process *process.ProcessContext,
router *mux.Router,
userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI,
@ -63,7 +65,7 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {
@ -71,7 +73,7 @@ func AddPublicRoutes(
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI,
)
if err = roomConsumer.Start(); err != nil {
@ -79,28 +81,28 @@ func AddPublicRoutes(
}
clientConsumer := consumers.NewOutputClientDataConsumer(
cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
process, cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
process, cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
process, cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
process, cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")