Fix bugs in onMessage

This commit is contained in:
Neil Alexander 2021-03-03 15:30:07 +00:00
parent 0157af541f
commit 09bbedd7db
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 8 additions and 12 deletions

View file

@ -85,9 +85,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
} }
if output.Type != api.OutputTypeNewRoomEvent { if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil return nil
} }
@ -114,6 +111,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
// Queue this event to be sent off to the application service // Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil { if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database") log.WithError(err).Warn("failed to insert incoming event into appservices database")
return err
} else { } else {
// Tell our worker to send out new messages by updating remaining message // Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast // count and waking them up with a broadcast
@ -143,14 +141,12 @@ func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, e
// e.g. the event came over federation but we do not have the full state persisted. // e.g. the event came over federation but we do not have the full state persisted.
if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil { if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil {
for _, ev := range membershipRes.JoinEvents { for _, ev := range membershipRes.JoinEvents {
if ev.Type == gomatrixserverlib.MRoomMember { var membership gomatrixserverlib.MemberContent
var membership gomatrixserverlib.MemberContent if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil {
if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil { continue
continue }
} if appservice.IsInterestedInUserID(*ev.StateKey) {
if membership.Membership == gomatrixserverlib.Join && appservice.IsInterestedInUserID(*ev.StateKey) { return true
return true
}
} }
} }
} else { } else {

View file

@ -62,7 +62,7 @@ func SetupTransactionWorkers(
func worker(db storage.Database, ws types.ApplicationServiceWorkerState) { func worker(db storage.Database, ws types.ApplicationServiceWorkerState) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"appservice": ws.AppService.ID, "appservice": ws.AppService.ID,
}).Info("starting application service") }).Info("Starting application service")
ctx := context.Background() ctx := context.Background()
// Create a HTTP client for sending requests to app services // Create a HTTP client for sending requests to app services