From 62c2e92ef940426f65251c33f5a013a246e201e3 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Fri, 1 Dec 2023 08:52:34 +0100 Subject: [PATCH] Cleanup old consumers --- appservice/consumers/roomserver.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index f677fe560..bc7ae5ea6 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -78,6 +78,7 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { + durableNames := make([]string, 0, len(s.cfg.Derived.ApplicationServices)) for _, as := range s.cfg.Derived.ApplicationServices { appsvc := as state := &appserviceState{ @@ -95,6 +96,15 @@ func (s *OutputRoomEventConsumer) Start() error { ); err != nil { return fmt.Errorf("failed to create %q consumer: %w", token, err) } + durableNames = append(durableNames, s.cfg.Matrix.JetStream.Durable("Appservice_"+token)) + } + // Cleanup any consumers still existing on the OutputRoomEvent stream + // to avoid messages not being deleted + for _, consumerName := range durableNames { + err := s.jetstream.DeleteConsumer(s.cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), consumerName) + if err != nil && err != nats.ErrConsumerNotFound { + return err + } } return nil }