mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-03-03 17:03:10 -06:00
Revert "Looks like AckAll won't work on a pull subscription"
This reverts commit 92c4a96003.
This commit is contained in:
parent
0bd7c88724
commit
42e14af529
|
|
@ -34,6 +34,14 @@ func JetStreamConsumer(
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// If the batch size is greater than 1, we will want to acknowledge all
|
||||||
|
// received messages in the batch. Below we will send an acknowledgement
|
||||||
|
// for the most recent message in the batch and AckAll will ensure that
|
||||||
|
// all messages that came before it are also acknowledged implicitly.
|
||||||
|
if batch > 1 {
|
||||||
|
opts = append(opts, nats.AckAll())
|
||||||
|
}
|
||||||
|
|
||||||
name := durable + "Pull"
|
name := durable + "Pull"
|
||||||
sub, err := js.PullSubscribe(subj, name, opts...)
|
sub, err := js.PullSubscribe(subj, name, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -41,7 +49,6 @@ func JetStreamConsumer(
|
||||||
return fmt.Errorf("nats.SubscribeSync: %w", err)
|
return fmt.Errorf("nats.SubscribeSync: %w", err)
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
nextmsg:
|
|
||||||
for {
|
for {
|
||||||
// If the parent context has given up then there's no point in
|
// If the parent context has given up then there's no point in
|
||||||
// carrying on doing anything, so stop the listener.
|
// carrying on doing anything, so stop the listener.
|
||||||
|
|
@ -82,26 +89,21 @@ func JetStreamConsumer(
|
||||||
if len(msgs) < 1 {
|
if len(msgs) < 1 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, msg := range msgs {
|
msg := msgs[len(msgs)-1] // most recent message, in case of AckAll
|
||||||
if err = msg.InProgress(nats.Context(ctx)); err != nil {
|
if err = msg.InProgress(nats.Context(ctx)); err != nil {
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
continue nextmsg
|
continue
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if f(ctx, msgs) {
|
if f(ctx, msgs) {
|
||||||
for _, msg := range msgs {
|
if err = msg.AckSync(nats.Context(ctx)); err != nil {
|
||||||
if err = msg.AckSync(nats.Context(ctx)); err != nil {
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
|
sentry.CaptureException(err)
|
||||||
sentry.CaptureException(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for _, msg := range msgs {
|
if err = msg.Nak(nats.Context(ctx)); err != nil {
|
||||||
if err = msg.Nak(nats.Context(ctx)); err != nil {
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
|
sentry.CaptureException(err)
|
||||||
sentry.CaptureException(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue