Recreate the stream if the config is bad

This commit is contained in:
Neil Alexander 2022-03-22 14:18:44 +00:00
parent b49160f1f1
commit 4ebef3ec4f
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 12 additions and 5 deletions

View file

@ -107,7 +107,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
return
}
logrus.Infof("Started stream for room %q", w.roomID)
logrus.Infof("Stream for room %q active", w.roomID)
w.subscription = sub
w.Act(nil, w.next)
}
@ -139,7 +139,7 @@ func (w *worker) next() {
}
defer w.Act(nil, w.next)
case context.DeadlineExceeded:
logrus.Infof("Stream for room %q idle, shutting down", w.roomID)
logrus.Infof("Stream for room %q inactive", w.roomID)
if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
}

View file

@ -80,7 +80,16 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info")
}
if info == nil {
if info != nil {
switch {
case info.Config.Retention != stream.Retention:
fallthrough
case info.Config.Storage != stream.Storage:
if err = s.DeleteStream(name); err != nil {
logrus.WithError(err).Fatal("Unable to recreate stream")
}
}
} else {
if len(stream.Subjects) == 0 {
stream.Subjects = []string{name}
}
@ -97,8 +106,6 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
namespaced.Name = name
if _, err = s.AddStream(&namespaced); err != nil {
logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
} else {
logrus.WithField("stream", name).Infof("Added stream")
}
}
}