mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 11:23:11 -06:00
Try double-unmarshal
This commit is contained in:
parent
d3634504b7
commit
66216375cc
|
|
@ -81,6 +81,11 @@ func (s *OutputRoomEventConsumer) Start() error {
|
|||
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
// Parse out the event JSON
|
||||
var output api.OutputEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
if output.Type != api.OutputTypeNewRoomEvent {
|
||||
log.WithField("type", output.Type).Debug(
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
if output.Type != api.OutputTypeNewRoomEvent {
|
||||
log.WithField("type", output.Type).Debug(
|
||||
"roomserver output log: ignoring unknown output type",
|
||||
|
|
@ -86,6 +87,31 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Get the room version of the room
|
||||
vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)}
|
||||
vQueryRes := api.QueryRoomVersionForRoomIDResponse{}
|
||||
if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"room_id": string(msg.Key),
|
||||
}).WithError(err).Errorf("can't query room version")
|
||||
return err
|
||||
}
|
||||
|
||||
// Prepare the room event so that it has the correct field types
|
||||
// for the room version
|
||||
if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"room_version": vQueryRes.RoomVersion,
|
||||
}).WithError(err).Errorf("can't prepare event to version")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
ev := output.NewRoomEvent.Event
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
|
|
|
|||
|
|
@ -63,6 +63,11 @@ func (s *OutputRoomEventConsumer) Start() error {
|
|||
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
var output api.OutputEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Filter out any messages that aren't new room events
|
||||
if output.Type != api.OutputTypeNewRoomEvent {
|
||||
|
|
|
|||
Loading…
Reference in a new issue