Pull consumers

This commit is contained in:
Neil Alexander 2022-02-02 11:36:53 +00:00
parent 9f969498ce
commit d95c55577b
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
11 changed files with 40 additions and 36 deletions

View file

@ -34,7 +34,7 @@ import (
type OutputRoomEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
asDB storage.Database
rsAPI api.RoomserverInternalAPI
@ -67,8 +67,8 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}

View file

@ -34,7 +34,7 @@ import (
type OutputEDUConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
db storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
@ -67,20 +67,20 @@ func NewOutputEDUConsumer(
// Start consuming from EDU servers
func (t *OutputEDUConsumer) Start() error {
if err := jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.typingTopic, t.onTypingEvent,
t.durable, nats.DeliverAll(), nats.ManualAck(),
t.ctx, t.jetstream, t.typingTopic, t.durable, t.onTypingEvent,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
if err := jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.sendToDeviceTopic, t.onSendToDeviceEvent,
t.durable, nats.DeliverAll(), nats.ManualAck(),
t.ctx, t.jetstream, t.sendToDeviceTopic, t.durable, t.onSendToDeviceEvent,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
if err := jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.receiptTopic, t.onReceiptEvent,
t.durable, nats.DeliverAll(), nats.ManualAck(),
t.ctx, t.jetstream, t.receiptTopic, t.durable, t.onReceiptEvent,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}

View file

@ -37,7 +37,7 @@ type OutputRoomEventConsumer struct {
cfg *config.FederationAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
db storage.Database
queues *queue.OutgoingQueues
topic string
@ -67,8 +67,8 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}

View file

@ -41,7 +41,7 @@ type RoomserverInternalAPI struct {
fsAPI fsAPI.FederationInternalAPI
asAPI asAPI.AppServiceQueryAPI
JetStream nats.JetStreamContext
Durable nats.SubOpt
Durable string
InputRoomEventTopic string // JetStream topic for new input room events
OutputRoomEventTopic string // JetStream topic for new output room events
PerspectiveServerNames []gomatrixserverlib.ServerName
@ -87,7 +87,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
InputRoomEventTopic: r.InputRoomEventTopic,
OutputRoomEventTopic: r.OutputRoomEventTopic,
JetStream: r.JetStream,
Durable: r.Durable,
Durable: nats.Durable(r.Durable),
ServerName: r.Cfg.Matrix.ServerName,
FSAPI: fsAPI,
KeyRing: keyRing,

View file

@ -2,8 +2,6 @@ package config
import (
"fmt"
"github.com/nats-io/nats.go"
)
type JetStream struct {
@ -25,8 +23,8 @@ func (c *JetStream) TopicFor(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name)
}
func (c *JetStream) Durable(name string) nats.SubOpt {
return nats.Durable(c.TopicFor(name))
func (c *JetStream) Durable(name string) string {
return c.TopicFor(name)
}
func (c *JetStream) Defaults(generate bool) {

View file

@ -9,11 +9,17 @@ import (
)
func JetStreamConsumer(
ctx context.Context, nats nats.JetStreamContext, subj string,
ctx context.Context, nats nats.JetStreamContext, subj, durable string,
f func(ctx context.Context, msg *nats.Msg) bool,
opts ...nats.SubOpt,
) error {
sub, err := nats.SubscribeSync(subj, opts...)
if _, err := nats.ConsumerInfo(subj, durable); err == nil {
if err := nats.DeleteConsumer(subj, durable); err != nil {
return fmt.Errorf("nats.DeleteConsumer: %w", err)
}
}
sub, err := nats.PullSubscribe(subj, durable, opts...)
if err != nil {
return fmt.Errorf("nats.SubscribeSync: %w", err)
}

View file

@ -34,7 +34,7 @@ import (
type OutputClientDataConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
db storage.Database
stream types.StreamProvider
@ -64,8 +64,8 @@ func NewOutputClientDataConsumer(
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}

View file

@ -34,7 +34,7 @@ import (
type OutputReceiptEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
db storage.Database
stream types.StreamProvider
@ -65,8 +65,8 @@ func NewOutputReceiptEventConsumer(
// Start consuming from EDU api
func (s *OutputReceiptEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}

View file

@ -36,7 +36,7 @@ import (
type OutputSendToDeviceEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
@ -69,8 +69,8 @@ func NewOutputSendToDeviceEventConsumer(
// Start consuming from EDU api
func (s *OutputSendToDeviceEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}

View file

@ -35,7 +35,7 @@ import (
type OutputTypingEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
eduCache *cache.EDUCache
stream types.StreamProvider
@ -67,8 +67,8 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}

View file

@ -38,7 +38,7 @@ type OutputRoomEventConsumer struct {
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
durable nats.SubOpt
durable string
topic string
db storage.Database
pduStream types.StreamProvider
@ -74,8 +74,8 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}