diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index a81701fe6..5d1360e9c 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -125,18 +125,33 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. subjects = []string{name, name + ".>"} } - if info != nil && !reflect.DeepEqual(info.Config, stream) { + if info != nil { // If the stream config doesn't match what we expect, try to update // it. If that doesn't work then try to blow it away and we'll then // recreate it in the next section. - if info, err = s.UpdateStream(stream); err != nil { - logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name) - // We failed to update the stream, this is a last attempt to get - // things working but may result in data loss. - if err = s.DeleteStream(name); err != nil { - logrus.WithError(err).Fatalf("Unable to delete stream %q", name) + // Each specific option that we set must be checked by hand, as if + // you DeepEqual the whole config struct, it will always show that + // there's a difference because the NATS Server will return defaults + // in the stream info. + switch { + case !reflect.DeepEqual(info.Config.Subjects, subjects): + fallthrough + case info.Config.Retention != stream.Retention: + fallthrough + case info.Config.Storage != stream.Storage: + fallthrough + case info.Config.MaxAge != stream.MaxAge: + // Try updating the stream first, as many things can be updated + // non-destructively. + if info, err = s.UpdateStream(stream); err != nil { + logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name) + // We failed to update the stream, this is a last attempt to get + // things working but may result in data loss. + if err = s.DeleteStream(name); err != nil { + logrus.WithError(err).Fatalf("Unable to delete stream %q", name) + } + info = nil } - info = nil } } if info == nil {