diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index f8ed834fe..c91616227 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -103,7 +103,7 @@ func (s *OutputRoomEventConsumer) onMessage( var output api.OutputEvent if err := json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") + log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring") continue } switch output.Type { @@ -145,7 +145,7 @@ func (s *OutputRoomEventConsumer) onMessage( // Send event to any relevant application services if err := s.filterRoomserverEvents(ctx, state, events); err != nil { - log.WithError(err).Errorf("roomserver output log: filter error") + log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to filter events") return false } @@ -161,6 +161,9 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( ctx context.Context, state *appserviceState, events []*gomatrixserverlib.HeaderedEvent, ) error { + // Filter out the events down to only ones that the appservice has + // any interest in. + // TODO: We can probably benefit from some caching here somewhere. filteredEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events)) for _, event := range events { if s.appserviceIsInterestedInEvent(ctx, event, state.ApplicationService) { @@ -169,6 +172,13 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( filteredEvents = append(filteredEvents, event) } + // If there are no events that we are interested in then don't bother + // doing anything else at this point. + if len(filteredEvents) == 0 { + return nil + } + + // Create the transaction body. transaction, err := json.Marshal(gomatrixserverlib.ApplicationServiceTransaction{ Events: gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), }) @@ -176,9 +186,11 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( return err } - txnID := 0 // TODO + // TODO: We should probably be more intelligent and pick something not + // in the control of the event. A NATS timestamp header or something maybe. + txnID := filteredEvents[0].Event.OriginServerTS() - // PUT a transaction to our AS + // Send the transaction to the appservice. // https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken)) req, err := http.NewRequest("PUT", address, bytes.NewBuffer(transaction)) @@ -186,20 +198,21 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( return err } req.Header.Set("Content-Type", "application/json") - resp, err := s.client.Do(req) if err != nil { state.backoffAndPause(err) return err } + // If the response was fine then we can clear any backoffs in place and + // report that everything was OK. Otherwise, back off for a while. switch resp.StatusCode { case http.StatusOK: state.backoff = 0 - return nil default: - return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode) + state.backoffAndPause(err) } + return nil } // backoff pauses the calling goroutine for a 2^some backoff exponent seconds