diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 50d5af3a4..60522f6d7 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -79,7 +79,8 @@ func (s *OutputRoomEventConsumer) Start() error { token := jetstream.Tokenise(as.ID) if err := jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, - s.cfg.Matrix.JetStream.Durable("Appservice_"+token), 50, + s.cfg.Matrix.JetStream.Durable("Appservice_"+token), + 50, // maximum number of events to send in a single transaction func(ctx context.Context, msgs []*nats.Msg) bool { return s.onMessage(ctx, state, msgs) }, @@ -96,7 +97,7 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage( ctx context.Context, state *appserviceState, msgs []*nats.Msg, ) bool { - log.WithField("appservice", state.ID).Debugf("Appservice worker received %d message(s) from roomserver", len(msgs)) + log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs)) events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs)) for _, msg := range msgs { // Parse out the event JSON @@ -108,10 +109,7 @@ func (s *OutputRoomEventConsumer) onMessage( } switch output.Type { case api.OutputTypeNewRoomEvent: - if output.NewRoomEvent == nil { - continue - } - if !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) { + if output.NewRoomEvent == nil || !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) { continue } events = append(events, output.NewRoomEvent.Event) @@ -147,19 +145,16 @@ func (s *OutputRoomEventConsumer) onMessage( } // If there are no events selected for sending then we should - // ack the events so that we don't get sent them again in the + // ack the messages so that we don't get sent them again in the // future. if len(events) == 0 { return true } - // Send event to any relevant application services - if err := s.sendEvents(ctx, state, events); err != nil { - log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to send events") - return false - } - - return true + // Send event to any relevant application services. If we hit + // an error here, return false, so that we negatively ack. + log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events)) + return s.sendEvents(ctx, state, events) == nil } // sendEvents passes events to the appservice by using the transactions @@ -168,16 +163,12 @@ func (s *OutputRoomEventConsumer) sendEvents( ctx context.Context, state *appserviceState, events []*gomatrixserverlib.HeaderedEvent, ) error { - // If there are no events that we are interested in then don't bother - // doing anything else at this point. - if len(events) == 0 { - return nil - } - // Create the transaction body. - transaction, err := json.Marshal(gomatrixserverlib.ApplicationServiceTransaction{ - Events: gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll), - }) + transaction, err := json.Marshal( + gomatrixserverlib.ApplicationServiceTransaction{ + Events: gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll), + }, + ) if err != nil { return err } @@ -196,8 +187,7 @@ func (s *OutputRoomEventConsumer) sendEvents( req.Header.Set("Content-Type", "application/json") resp, err := s.client.Do(req) if err != nil { - state.backoffAndPause(err) - return err + return state.backoffAndPause(err) } // If the response was fine then we can clear any backoffs in place and @@ -206,20 +196,20 @@ func (s *OutputRoomEventConsumer) sendEvents( case http.StatusOK: state.backoff = 0 default: - state.backoffAndPause(err) + _ = state.backoffAndPause(err) } return nil } // backoff pauses the calling goroutine for a 2^some backoff exponent seconds -func (s *appserviceState) backoffAndPause(err error) { +func (s *appserviceState) backoffAndPause(err error) error { if s.backoff < 6 { s.backoff++ } - duration := time.Second * time.Duration(math.Pow(2, float64(s.backoff))) log.WithField("appservice", s.ID).WithError(err).Warnf("Unable to send transaction to appservice successfully, backing off for %s", duration.String()) time.Sleep(duration) + return err } // appserviceIsInterestedInEvent returns a boolean depending on whether a given @@ -227,15 +217,12 @@ func (s *appserviceState) backoffAndPause(err error) { // // TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682 func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool { - // No reason to queue events if they'll never be sent to the application - // service - if appservice.URL == "" { + switch { + case appservice.URL == "": return false - } - - // Check Room ID and Sender of the event - if appservice.IsInterestedInUserID(event.Sender()) || - appservice.IsInterestedInRoomID(event.RoomID()) { + case appservice.IsInterestedInUserID(event.Sender()): + return true + case appservice.IsInterestedInRoomID(event.RoomID()): return true } @@ -256,7 +243,8 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont } } else { log.WithFields(log.Fields{ - "room_id": event.RoomID(), + "appservice": appservice.ID, + "room_id": event.RoomID(), }).WithError(err).Errorf("Unable to get aliases for room") } @@ -291,7 +279,8 @@ func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, e } } else { log.WithFields(log.Fields{ - "room_id": event.RoomID(), + "appservice": appservice.ID, + "room_id": event.RoomID(), }).WithError(err).Errorf("Unable to get membership for room") } return false