From 514ab4bd591cb50878586f962ab3279db2dd078f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 31 Aug 2022 16:45:07 +0100 Subject: [PATCH] Tweak consumer behaviour --- setup/jetstream/helpers.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index 99a0d2d4a..5595d0a84 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -3,7 +3,6 @@ package jetstream import ( "context" "fmt" - "time" "github.com/getsentry/sentry-go" "github.com/nats-io/nats.go" @@ -90,21 +89,25 @@ func JetStreamConsumer( if len(msgs) < 1 { continue } - msg := msgs[len(msgs)-1] // most recent message, in case of AckAll - if err = msg.InProgress(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) - sentry.CaptureException(err) - continue + for _, msg := range msgs { + if err = msg.InProgress(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) + sentry.CaptureException(err) + continue + } } if f(ctx, msgs) { + msg := msgs[len(msgs)-1] // most recent message, in case of AckAll if err = msg.AckSync(nats.Context(ctx)); err != nil { logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) sentry.CaptureException(err) } } else { - if err = msg.NakWithDelay(time.Second*15, nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) - sentry.CaptureException(err) + for _, msg := range msgs { + if err = msg.Nak(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) + sentry.CaptureException(err) + } } } }