mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-03-03 08:53:10 -06:00
Tweak that some more
This commit is contained in:
parent
ea115539ef
commit
10dd3a7c2a
|
|
@ -104,17 +104,14 @@ func (s *OutputRoomEventConsumer) onMessage(
|
|||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring")
|
||||
_ = msg.Ack()
|
||||
continue
|
||||
}
|
||||
switch output.Type {
|
||||
case api.OutputTypeNewRoomEvent:
|
||||
if output.NewRoomEvent == nil {
|
||||
_ = msg.Ack()
|
||||
continue
|
||||
}
|
||||
if !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) {
|
||||
_ = msg.Ack()
|
||||
continue
|
||||
}
|
||||
events = append(events, output.NewRoomEvent.Event)
|
||||
|
|
@ -145,21 +142,20 @@ func (s *OutputRoomEventConsumer) onMessage(
|
|||
events = append(events, output.NewInviteEvent.Event)
|
||||
|
||||
default:
|
||||
_ = msg.Ack()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// If there are no events selected for sending then we should
|
||||
// negatively ack the pending events so we will retry them again
|
||||
// later.
|
||||
// ack the events so that we don't get sent them again in the
|
||||
// future.
|
||||
if len(events) == 0 {
|
||||
return false
|
||||
return true
|
||||
}
|
||||
|
||||
// Send event to any relevant application services
|
||||
if err := s.sendEvents(ctx, state, events); err != nil {
|
||||
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to filter events")
|
||||
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to send events")
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue