mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-03-03 17:03:10 -06:00
Tweaks
This commit is contained in:
parent
7be0ca41ff
commit
7c837c57c4
|
|
@ -103,7 +103,7 @@ func (s *OutputRoomEventConsumer) onMessage(
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
switch output.Type {
|
switch output.Type {
|
||||||
|
|
@ -145,7 +145,7 @@ func (s *OutputRoomEventConsumer) onMessage(
|
||||||
|
|
||||||
// Send event to any relevant application services
|
// Send event to any relevant application services
|
||||||
if err := s.filterRoomserverEvents(ctx, state, events); err != nil {
|
if err := s.filterRoomserverEvents(ctx, state, events); err != nil {
|
||||||
log.WithError(err).Errorf("roomserver output log: filter error")
|
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to filter events")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -161,6 +161,9 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
|
||||||
ctx context.Context, state *appserviceState,
|
ctx context.Context, state *appserviceState,
|
||||||
events []*gomatrixserverlib.HeaderedEvent,
|
events []*gomatrixserverlib.HeaderedEvent,
|
||||||
) error {
|
) error {
|
||||||
|
// Filter out the events down to only ones that the appservice has
|
||||||
|
// any interest in.
|
||||||
|
// TODO: We can probably benefit from some caching here somewhere.
|
||||||
filteredEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
|
filteredEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
if s.appserviceIsInterestedInEvent(ctx, event, state.ApplicationService) {
|
if s.appserviceIsInterestedInEvent(ctx, event, state.ApplicationService) {
|
||||||
|
|
@ -169,6 +172,13 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
|
||||||
filteredEvents = append(filteredEvents, event)
|
filteredEvents = append(filteredEvents, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If there are no events that we are interested in then don't bother
|
||||||
|
// doing anything else at this point.
|
||||||
|
if len(filteredEvents) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the transaction body.
|
||||||
transaction, err := json.Marshal(gomatrixserverlib.ApplicationServiceTransaction{
|
transaction, err := json.Marshal(gomatrixserverlib.ApplicationServiceTransaction{
|
||||||
Events: gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll),
|
Events: gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll),
|
||||||
})
|
})
|
||||||
|
|
@ -176,9 +186,11 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
txnID := 0 // TODO
|
// TODO: We should probably be more intelligent and pick something not
|
||||||
|
// in the control of the event. A NATS timestamp header or something maybe.
|
||||||
|
txnID := filteredEvents[0].Event.OriginServerTS()
|
||||||
|
|
||||||
// PUT a transaction to our AS
|
// Send the transaction to the appservice.
|
||||||
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
|
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
|
||||||
address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
|
address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
|
||||||
req, err := http.NewRequest("PUT", address, bytes.NewBuffer(transaction))
|
req, err := http.NewRequest("PUT", address, bytes.NewBuffer(transaction))
|
||||||
|
|
@ -186,20 +198,21 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
resp, err := s.client.Do(req)
|
resp, err := s.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
state.backoffAndPause(err)
|
state.backoffAndPause(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the response was fine then we can clear any backoffs in place and
|
||||||
|
// report that everything was OK. Otherwise, back off for a while.
|
||||||
switch resp.StatusCode {
|
switch resp.StatusCode {
|
||||||
case http.StatusOK:
|
case http.StatusOK:
|
||||||
state.backoff = 0
|
state.backoff = 0
|
||||||
return nil
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
|
state.backoffAndPause(err)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
|
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue