Acknowledgement tweaks

This commit is contained in:
Neil Alexander 2022-08-31 15:20:19 +01:00
parent 2ceff09003
commit ea115539ef
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -83,7 +83,7 @@ func (s *OutputRoomEventConsumer) Start() error {
func(ctx context.Context, msgs []*nats.Msg) bool { func(ctx context.Context, msgs []*nats.Msg) bool {
return s.onMessage(ctx, state, msgs) return s.onMessage(ctx, state, msgs)
}, },
nats.DeliverAll(), nats.ManualAck(), nats.DeliverNew(), nats.ManualAck(),
); err != nil { ); err != nil {
return fmt.Errorf("failed to create %q consumer: %w", token, err) return fmt.Errorf("failed to create %q consumer: %w", token, err)
} }
@ -104,14 +104,17 @@ func (s *OutputRoomEventConsumer) onMessage(
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring") log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring")
_ = msg.Ack()
continue continue
} }
switch output.Type { switch output.Type {
case api.OutputTypeNewRoomEvent: case api.OutputTypeNewRoomEvent:
if output.NewRoomEvent == nil { if output.NewRoomEvent == nil {
_ = msg.Ack()
continue continue
} }
if !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) { if !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) {
_ = msg.Ack()
continue continue
} }
events = append(events, output.NewRoomEvent.Event) events = append(events, output.NewRoomEvent.Event)
@ -142,10 +145,18 @@ func (s *OutputRoomEventConsumer) onMessage(
events = append(events, output.NewInviteEvent.Event) events = append(events, output.NewInviteEvent.Event)
default: default:
_ = msg.Ack()
continue continue
} }
} }
// If there are no events selected for sending then we should
// negatively ack the pending events so we will retry them again
// later.
if len(events) == 0 {
return false
}
// Send event to any relevant application services // Send event to any relevant application services
if err := s.sendEvents(ctx, state, events); err != nil { if err := s.sendEvents(ctx, state, events); err != nil {
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to filter events") log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to filter events")