Merge branch 'add-nats-support' of github.com:s7evink/dendrite into add-nats-support
This commit is contained in:
commit
913e4e651f
2
go.mod
2
go.mod
|
@ -4,7 +4,7 @@ require (
|
||||||
github.com/DATA-DOG/go-sqlmock v1.5.0
|
github.com/DATA-DOG/go-sqlmock v1.5.0
|
||||||
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
|
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
|
||||||
github.com/Masterminds/semver/v3 v3.1.1
|
github.com/Masterminds/semver/v3 v3.1.1
|
||||||
github.com/S7evinK/saramajetstream v0.0.0-20210604172822-4f305c9f1537
|
github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86
|
||||||
github.com/Shopify/sarama v1.29.0
|
github.com/Shopify/sarama v1.29.0
|
||||||
github.com/codeclysm/extract v2.2.0+incompatible
|
github.com/codeclysm/extract v2.2.0+incompatible
|
||||||
github.com/containerd/containerd v1.5.2 // indirect
|
github.com/containerd/containerd v1.5.2 // indirect
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -86,8 +86,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
|
||||||
github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
|
github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
|
||||||
github.com/RyanCarrier/dijkstra v1.0.0/go.mod h1:5agGUBNEtUAGIANmbw09fuO3a2htPEkc1jNH01qxCWA=
|
github.com/RyanCarrier/dijkstra v1.0.0/go.mod h1:5agGUBNEtUAGIANmbw09fuO3a2htPEkc1jNH01qxCWA=
|
||||||
github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30=
|
github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30=
|
||||||
github.com/S7evinK/saramajetstream v0.0.0-20210604172822-4f305c9f1537 h1:j/jlzVQRvUvNANJlz5Puac0Avc6Xe+rhvy2Se/f+Fwo=
|
github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86 h1:ZFbfRbhZDohUiouv361CC0XWTESwUlVicz/zgGSO964=
|
||||||
github.com/S7evinK/saramajetstream v0.0.0-20210604172822-4f305c9f1537/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc=
|
github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc=
|
||||||
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
|
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
|
||||||
github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ=
|
github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ=
|
||||||
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
|
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
js "github.com/S7evinK/saramajetstream"
|
js "github.com/S7evinK/saramajetstream"
|
||||||
|
@ -66,7 +67,7 @@ func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
|
|
||||||
func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
logrus.WithField("servers", cfg.Addresses).Debug("connecting to nats")
|
logrus.WithField("servers", cfg.Addresses).Debug("connecting to nats")
|
||||||
nc, err := nats.Connect(cfg.Addresses[0])
|
nc, err := nats.Connect(strings.Join(cfg.Addresses, ","))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("failed to connect to nats")
|
logrus.WithError(err).Panic("failed to connect to nats")
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -91,9 +92,9 @@ func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
|
|
||||||
// Typing events can be removed from the stream, as they are only relevant for a short time
|
// Typing events can be removed from the stream, as they are only relevant for a short time
|
||||||
if topic == config.TopicOutputTypingEvent {
|
if topic == config.TopicOutputTypingEvent {
|
||||||
maxLifeTime = time.Second * 30
|
maxLifeTime = time.Second * 60
|
||||||
}
|
}
|
||||||
_, _ = s.AddStream(&nats.StreamConfig{
|
_, err = s.AddStream(&nats.StreamConfig{
|
||||||
Name: sn,
|
Name: sn,
|
||||||
Subjects: []string{topic},
|
Subjects: []string{topic},
|
||||||
MaxBytes: int64(*cfg.MaxMessageBytes),
|
MaxBytes: int64(*cfg.MaxMessageBytes),
|
||||||
|
@ -101,10 +102,13 @@ func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
MaxAge: maxLifeTime,
|
MaxAge: maxLifeTime,
|
||||||
Duplicates: maxLifeTime / 2,
|
Duplicates: maxLifeTime / 2,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).WithField("stream", sn).Fatal("unable to add nats stream")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer := js.NewJetStreamConsumer(s, cfg.TopicPrefix)
|
consumer := js.NewJetStreamConsumer(nc, s, cfg.TopicPrefix)
|
||||||
producer := js.NewJetStreamProducer(s, cfg.TopicPrefix)
|
producer := js.NewJetStreamProducer(nc, s, cfg.TopicPrefix)
|
||||||
return consumer, producer
|
return consumer, producer
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue