Fix up event filtering

This commit is contained in:
Andrew Morgan 2018-06-24 20:54:04 +01:00
parent 79754a2ab7
commit 9f863de5d6

View file

@ -113,12 +113,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return err return err
} }
// Check if any events need to passed on to external application services // Combine any state and non-state events and send them to the application service
if len(events) > 0 { return s.filterRoomserverEvents(ctx, ev)
// Check if this was a membership event
return s.filterRoomserverEvents(ctx, events)
}
return s.filterRoomserverEvents(ctx, append(events, ev))
} }
// lookupStateEvents looks up the state events that are added by a new event. // lookupStateEvents looks up the state events that are added by a new event.
@ -172,20 +168,18 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// application service. // application service.
func (s *OutputRoomEventConsumer) filterRoomserverEvents( func (s *OutputRoomEventConsumer) filterRoomserverEvents(
ctx context.Context, ctx context.Context,
events []gomatrixserverlib.Event, event gomatrixserverlib.Event,
) error { ) error {
for _, event := range events { for _, ws := range s.workerStates {
for _, ws := range s.workerStates { // Check if this event is interesting to this application service
// Check if this event is interesting to this application service if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { // Queue this event to be sent off to the application service
// Queue this event to be sent off to the application service if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil { log.WithError(err).Warn("failed to insert incoming event into appservices database")
log.WithError(err).Warn("failed to insert incoming event into appservices database") } else {
} else { // Tell our worker to send out new messages by updating remaining message
// Tell our worker to send out new messages by updating remaining message // count and waking them up with a broadcast
// count and waking them up with a broadcast ws.NotifyNewEvent()
ws.NotifyNewEvent()
}
} }
} }
} }