From ea115539efd9b31703b976a644f43bfafb09a56a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 31 Aug 2022 15:20:19 +0100 Subject: [PATCH] Acknowledgement tweaks --- appservice/consumers/roomserver.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 5d969301e..58d471148 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -83,7 +83,7 @@ func (s *OutputRoomEventConsumer) Start() error { func(ctx context.Context, msgs []*nats.Msg) bool { return s.onMessage(ctx, state, msgs) }, - nats.DeliverAll(), nats.ManualAck(), + nats.DeliverNew(), nats.ManualAck(), ); err != nil { return fmt.Errorf("failed to create %q consumer: %w", token, err) } @@ -104,14 +104,17 @@ func (s *OutputRoomEventConsumer) onMessage( if err := json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring") + _ = msg.Ack() continue } switch output.Type { case api.OutputTypeNewRoomEvent: if output.NewRoomEvent == nil { + _ = msg.Ack() continue } if !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) { + _ = msg.Ack() continue } events = append(events, output.NewRoomEvent.Event) @@ -142,10 +145,18 @@ func (s *OutputRoomEventConsumer) onMessage( events = append(events, output.NewInviteEvent.Event) default: + _ = msg.Ack() continue } } + // If there are no events selected for sending then we should + // negatively ack the pending events so we will retry them again + // later. + if len(events) == 0 { + return false + } + // Send event to any relevant application services if err := s.sendEvents(ctx, state, events); err != nil { log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to filter events")