From 9f863de5d6673d4eb03ca547969436c3f14ea29c Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Sun, 24 Jun 2018 20:54:04 +0100 Subject: [PATCH] Fix up event filtering --- .../appservice/consumers/roomserver.go | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go index b62338d29..4f4586ebd 100644 --- a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -113,12 +113,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return err } - // Check if any events need to passed on to external application services - if len(events) > 0 { - // Check if this was a membership event - return s.filterRoomserverEvents(ctx, events) - } - return s.filterRoomserverEvents(ctx, append(events, ev)) + // Combine any state and non-state events and send them to the application service + return s.filterRoomserverEvents(ctx, ev) } // lookupStateEvents looks up the state events that are added by a new event. @@ -172,20 +168,18 @@ func (s *OutputRoomEventConsumer) lookupStateEvents( // application service. func (s *OutputRoomEventConsumer) filterRoomserverEvents( ctx context.Context, - events []gomatrixserverlib.Event, + event gomatrixserverlib.Event, ) error { - for _, event := range events { - for _, ws := range s.workerStates { - // Check if this event is interesting to this application service - if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { - // Queue this event to be sent off to the application service - if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil { - log.WithError(err).Warn("failed to insert incoming event into appservices database") - } else { - // Tell our worker to send out new messages by updating remaining message - // count and waking them up with a broadcast - ws.NotifyNewEvent() - } + for _, ws := range s.workerStates { + // Check if this event is interesting to this application service + if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { + // Queue this event to be sent off to the application service + if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil { + log.WithError(err).Warn("failed to insert incoming event into appservices database") + } else { + // Tell our worker to send out new messages by updating remaining message + // count and waking them up with a broadcast + ws.NotifyNewEvent() } } }