diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go index a65281514..98202dc69 100644 --- a/clientapi/consumers/roomserver.go +++ b/clientapi/consumers/roomserver.go @@ -15,8 +15,10 @@ package consumers import ( + "bytes" "context" "encoding/json" + "errors" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common" @@ -84,7 +86,36 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event + // See if the room version is present in the headers. If it isn't + // then we can't process the event as we don't know what the format + // will be + var roomVersion gomatrixserverlib.RoomVersion + for _, header := range msg.Headers { + if bytes.Equal(header.Key, []byte("room_version")) { + roomVersion = gomatrixserverlib.RoomVersion(header.Value) + break + } + } + if roomVersion == "" { + return errors.New("room version was not in sarama headers") + } + + // Prepare the room event so that it has the correct field types + // for the room version + ev := gomatrixserverlib.Event{} + if err := ev.PrepareAs(roomVersion); err != nil { + log.WithFields(log.Fields{ + "room_version": roomVersion, + }).WithError(err).Errorf("can't prepare event to version") + return err + } + + if err := json.Unmarshal(msg.Value, &ev); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(),