diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 28f339a3b..adaeb873d 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -8,13 +8,13 @@ import ( "sync" "time" - "github.com/getsentry/sentry-go" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" natsserver "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go" ) @@ -36,7 +36,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS defer natsLock.Unlock() // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { - return setupNATS(process, cfg, nil) + return setupNATS(cfg, nil) } if s.Server == nil { var err error @@ -72,18 +72,29 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS if err != nil { logrus.Fatalln("Failed to create NATS client") } - return setupNATS(process, cfg, nc) + return setupNATS(cfg, nc) } -func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { +func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { + var s nats.JetStreamContext + var err error if nc == nil { - var err error opts := []natsclient.Option{ natsclient.DisconnectErrHandler(func(c *natsclient.Conn, err error) { logrus.WithError(err).Error("nats connection: disconnected") }), natsclient.ReconnectHandler(func(_ *natsclient.Conn) { logrus.Info("nats connection: client reconnected") + for _, stream := range []*nats.StreamConfig{ + streams[6], + streams[10], + } { + err = configureStream(stream, cfg, s) + if err != nil { + logrus.WithError(err).WithField("stream", stream.Name).Error("unable to configure a stream") + } + + } }), natsclient.ClosedHandler(func(_ *natsclient.Conn) { logrus.Info("nats connection: client closed") @@ -101,89 +112,16 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc } } - s, err := nc.JetStream() + s, err = nc.JetStream() if err != nil { logrus.WithError(err).Panic("Unable to get JetStream context") return nil, nil } for _, stream := range streams { // streams are defined in streams.go - name := cfg.Prefixed(stream.Name) - info, err := s.StreamInfo(name) - if err != nil && err != natsclient.ErrStreamNotFound { - logrus.WithError(err).Fatal("Unable to get stream info") - } - subjects := stream.Subjects - if len(subjects) == 0 { - // By default we want each stream to listen for the subjects - // that are either an exact match for the stream name, or where - // the first part of the subject is the stream name. ">" is a - // wildcard in NATS for one or more subject tokens. In the case - // that the stream is called "Foo", this will match any message - // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. - subjects = []string{name, name + ".>"} - } - if info != nil { - switch { - case !reflect.DeepEqual(info.Config.Subjects, subjects): - fallthrough - case info.Config.Retention != stream.Retention: - fallthrough - case info.Config.Storage != stream.Storage: - if err = s.DeleteStream(name); err != nil { - logrus.WithError(err).Fatal("Unable to delete stream") - } - info = nil - } - } - if info == nil { - // If we're trying to keep everything in memory (e.g. unit tests) - // then overwrite the storage policy. - if cfg.InMemory { - stream.Storage = natsclient.MemoryStorage - } - - // Namespace the streams without modifying the original streams - // array, otherwise we end up with namespaces on namespaces. - namespaced := *stream - namespaced.Name = name - namespaced.Subjects = subjects - if _, err = s.AddStream(&namespaced); err != nil { - logger := logrus.WithError(err).WithFields(logrus.Fields{ - "stream": namespaced.Name, - "subjects": namespaced.Subjects, - }) - - // If the stream was supposed to be in-memory to begin with - // then an error here is fatal so we'll give up. - if namespaced.Storage == natsclient.MemoryStorage { - logger.WithError(err).Fatal("Unable to add in-memory stream") - } - - // The stream was supposed to be on disk. Let's try starting - // Dendrite with the stream in-memory instead. That'll mean that - // we can't recover anything that was queued on the disk but we - // will still be able to start and run hopefully in the meantime. - logger.WithError(err).Error("Unable to add stream") - sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err)) - - namespaced.Storage = natsclient.MemoryStorage - if _, err = s.AddStream(&namespaced); err != nil { - // We tried to add the stream in-memory instead but something - // went wrong. That's an unrecoverable situation so we will - // give up at this point. - logger.WithError(err).Fatal("Unable to add in-memory stream") - } - - if stream.Storage != namespaced.Storage { - // We've managed to add the stream in memory. What's on the - // disk will be left alone, but our ability to recover from a - // future crash will be limited. Yell about it. - err := fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory", namespaced.Name) - sentry.CaptureException(err) - process.Degraded(err) - } - } + err = configureStream(stream, cfg, s) + if err != nil { + logrus.WithError(err).WithField("stream", stream.Name).Fatal("unable to configure a stream") } } @@ -213,3 +151,52 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc return s, nc } + +func configureStream(stream *nats.StreamConfig, cfg *config.JetStream, s nats.JetStreamContext) error { + name := cfg.Prefixed(stream.Name) + info, err := s.StreamInfo(name) + if err != nil && err != natsclient.ErrStreamNotFound { + return fmt.Errorf("get stream info: %w", err) + } + subjects := stream.Subjects + if len(subjects) == 0 { + // By default we want each stream to listen for the subjects + // that are either an exact match for the stream name, or where + // the first part of the subject is the stream name. ">" is a + // wildcard in NATS for one or more subject tokens. In the case + // that the stream is called "Foo", this will match any message + // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. + subjects = []string{name, name + ".>"} + } + if info != nil { + switch { + case !reflect.DeepEqual(info.Config.Subjects, subjects): + fallthrough + case info.Config.Retention != stream.Retention: + fallthrough + case info.Config.Storage != stream.Storage: + if err = s.DeleteStream(name); err != nil { + return fmt.Errorf("delete stream: %w", err) + } + info = nil + } + } + if info == nil { + // If we're trying to keep everything in memory (e.g. unit tests) + // then overwrite the storage policy. + if cfg.InMemory { + stream.Storage = natsclient.MemoryStorage + } + + // Namespace the streams without modifying the original streams + // array, otherwise we end up with namespaces on namespaces. + namespaced := *stream + namespaced.Name = name + namespaced.Subjects = subjects + if _, err = s.AddStream(&namespaced); err != nil { + return fmt.Errorf("add stream: %w", err) + } + logrus.Infof("stream created: %s", stream.Name) + } + return nil +}