diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 44ac9fc3e..395fa994c 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -81,6 +81,11 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } if output.Type != api.OutputTypeNewRoomEvent { log.WithField("type", output.Type).Debug( diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 0180e38e8..d47254efd 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -79,6 +79,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } + if output.Type != api.OutputTypeNewRoomEvent { log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", @@ -86,6 +87,31 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } + // Get the room version of the room + vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)} + vQueryRes := api.QueryRoomVersionForRoomIDResponse{} + if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil { + log.WithFields(log.Fields{ + "room_id": string(msg.Key), + }).WithError(err).Errorf("can't query room version") + return err + } + + // Prepare the room event so that it has the correct field types + // for the room version + if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil { + log.WithFields(log.Fields{ + "room_version": vQueryRes.RoomVersion, + }).WithError(err).Errorf("can't prepare event to version") + return err + } + + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index 22584a1d5..c56d8215b 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -63,6 +63,11 @@ func (s *OutputRoomEventConsumer) Start() error { // onMessage is called when the sync server receives a new event from the room server output log. func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } // Filter out any messages that aren't new room events if output.Type != api.OutputTypeNewRoomEvent {