Tweak consumer behaviour

This commit is contained in:
Neil Alexander 2022-08-31 16:45:07 +01:00
parent 19bdcd9c06
commit 514ab4bd59
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -3,7 +3,6 @@ package jetstream
import (
"context"
"fmt"
"time"
"github.com/getsentry/sentry-go"
"github.com/nats-io/nats.go"
@ -90,21 +89,25 @@ func JetStreamConsumer(
if len(msgs) < 1 {
continue
}
msg := msgs[len(msgs)-1] // most recent message, in case of AckAll
if err = msg.InProgress(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
sentry.CaptureException(err)
continue
for _, msg := range msgs {
if err = msg.InProgress(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
sentry.CaptureException(err)
continue
}
}
if f(ctx, msgs) {
msg := msgs[len(msgs)-1] // most recent message, in case of AckAll
if err = msg.AckSync(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
sentry.CaptureException(err)
}
} else {
if err = msg.NakWithDelay(time.Second*15, nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
sentry.CaptureException(err)
for _, msg := range msgs {
if err = msg.Nak(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
sentry.CaptureException(err)
}
}
}
}