diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index 4e84cd0f1..f47637c69 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -34,6 +34,14 @@ func JetStreamConsumer( } }() + // If the batch size is greater than 1, we will want to acknowledge all + // received messages in the batch. Below we will send an acknowledgement + // for the most recent message in the batch and AckAll will ensure that + // all messages that came before it are also acknowledged implicitly. + if batch > 1 { + opts = append(opts, nats.AckAll()) + } + name := durable + "Pull" sub, err := js.PullSubscribe(subj, name, opts...) if err != nil { @@ -41,7 +49,6 @@ func JetStreamConsumer( return fmt.Errorf("nats.SubscribeSync: %w", err) } go func() { - nextmsg: for { // If the parent context has given up then there's no point in // carrying on doing anything, so stop the listener. @@ -82,26 +89,21 @@ func JetStreamConsumer( if len(msgs) < 1 { continue } - for _, msg := range msgs { - if err = msg.InProgress(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) - sentry.CaptureException(err) - continue nextmsg - } + msg := msgs[len(msgs)-1] // most recent message, in case of AckAll + if err = msg.InProgress(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) + sentry.CaptureException(err) + continue } if f(ctx, msgs) { - for _, msg := range msgs { - if err = msg.AckSync(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) - sentry.CaptureException(err) - } + if err = msg.AckSync(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) + sentry.CaptureException(err) } } else { - for _, msg := range msgs { - if err = msg.Nak(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) - sentry.CaptureException(err) - } + if err = msg.Nak(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) + sentry.CaptureException(err) } } }