diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 58d471148..50d5af3a4 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -104,17 +104,14 @@ func (s *OutputRoomEventConsumer) onMessage( if err := json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring") - _ = msg.Ack() continue } switch output.Type { case api.OutputTypeNewRoomEvent: if output.NewRoomEvent == nil { - _ = msg.Ack() continue } if !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) { - _ = msg.Ack() continue } events = append(events, output.NewRoomEvent.Event) @@ -145,21 +142,20 @@ func (s *OutputRoomEventConsumer) onMessage( events = append(events, output.NewInviteEvent.Event) default: - _ = msg.Ack() continue } } // If there are no events selected for sending then we should - // negatively ack the pending events so we will retry them again - // later. + // ack the events so that we don't get sent them again in the + // future. if len(events) == 0 { - return false + return true } // Send event to any relevant application services if err := s.sendEvents(ctx, state, events); err != nil { - log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to filter events") + log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to send events") return false }