mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-04 04:33:10 -06:00
Rename durable consumer; Consumer cleanup
This commit is contained in:
parent
2952ce265f
commit
35e661a372
|
|
@ -157,5 +157,26 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean up old consumers so that interest-based consumers do the
|
||||||
|
// right thing.
|
||||||
|
for stream, consumers := range map[string][]string{
|
||||||
|
OutputClientData: {"SyncAPIClientAPIConsumer"},
|
||||||
|
OutputReceiptEvent: {"SyncAPIEDUServerReceiptConsumer", "FederationAPIEDUServerConsumer"},
|
||||||
|
OutputSendToDeviceEvent: {"SyncAPIEDUServerSendToDeviceConsumer", "FederationAPIEDUServerConsumer"},
|
||||||
|
OutputTypingEvent: {"SyncAPIEDUServerTypingConsumer", "FederationAPIEDUServerConsumer"},
|
||||||
|
} {
|
||||||
|
streamName := cfg.Matrix.JetStream.Prefixed(stream)
|
||||||
|
for _, consumer := range consumers {
|
||||||
|
consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull"
|
||||||
|
consumerInfo, err := s.ConsumerInfo(streamName, consumerName)
|
||||||
|
if err != nil || consumerInfo == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err = s.DeleteConsumer(streamName, consumerName); err != nil {
|
||||||
|
logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return s, nc
|
return s, nc
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,7 @@ func NewOutputClientDataConsumer(
|
||||||
ctx: process.Context(),
|
ctx: process.Context(),
|
||||||
jetstream: js,
|
jetstream: js,
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
|
||||||
durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
|
durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"),
|
||||||
db: store,
|
db: store,
|
||||||
notifier: notifier,
|
notifier: notifier,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,7 @@ func NewOutputReceiptEventConsumer(
|
||||||
ctx: process.Context(),
|
ctx: process.Context(),
|
||||||
jetstream: js,
|
jetstream: js,
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
|
||||||
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
|
durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"),
|
||||||
db: store,
|
db: store,
|
||||||
notifier: notifier,
|
notifier: notifier,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
|
|
@ -71,7 +71,7 @@ func NewOutputReceiptEventConsumer(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from EDU api
|
// Start consuming receipts events.
|
||||||
func (s *OutputReceiptEventConsumer) Start() error {
|
func (s *OutputReceiptEventConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
||||||
|
|
|
||||||
|
|
@ -57,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer(
|
||||||
ctx: process.Context(),
|
ctx: process.Context(),
|
||||||
jetstream: js,
|
jetstream: js,
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
||||||
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
|
durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"),
|
||||||
db: store,
|
db: store,
|
||||||
serverName: cfg.Matrix.ServerName,
|
serverName: cfg.Matrix.ServerName,
|
||||||
notifier: notifier,
|
notifier: notifier,
|
||||||
|
|
@ -65,7 +65,7 @@ func NewOutputSendToDeviceEventConsumer(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from EDU api
|
// Start consuming send-to-device events.
|
||||||
func (s *OutputSendToDeviceEventConsumer) Start() error {
|
func (s *OutputSendToDeviceEventConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
||||||
|
|
|
||||||
|
|
@ -54,14 +54,14 @@ func NewOutputTypingEventConsumer(
|
||||||
ctx: process.Context(),
|
ctx: process.Context(),
|
||||||
jetstream: js,
|
jetstream: js,
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
||||||
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
|
durable: cfg.Matrix.JetStream.Durable("SyncAPITypingConsumer"),
|
||||||
eduCache: eduCache,
|
eduCache: eduCache,
|
||||||
notifier: notifier,
|
notifier: notifier,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from EDU api
|
// Start consuming typing events.
|
||||||
func (s *OutputTypingEventConsumer) Start() error {
|
func (s *OutputTypingEventConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue