From d95c55577b0f9b1bb9dde55ae8f15cd015ed4224 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 2 Feb 2022 11:36:53 +0000 Subject: [PATCH] Pull consumers --- appservice/consumers/roomserver.go | 6 +++--- federationapi/consumers/eduserver.go | 14 +++++++------- federationapi/consumers/roomserver.go | 6 +++--- roomserver/internal/api.go | 4 ++-- setup/config/config_jetstream.go | 6 ++---- setup/jetstream/helpers.go | 10 ++++++++-- syncapi/consumers/clientapi.go | 6 +++--- syncapi/consumers/eduserver_receipts.go | 6 +++--- syncapi/consumers/eduserver_sendtodevice.go | 6 +++--- syncapi/consumers/eduserver_typing.go | 6 +++--- syncapi/consumers/roomserver.go | 6 +++--- 11 files changed, 40 insertions(+), 36 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 2813c8d3a..7b59e3704 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -34,7 +34,7 @@ import ( type OutputRoomEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext - durable nats.SubOpt + durable string topic string asDB storage.Database rsAPI api.RoomserverInternalAPI @@ -67,8 +67,8 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.onMessage, - s.durable, nats.DeliverAll(), nats.ManualAck(), + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), ) } diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go index 170bbd53c..22fedbeb4 100644 --- a/federationapi/consumers/eduserver.go +++ b/federationapi/consumers/eduserver.go @@ -34,7 +34,7 @@ import ( type OutputEDUConsumer struct { ctx context.Context jetstream nats.JetStreamContext - durable nats.SubOpt + durable string db storage.Database queues *queue.OutgoingQueues ServerName gomatrixserverlib.ServerName @@ -67,20 +67,20 @@ func NewOutputEDUConsumer( // Start consuming from EDU servers func (t *OutputEDUConsumer) Start() error { if err := jetstream.JetStreamConsumer( - t.ctx, t.jetstream, t.typingTopic, t.onTypingEvent, - t.durable, nats.DeliverAll(), nats.ManualAck(), + t.ctx, t.jetstream, t.typingTopic, t.durable, t.onTypingEvent, + nats.DeliverAll(), nats.ManualAck(), ); err != nil { return err } if err := jetstream.JetStreamConsumer( - t.ctx, t.jetstream, t.sendToDeviceTopic, t.onSendToDeviceEvent, - t.durable, nats.DeliverAll(), nats.ManualAck(), + t.ctx, t.jetstream, t.sendToDeviceTopic, t.durable, t.onSendToDeviceEvent, + nats.DeliverAll(), nats.ManualAck(), ); err != nil { return err } if err := jetstream.JetStreamConsumer( - t.ctx, t.jetstream, t.receiptTopic, t.onReceiptEvent, - t.durable, nats.DeliverAll(), nats.ManualAck(), + t.ctx, t.jetstream, t.receiptTopic, t.durable, t.onReceiptEvent, + nats.DeliverAll(), nats.ManualAck(), ); err != nil { return err } diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 4ac799242..ac29f930b 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -37,7 +37,7 @@ type OutputRoomEventConsumer struct { cfg *config.FederationAPI rsAPI api.RoomserverInternalAPI jetstream nats.JetStreamContext - durable nats.SubOpt + durable string db storage.Database queues *queue.OutgoingQueues topic string @@ -67,8 +67,8 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.onMessage, - s.durable, nats.DeliverAll(), nats.ManualAck(), + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), ) } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 5b87e623d..fd963ad83 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -41,7 +41,7 @@ type RoomserverInternalAPI struct { fsAPI fsAPI.FederationInternalAPI asAPI asAPI.AppServiceQueryAPI JetStream nats.JetStreamContext - Durable nats.SubOpt + Durable string InputRoomEventTopic string // JetStream topic for new input room events OutputRoomEventTopic string // JetStream topic for new output room events PerspectiveServerNames []gomatrixserverlib.ServerName @@ -87,7 +87,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA InputRoomEventTopic: r.InputRoomEventTopic, OutputRoomEventTopic: r.OutputRoomEventTopic, JetStream: r.JetStream, - Durable: r.Durable, + Durable: nats.Durable(r.Durable), ServerName: r.Cfg.Matrix.ServerName, FSAPI: fsAPI, KeyRing: keyRing, diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go index 94e2d88b3..9271cd8b4 100644 --- a/setup/config/config_jetstream.go +++ b/setup/config/config_jetstream.go @@ -2,8 +2,6 @@ package config import ( "fmt" - - "github.com/nats-io/nats.go" ) type JetStream struct { @@ -25,8 +23,8 @@ func (c *JetStream) TopicFor(name string) string { return fmt.Sprintf("%s%s", c.TopicPrefix, name) } -func (c *JetStream) Durable(name string) nats.SubOpt { - return nats.Durable(c.TopicFor(name)) +func (c *JetStream) Durable(name string) string { + return c.TopicFor(name) } func (c *JetStream) Defaults(generate bool) { diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index 5a0372f19..0354d8647 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -9,11 +9,17 @@ import ( ) func JetStreamConsumer( - ctx context.Context, nats nats.JetStreamContext, subj string, + ctx context.Context, nats nats.JetStreamContext, subj, durable string, f func(ctx context.Context, msg *nats.Msg) bool, opts ...nats.SubOpt, ) error { - sub, err := nats.SubscribeSync(subj, opts...) + if _, err := nats.ConsumerInfo(subj, durable); err == nil { + if err := nats.DeleteConsumer(subj, durable); err != nil { + return fmt.Errorf("nats.DeleteConsumer: %w", err) + } + } + + sub, err := nats.PullSubscribe(subj, durable, opts...) if err != nil { return fmt.Errorf("nats.SubscribeSync: %w", err) } diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index d0083d677..c3650085f 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -34,7 +34,7 @@ import ( type OutputClientDataConsumer struct { ctx context.Context jetstream nats.JetStreamContext - durable nats.SubOpt + durable string topic string db storage.Database stream types.StreamProvider @@ -64,8 +64,8 @@ func NewOutputClientDataConsumer( // Start consuming from room servers func (s *OutputClientDataConsumer) Start() error { return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.onMessage, - s.durable, nats.DeliverAll(), nats.ManualAck(), + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), ) } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 29efb6454..392840ece 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -34,7 +34,7 @@ import ( type OutputReceiptEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext - durable nats.SubOpt + durable string topic string db storage.Database stream types.StreamProvider @@ -65,8 +65,8 @@ func NewOutputReceiptEventConsumer( // Start consuming from EDU api func (s *OutputReceiptEventConsumer) Start() error { return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.onMessage, - s.durable, nats.DeliverAll(), nats.ManualAck(), + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), ) } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index a64394392..b0beef063 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -36,7 +36,7 @@ import ( type OutputSendToDeviceEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext - durable nats.SubOpt + durable string topic string db storage.Database serverName gomatrixserverlib.ServerName // our server name @@ -69,8 +69,8 @@ func NewOutputSendToDeviceEventConsumer( // Start consuming from EDU api func (s *OutputSendToDeviceEventConsumer) Start() error { return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.onMessage, - s.durable, nats.DeliverAll(), nats.ManualAck(), + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), ) } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 3669665bd..cae5df8a8 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -35,7 +35,7 @@ import ( type OutputTypingEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext - durable nats.SubOpt + durable string topic string eduCache *cache.EDUCache stream types.StreamProvider @@ -67,8 +67,8 @@ func NewOutputTypingEventConsumer( // Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.onMessage, - s.durable, nats.DeliverAll(), nats.ManualAck(), + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), ) } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index a16049db4..7fe52b728 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -38,7 +38,7 @@ type OutputRoomEventConsumer struct { cfg *config.SyncAPI rsAPI api.RoomserverInternalAPI jetstream nats.JetStreamContext - durable nats.SubOpt + durable string topic string db storage.Database pduStream types.StreamProvider @@ -74,8 +74,8 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.onMessage, - s.durable, nats.DeliverAll(), nats.ManualAck(), + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), ) }