diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index 99a0d2d4a..5595d0a84 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -3,7 +3,6 @@ package jetstream import ( "context" "fmt" - "time" "github.com/getsentry/sentry-go" "github.com/nats-io/nats.go" @@ -90,21 +89,25 @@ func JetStreamConsumer( if len(msgs) < 1 { continue } - msg := msgs[len(msgs)-1] // most recent message, in case of AckAll - if err = msg.InProgress(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) - sentry.CaptureException(err) - continue + for _, msg := range msgs { + if err = msg.InProgress(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) + sentry.CaptureException(err) + continue + } } if f(ctx, msgs) { + msg := msgs[len(msgs)-1] // most recent message, in case of AckAll if err = msg.AckSync(nats.Context(ctx)); err != nil { logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) sentry.CaptureException(err) } } else { - if err = msg.NakWithDelay(time.Second*15, nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) - sentry.CaptureException(err) + for _, msg := range msgs { + if err = msg.Nak(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) + sentry.CaptureException(err) + } } } }