Double-unmarshal otherwise we lack context of sarama event type

This commit is contained in:
Neil Alexander 2020-03-11 17:41:31 +00:00
parent 0049100def
commit df4f3aefdd
4 changed files with 46 additions and 22 deletions

View file

@ -84,6 +84,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
// See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format
// will be
@ -114,13 +127,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),

View file

@ -77,6 +77,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
// See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format
// will be
@ -107,13 +120,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),

View file

@ -67,6 +67,20 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
// Filter out any messages that aren't new room events
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
// See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format
// will be
@ -97,14 +111,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
// Filter out any messages that aren't new room events
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),

View file

@ -77,6 +77,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
// See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format
// will be