diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 6d3ea808f..aa6a47e9e 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -94,7 +94,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event + // TODO: Is this trusted here? + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.NewRoomEvent.Event, false, output.NewRoomEvent.RoomVersion) + if err != nil { + return err + } + log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go index a65281514..4e9912513 100644 --- a/clientapi/consumers/roomserver.go +++ b/clientapi/consumers/roomserver.go @@ -84,7 +84,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event + // TODO: Is this trusted here? + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.NewRoomEvent.Event, false, output.NewRoomEvent.RoomVersion) + if err != nil { + return err + } + log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 4568f44dc..e348b0188 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -85,7 +85,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { ) return nil } - ev := &output.NewRoomEvent.Event + + // TODO: Is this trusted here? + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.NewRoomEvent.Event, false, output.NewRoomEvent.RoomVersion) + if err != nil { + return err + } + log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), @@ -109,7 +115,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event) + // TODO: Is this trusted here? + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(ore.Event, false, ore.RoomVersion) + if err != nil { + return err + } + + addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ev) if err != nil { return err } @@ -124,9 +136,9 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err // talking to the roomserver oldJoinedHosts, err := s.db.UpdateRoom( context.TODO(), - ore.Event.RoomID(), + ev.RoomID(), ore.LastSentEventID, - ore.Event.EventID(), + ev.EventID(), addsJoinedHosts, ore.RemovesStateEventIDs, ) @@ -155,7 +167,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err // Send the event. return s.queues.SendEvent( - &ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, + &ev, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, ) } @@ -171,6 +183,12 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err func (s *OutputRoomEventConsumer) joinedHostsAtEvent( ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost, ) ([]gomatrixserverlib.ServerName, error) { + // TODO: Is this trusted here? + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(ore.Event, false, ore.RoomVersion) + if err != nil { + return nil, err + } + // Combine the delta into a single delta so that the adds and removes can // cancel each other out. This should reduce the number of times we need // to fetch a state event from the room server. @@ -178,7 +196,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( ore.AddsStateEventIDs, ore.RemovesStateEventIDs, ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, ) - combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event) + combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ev) if err != nil { return nil, err } diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index 9a817735a..be9602b7e 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -77,7 +78,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event + // TODO: Is this trusted here? + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.NewRoomEvent.Event, false, output.NewRoomEvent.RoomVersion) + if err != nil { + return err + } + log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), diff --git a/roomserver/api/output.go b/roomserver/api/output.go index c09d5a1e5..5abbed3b4 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -53,8 +53,10 @@ type OutputEvent struct { // event IDs in the room. The state before an event is the state after its // prev_events. type OutputNewRoomEvent struct { - // The Event. - Event gomatrixserverlib.Event `json:"event"` + // The event. + Event []byte `json:"event"` + // The event version. + RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` // The latest events in the room after this event. // This can be used to set the prev events for new events in the room. // This also can be used to get the full current state after this event. diff --git a/roomserver/input/latest_events.go b/roomserver/input/latest_events.go index cf20e2c26..424286647 100644 --- a/roomserver/input/latest_events.go +++ b/roomserver/input/latest_events.go @@ -67,10 +67,15 @@ func updateLatestEvents( } }() + roomVersion, err := db.GetRoomVersionForRoom(ctx, roomNID) + if err != nil { + return err + } + u := latestEventsUpdater{ ctx: ctx, db: db, updater: updater, ow: ow, roomNID: roomNID, - stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer, - transactionID: transactionID, + roomVersion: roomVersion, stateAtEvent: stateAtEvent, event: event, + sendAsServer: sendAsServer, transactionID: transactionID, } if err = u.doUpdateLatestEvents(); err != nil { return err @@ -90,6 +95,7 @@ type latestEventsUpdater struct { updater types.RoomRecentEventsUpdater ow OutputRoomEventWriter roomNID types.RoomNID + roomVersion gomatrixserverlib.RoomVersion stateAtEvent types.StateAtEvent event gomatrixserverlib.Event transactionID *api.TransactionID @@ -256,8 +262,13 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) latestEventIDs[i] = u.latest[i].EventID } + eventJSON, err := u.event.MarshalJSON() + if err != nil { + return nil, err + } + ore := api.OutputNewRoomEvent{ - Event: u.event, + Event: eventJSON, LastSentEventID: u.lastEventIDSent, LatestEventIDs: latestEventIDs, TransactionID: u.transactionID, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 5dbef4b7d..e839a056f 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -98,7 +98,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onNewRoomEvent( ctx context.Context, msg api.OutputNewRoomEvent, ) error { - ev := msg.Event + // TODO: Is this trusted here? + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(msg.Event, false, msg.RoomVersion) + if err != nil { + return err + } + log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(),