diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go index f7e4e27fd..5aa3ce7dd 100644 --- a/clientapi/consumers/roomserver.go +++ b/clientapi/consumers/roomserver.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/tidwall/gjson" log "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" @@ -99,24 +100,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return errors.New("room version was not in sarama headers") } - // Prepare the room event so that it has the correct field types - // for the room version - output.NewRoomEvent = &api.OutputNewRoomEvent{ - Event: gomatrixserverlib.Event{}, - } - if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": roomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } + evJSON := gjson.Get(string(msg.Value), "new_room_event.event") + if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { + output.NewRoomEvent.Event = ev + } else { + return errors.New("unable to get new_room_event.event") + } + log.WithFields(log.Fields{ "event_id": output.NewRoomEvent.Event.EventID(), "room_id": output.NewRoomEvent.Event.RoomID(), diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 9966c8101..cf72f6ced 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -110,26 +111,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return errors.New("room version was not in sarama headers") } - fmt.Println("room version is", roomVersion) - - // Prepare the room event so that it has the correct field types - // for the room version - output.NewRoomEvent = &api.OutputNewRoomEvent{ - Event: gomatrixserverlib.Event{}, - } - if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": roomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } + evJSON := gjson.Get(string(msg.Value), "new_room_event.event") + if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { + output.NewRoomEvent.Event = ev + } else { + return errors.New("unable to get new_room_event.event") + } + ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index 64f722098..0f43278b3 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -92,24 +93,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return errors.New("room version was not in sarama headers") } - // Prepare the room event so that it has the correct field types - // for the room version - output.NewRoomEvent = &api.OutputNewRoomEvent{ - Event: gomatrixserverlib.Event{}, - } - if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": roomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } + evJSON := gjson.Get(string(msg.Value), "new_room_event.event") + if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { + output.NewRoomEvent.Event = ev + } else { + return errors.New("unable to get new_room_event.event") + } + ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 56f35cad6..57456815d 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -102,46 +103,30 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return errors.New("room version was not in sarama headers") } + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + switch output.Type { case api.OutputTypeNewRoomEvent: - output.NewRoomEvent = &api.OutputNewRoomEvent{ - Event: gomatrixserverlib.Event{}, - } - if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": roomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil + evJSON := gjson.Get(string(msg.Value), "new_room_event.event") + if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { + output.NewRoomEvent.Event = ev + } else { + return errors.New("unable to get new_room_event.event") } return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) case api.OutputTypeNewInviteEvent: - output.NewInviteEvent = &api.OutputNewInviteEvent{ - Event: gomatrixserverlib.Event{}, - } - if err := output.NewInviteEvent.Event.PrepareAs(roomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": roomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil + evJSON := gjson.Get(string(msg.Value), "new_invite_event.event") + if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { + output.NewInviteEvent.Event = ev + } else { + return errors.New("unable to get new_invite_event.event") } return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: - output.RetireInviteEvent = &api.OutputRetireInviteEvent{} - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) default: log.WithField("type", output.Type).Debug(