Filter more opportunistically

This commit is contained in:
Neil Alexander 2022-08-31 15:08:28 +01:00
parent c35f4942c9
commit b202c6f889
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -111,6 +111,9 @@ func (s *OutputRoomEventConsumer) onMessage(
if output.NewRoomEvent == nil {
continue
}
if !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) {
continue
}
events = append(events, output.NewRoomEvent.Event)
if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
newEventID := output.NewRoomEvent.Event.EventID()
@ -161,26 +164,15 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
ctx context.Context, state *appserviceState,
events []*gomatrixserverlib.HeaderedEvent,
) error {
// Filter out the events down to only ones that the appservice has
// any interest in.
// TODO: We can probably benefit from some caching here somewhere.
filteredEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
for _, event := range events {
if !s.appserviceIsInterestedInEvent(ctx, event, state.ApplicationService) {
continue
}
filteredEvents = append(filteredEvents, event)
}
// If there are no events that we are interested in then don't bother
// doing anything else at this point.
if len(filteredEvents) == 0 {
if len(events) == 0 {
return nil
}
// Create the transaction body.
transaction, err := json.Marshal(gomatrixserverlib.ApplicationServiceTransaction{
Events: gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll),
Events: gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll),
})
if err != nil {
return err
@ -188,12 +180,12 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
// TODO: We should probably be more intelligent and pick something not
// in the control of the event. A NATS timestamp header or something maybe.
txnID := filteredEvents[0].Event.OriginServerTS()
txnID := events[0].Event.OriginServerTS()
// Send the transaction to the appservice.
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
req, err := http.NewRequest("PUT", address, bytes.NewBuffer(transaction))
req, err := http.NewRequestWithContext(ctx, "PUT", address, bytes.NewBuffer(transaction))
if err != nil {
return err
}