Fix topic naming

This commit is contained in:
Neil Alexander 2021-11-03 12:05:25 +00:00
parent 6b835b83bf
commit e745a7663f
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -68,13 +68,13 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
} }
for _, stream := range streams { for _, stream := range streams {
stream.Name = cfg.TopicFor(stream.Name) name := cfg.TopicFor(stream.Name)
info, err := s.StreamInfo(stream.Name) info, err := s.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound { if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info") logrus.WithError(err).Fatal("Unable to get stream info")
} }
if info == nil { if info == nil {
stream.Subjects = []string{stream.Name} stream.Subjects = []string{name}
// If we're trying to keep everything in memory (e.g. unit tests) // If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy. // then overwrite the storage policy.
if cfg.InMemory { if cfg.InMemory {
@ -82,7 +82,7 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
} }
if _, err = s.AddStream(stream); err != nil { if _, err = s.AddStream(stream); err != nil {
logrus.WithError(err).WithField("stream", stream.Name).Fatal("Unable to add stream") logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
} }
} }
} }