Cleanup old consumers

This commit is contained in:
Till Faelligen 2023-12-01 08:52:34 +01:00
parent 3e113fd14f
commit 62c2e92ef9
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E

View file

@ -78,6 +78,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers // Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) Start() error {
durableNames := make([]string, 0, len(s.cfg.Derived.ApplicationServices))
for _, as := range s.cfg.Derived.ApplicationServices { for _, as := range s.cfg.Derived.ApplicationServices {
appsvc := as appsvc := as
state := &appserviceState{ state := &appserviceState{
@ -95,6 +96,15 @@ func (s *OutputRoomEventConsumer) Start() error {
); err != nil { ); err != nil {
return fmt.Errorf("failed to create %q consumer: %w", token, err) return fmt.Errorf("failed to create %q consumer: %w", token, err)
} }
durableNames = append(durableNames, s.cfg.Matrix.JetStream.Durable("Appservice_"+token))
}
// Cleanup any consumers still existing on the OutputRoomEvent stream
// to avoid messages not being deleted
for _, consumerName := range durableNames {
err := s.jetstream.DeleteConsumer(s.cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), consumerName)
if err != nil && err != nats.ErrConsumerNotFound {
return err
}
} }
return nil return nil
} }