From 4ebef3ec4f4a5990bb2238b5260aa193b6202834 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 22 Mar 2022 14:18:44 +0000 Subject: [PATCH] Recreate the stream if the config is bad --- roomserver/internal/input/input.go | 4 ++-- setup/jetstream/nats.go | 13 ++++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index beae0f80d..66548358b 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -107,7 +107,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { return } - logrus.Infof("Started stream for room %q", w.roomID) + logrus.Infof("Stream for room %q active", w.roomID) w.subscription = sub w.Act(nil, w.next) } @@ -139,7 +139,7 @@ func (w *worker) next() { } defer w.Act(nil, w.next) case context.DeadlineExceeded: - logrus.Infof("Stream for room %q idle, shutting down", w.roomID) + logrus.Infof("Stream for room %q inactive", w.roomID) if err = w.subscription.Unsubscribe(); err != nil { logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) } diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 9afdf490e..3e373530a 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -80,7 +80,16 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream if err != nil && err != natsclient.ErrStreamNotFound { logrus.WithError(err).Fatal("Unable to get stream info") } - if info == nil { + if info != nil { + switch { + case info.Config.Retention != stream.Retention: + fallthrough + case info.Config.Storage != stream.Storage: + if err = s.DeleteStream(name); err != nil { + logrus.WithError(err).Fatal("Unable to recreate stream") + } + } + } else { if len(stream.Subjects) == 0 { stream.Subjects = []string{name} } @@ -97,8 +106,6 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream namespaced.Name = name if _, err = s.AddStream(&namespaced); err != nil { logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream") - } else { - logrus.WithField("stream", name).Infof("Added stream") } } }