diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index e9503f7ed..78331d87b 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -32,32 +32,47 @@ func JetStreamConsumer( return fmt.Errorf("nats.SubscribeSync: %w", err) } go func() { - handle := func(err error) { - if err == context.Canceled || err == context.DeadlineExceeded { - logrus.WithContext(ctx).WithField("subject", subj).Warn(err) - return - } - logrus.WithContext(ctx).WithField("subject", subj).Fatal(err) - } for { + // For reasons that don't make any sense to me, the nats.Context is + // actually not completely limiting. We can't supply context.Background + // because that is not allowed. If we supply a context without a deadline + // then it will just enforce the JetStream timeout of 5 seconds. So it + // is quite likely that we will end up looping here at that timeout. msgs, err := sub.Fetch(1, nats.Context(ctx)) if err != nil { - handle(fmt.Errorf("sub.Fetch: %w", err)) + if err == context.Canceled || err == context.DeadlineExceeded { + // Work out whether it was the JetStream context that + // expired, or whether it was the supplied context. + select { + case <-ctx.Done(): + // The supplied context expired, so we want to stop the + // consumer altogether. + return + default: + // The JetStream context expired, so the fetch just timed + // out and we should try again. + continue + } + } else { + // Something else went wrong, so we'll panic. + logrus.WithContext(ctx).WithField("subject", subj).Fatal(err) + } } if len(msgs) < 1 { continue } msg := msgs[0] if err = msg.InProgress(); err != nil { - handle(fmt.Errorf("msg.InProgress: %w", err)) + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) + continue } if f(ctx, msg) { if err = msg.Ack(); err != nil { - handle(fmt.Errorf("msg.Ack: %w", err)) + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Ack: %w", err)) } } else { if err = msg.Nak(); err != nil { - handle(fmt.Errorf("msg.Nak: %w", err)) + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) } } }