diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 328cf9155..4e4fe7a29 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -157,5 +157,26 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc } } + // Clean up old consumers so that interest-based consumers do the + // right thing. + for stream, consumers := range map[string][]string{ + OutputClientData: {"SyncAPIClientAPIConsumer"}, + OutputReceiptEvent: {"SyncAPIEDUServerReceiptConsumer", "FederationAPIEDUServerConsumer"}, + OutputSendToDeviceEvent: {"SyncAPIEDUServerSendToDeviceConsumer", "FederationAPIEDUServerConsumer"}, + OutputTypingEvent: {"SyncAPIEDUServerTypingConsumer", "FederationAPIEDUServerConsumer"}, + } { + streamName := cfg.Matrix.JetStream.Prefixed(stream) + for _, consumer := range consumers { + consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull" + consumerInfo, err := s.ConsumerInfo(streamName, consumerName) + if err != nil || consumerInfo == nil { + continue + } + if err = s.DeleteConsumer(streamName, consumerName); err != nil { + logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream) + } + } + } + return s, nc } diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 40c1cd3d6..c28da4600 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -62,7 +62,7 @@ func NewOutputClientDataConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), - durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"), db: store, notifier: notifier, stream: stream, diff --git a/syncapi/consumers/receipts.go b/syncapi/consumers/receipts.go index 3cd8d1b33..6bb0747f0 100644 --- a/syncapi/consumers/receipts.go +++ b/syncapi/consumers/receipts.go @@ -62,7 +62,7 @@ func NewOutputReceiptEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"), db: store, notifier: notifier, stream: stream, @@ -71,7 +71,7 @@ func NewOutputReceiptEventConsumer( } } -// Start consuming from EDU api +// Start consuming receipts events. func (s *OutputReceiptEventConsumer) Start() error { return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, diff --git a/syncapi/consumers/sendtodevice.go b/syncapi/consumers/sendtodevice.go index 6edea9fd9..0b9153fcd 100644 --- a/syncapi/consumers/sendtodevice.go +++ b/syncapi/consumers/sendtodevice.go @@ -57,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"), db: store, serverName: cfg.Matrix.ServerName, notifier: notifier, @@ -65,7 +65,7 @@ func NewOutputSendToDeviceEventConsumer( } } -// Start consuming from EDU api +// Start consuming send-to-device events. func (s *OutputSendToDeviceEventConsumer) Start() error { return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, diff --git a/syncapi/consumers/typing.go b/syncapi/consumers/typing.go index 87766c024..48e484ec5 100644 --- a/syncapi/consumers/typing.go +++ b/syncapi/consumers/typing.go @@ -54,14 +54,14 @@ func NewOutputTypingEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPITypingConsumer"), eduCache: eduCache, notifier: notifier, stream: stream, } } -// Start consuming from EDU api +// Start consuming typing events. func (s *OutputTypingEventConsumer) Start() error { return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,