This commit is contained in:
Neil Alexander 2022-08-31 16:28:53 +01:00
parent 050a10d539
commit e80596effb
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -79,7 +79,8 @@ func (s *OutputRoomEventConsumer) Start() error {
token := jetstream.Tokenise(as.ID)
if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic,
s.cfg.Matrix.JetStream.Durable("Appservice_"+token), 50,
s.cfg.Matrix.JetStream.Durable("Appservice_"+token),
50, // maximum number of events to send in a single transaction
func(ctx context.Context, msgs []*nats.Msg) bool {
return s.onMessage(ctx, state, msgs)
},
@ -96,7 +97,7 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(
ctx context.Context, state *appserviceState, msgs []*nats.Msg,
) bool {
log.WithField("appservice", state.ID).Debugf("Appservice worker received %d message(s) from roomserver", len(msgs))
log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs))
events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs))
for _, msg := range msgs {
// Parse out the event JSON
@ -108,10 +109,7 @@ func (s *OutputRoomEventConsumer) onMessage(
}
switch output.Type {
case api.OutputTypeNewRoomEvent:
if output.NewRoomEvent == nil {
continue
}
if !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) {
if output.NewRoomEvent == nil || !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) {
continue
}
events = append(events, output.NewRoomEvent.Event)
@ -147,19 +145,16 @@ func (s *OutputRoomEventConsumer) onMessage(
}
// If there are no events selected for sending then we should
// ack the events so that we don't get sent them again in the
// ack the messages so that we don't get sent them again in the
// future.
if len(events) == 0 {
return true
}
// Send event to any relevant application services
if err := s.sendEvents(ctx, state, events); err != nil {
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to send events")
return false
}
return true
// Send event to any relevant application services. If we hit
// an error here, return false, so that we negatively ack.
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
return s.sendEvents(ctx, state, events) == nil
}
// sendEvents passes events to the appservice by using the transactions
@ -168,16 +163,12 @@ func (s *OutputRoomEventConsumer) sendEvents(
ctx context.Context, state *appserviceState,
events []*gomatrixserverlib.HeaderedEvent,
) error {
// If there are no events that we are interested in then don't bother
// doing anything else at this point.
if len(events) == 0 {
return nil
}
// Create the transaction body.
transaction, err := json.Marshal(gomatrixserverlib.ApplicationServiceTransaction{
Events: gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll),
})
transaction, err := json.Marshal(
gomatrixserverlib.ApplicationServiceTransaction{
Events: gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll),
},
)
if err != nil {
return err
}
@ -196,8 +187,7 @@ func (s *OutputRoomEventConsumer) sendEvents(
req.Header.Set("Content-Type", "application/json")
resp, err := s.client.Do(req)
if err != nil {
state.backoffAndPause(err)
return err
return state.backoffAndPause(err)
}
// If the response was fine then we can clear any backoffs in place and
@ -206,20 +196,20 @@ func (s *OutputRoomEventConsumer) sendEvents(
case http.StatusOK:
state.backoff = 0
default:
state.backoffAndPause(err)
_ = state.backoffAndPause(err)
}
return nil
}
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
func (s *appserviceState) backoffAndPause(err error) {
func (s *appserviceState) backoffAndPause(err error) error {
if s.backoff < 6 {
s.backoff++
}
duration := time.Second * time.Duration(math.Pow(2, float64(s.backoff)))
log.WithField("appservice", s.ID).WithError(err).Warnf("Unable to send transaction to appservice successfully, backing off for %s", duration.String())
time.Sleep(duration)
return err
}
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
@ -227,15 +217,12 @@ func (s *appserviceState) backoffAndPause(err error) {
//
// TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool {
// No reason to queue events if they'll never be sent to the application
// service
if appservice.URL == "" {
switch {
case appservice.URL == "":
return false
}
// Check Room ID and Sender of the event
if appservice.IsInterestedInUserID(event.Sender()) ||
appservice.IsInterestedInRoomID(event.RoomID()) {
case appservice.IsInterestedInUserID(event.Sender()):
return true
case appservice.IsInterestedInRoomID(event.RoomID()):
return true
}
@ -256,7 +243,8 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
}
} else {
log.WithFields(log.Fields{
"room_id": event.RoomID(),
"appservice": appservice.ID,
"room_id": event.RoomID(),
}).WithError(err).Errorf("Unable to get aliases for room")
}
@ -291,7 +279,8 @@ func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, e
}
} else {
log.WithFields(log.Fields{
"room_id": event.RoomID(),
"appservice": appservice.ID,
"room_id": event.RoomID(),
}).WithError(err).Errorf("Unable to get membership for room")
}
return false