diff --git a/go.mod b/go.mod index 82e066d41..4bfe679ba 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/Masterminds/semver/v3 v3.1.1 - github.com/S7evinK/saramajetstream v0.0.0-20210604172822-4f305c9f1537 + github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86 github.com/Shopify/sarama v1.29.0 github.com/codeclysm/extract v2.2.0+incompatible github.com/containerd/containerd v1.5.2 // indirect diff --git a/go.sum b/go.sum index e5ca53521..1996c51b8 100644 --- a/go.sum +++ b/go.sum @@ -86,8 +86,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= github.com/RyanCarrier/dijkstra v1.0.0/go.mod h1:5agGUBNEtUAGIANmbw09fuO3a2htPEkc1jNH01qxCWA= github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30= -github.com/S7evinK/saramajetstream v0.0.0-20210604172822-4f305c9f1537 h1:j/jlzVQRvUvNANJlz5Puac0Avc6Xe+rhvy2Se/f+Fwo= -github.com/S7evinK/saramajetstream v0.0.0-20210604172822-4f305c9f1537/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc= +github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86 h1:ZFbfRbhZDohUiouv361CC0XWTESwUlVicz/zgGSO964= +github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go index 5f2d40552..e3ef50c5f 100644 --- a/setup/kafka/kafka.go +++ b/setup/kafka/kafka.go @@ -1,6 +1,7 @@ package kafka import ( + "strings" "time" js "github.com/S7evinK/saramajetstream" @@ -66,7 +67,7 @@ func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { logrus.WithField("servers", cfg.Addresses).Debug("connecting to nats") - nc, err := nats.Connect(cfg.Addresses[0]) + nc, err := nats.Connect(strings.Join(cfg.Addresses, ",")) if err != nil { logrus.WithError(err).Panic("failed to connect to nats") return nil, nil @@ -91,9 +92,9 @@ func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { // Typing events can be removed from the stream, as they are only relevant for a short time if topic == config.TopicOutputTypingEvent { - maxLifeTime = time.Second * 30 + maxLifeTime = time.Second * 60 } - _, _ = s.AddStream(&nats.StreamConfig{ + _, err = s.AddStream(&nats.StreamConfig{ Name: sn, Subjects: []string{topic}, MaxBytes: int64(*cfg.MaxMessageBytes), @@ -101,10 +102,13 @@ func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { MaxAge: maxLifeTime, Duplicates: maxLifeTime / 2, }) + if err != nil { + logrus.WithError(err).WithField("stream", sn).Fatal("unable to add nats stream") + } } } - consumer := js.NewJetStreamConsumer(s, cfg.TopicPrefix) - producer := js.NewJetStreamProducer(s, cfg.TopicPrefix) + consumer := js.NewJetStreamConsumer(nc, s, cfg.TopicPrefix) + producer := js.NewJetStreamProducer(nc, s, cfg.TopicPrefix) return consumer, producer }