From b8beae8f98f9e438ac717c92759f49a43c7782fd Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 2 Feb 2022 11:57:24 +0000 Subject: [PATCH] Clean up old consumers --- setup/jetstream/helpers.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index ec7fc8728..e9503f7ed 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -9,17 +9,25 @@ import ( ) func JetStreamConsumer( - ctx context.Context, nats nats.JetStreamContext, subj, durable string, + ctx context.Context, js nats.JetStreamContext, subj, durable string, f func(ctx context.Context, msg *nats.Msg) bool, opts ...nats.SubOpt, ) error { - if cinfo, err := nats.ConsumerInfo(subj, durable); err == nil && cinfo.PushBound { - if err := nats.DeleteConsumer(subj, durable); err != nil { - return fmt.Errorf("nats.DeleteConsumer: %w", err) + defer func() { + // If there are existing consumers from before they were pull + // consumers, we need to clean up the old push consumers. However, + // in order to not affect the interest-based policies, we need to + // do this *after* creating the new pull consumers, which have + // "Pull" suffixed to their name. + if _, err := js.ConsumerInfo(subj, durable); err == nil { + if err := js.DeleteConsumer(subj, durable); err != nil { + logrus.WithContext(ctx).Warnf("Failed to clean up old consumer %q", durable) + } } - } + }() - sub, err := nats.PullSubscribe(subj, durable, opts...) + name := durable + "Pull" + sub, err := js.PullSubscribe(subj, name, opts...) if err != nil { return fmt.Errorf("nats.SubscribeSync: %w", err) } @@ -32,10 +40,14 @@ func JetStreamConsumer( logrus.WithContext(ctx).WithField("subject", subj).Fatal(err) } for { - msg, err := sub.NextMsgWithContext(ctx) + msgs, err := sub.Fetch(1, nats.Context(ctx)) if err != nil { - handle(fmt.Errorf("sub.NextMsgWithContext: %w", err)) + handle(fmt.Errorf("sub.Fetch: %w", err)) } + if len(msgs) < 1 { + continue + } + msg := msgs[0] if err = msg.InProgress(); err != nil { handle(fmt.Errorf("msg.InProgress: %w", err)) }