From df4f3aefdd9718edfb039ad07a5f1579518736b9 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 11 Mar 2020 17:41:31 +0000 Subject: [PATCH] Double-unmarshal otherwise we lack context of sarama event type --- appservice/consumers/roomserver.go | 20 +++++++++++++------- federationsender/consumers/roomserver.go | 20 +++++++++++++------- publicroomsapi/consumers/roomserver.go | 22 ++++++++++++++-------- syncapi/consumers/roomserver.go | 6 ++++++ 4 files changed, 46 insertions(+), 22 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index e93d45fb2..766d4b277 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -84,6 +84,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } + // See if the room version is present in the headers. If it isn't // then we can't process the event as we don't know what the format // will be @@ -114,13 +127,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil - } - ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index f03e0b5a3..6940c0d51 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -77,6 +77,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } + // See if the room version is present in the headers. If it isn't // then we can't process the event as we don't know what the format // will be @@ -107,13 +120,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil - } - ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index a2ec1d92c..aae12bf50 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -67,6 +67,20 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + // Filter out any messages that aren't new room events + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } + // See if the room version is present in the headers. If it isn't // then we can't process the event as we don't know what the format // will be @@ -97,14 +111,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - // Filter out any messages that aren't new room events - if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil - } - ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 32eaf6e2d..4731e830c 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -77,6 +77,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + // See if the room version is present in the headers. If it isn't // then we can't process the event as we don't know what the format // will be