diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 04eb3d992..3e47dee56 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -101,11 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { "type": ev.Type(), }).Info("appservice received an event from roomserver") - missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event) if err != nil { return err } - events := append(missingEvents, ev) + events := append(missingEvents, ev.Event) // Send event to any relevant application services return s.filterRoomserverEvents(context.TODO(), events) diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go index e90d56693..6d5bb09a6 100644 --- a/clientapi/consumers/roomserver.go +++ b/clientapi/consumers/roomserver.go @@ -91,7 +91,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { "type": ev.Type(), }).Info("received event from roomserver") - events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event) if err != nil { return err } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 39d8d62db..2d4b1b2ef 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -109,7 +109,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event) + addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event.Event) if err != nil { return err } @@ -155,7 +155,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err // Send the event. return s.queues.SendEvent( - &ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, + &ore.Event.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, ) } @@ -178,7 +178,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( ore.AddsStateEventIDs, ore.RemovesStateEventIDs, ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, ) - combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event) + combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event.Event) if err != nil { return nil, err } diff --git a/roomserver/api/output.go b/roomserver/api/output.go index c09d5a1e5..4e7adff79 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -54,7 +54,7 @@ type OutputEvent struct { // prev_events. type OutputNewRoomEvent struct { // The Event. - Event gomatrixserverlib.Event `json:"event"` + Event gomatrixserverlib.HeaderedEvent `json:"event"` // The latest events in the room after this event. // This can be used to set the prev events for new events in the room. // This also can be used to get the full current state after this event. @@ -117,7 +117,7 @@ type OutputNewRoomEvent struct { // tracked separately from the room events themselves. type OutputNewInviteEvent struct { // The "m.room.member" invite event. - Event gomatrixserverlib.Event `json:"event"` + Event gomatrixserverlib.HeaderedEvent `json:"event"` } // An OutputRetireInviteEvent is written whenever an existing invite is no longer diff --git a/roomserver/input/latest_events.go b/roomserver/input/latest_events.go index f9fd1d5d4..9a99ad76f 100644 --- a/roomserver/input/latest_events.go +++ b/roomserver/input/latest_events.go @@ -253,8 +253,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) latestEventIDs[i] = u.latest[i].EventID } + // TODO: Room version here + roomVersion := gomatrixserverlib.RoomVersionV1 + ore := api.OutputNewRoomEvent{ - Event: u.event, + Event: u.event.Headered(roomVersion), LastSentEventID: u.lastEventIDSent, LatestEventIDs: latestEventIDs, TransactionID: u.transactionID, diff --git a/roomserver/input/membership.go b/roomserver/input/membership.go index 841c5fec6..f2ac3b510 100644 --- a/roomserver/input/membership.go +++ b/roomserver/input/membership.go @@ -136,13 +136,14 @@ func updateToInviteMembership( return nil, err } if needsSending { + roomVersion := gomatrixserverlib.RoomVersionV1 // We notify the consumers using a special event even though we will // notify them about the change in current state as part of the normal // room event stream. This ensures that the consumers only have to // consider a single stream of events when determining whether a user // is invited, rather than having to combine multiple streams themselves. onie := api.OutputNewInviteEvent{ - Event: *add, + Event: (*add).Headered(roomVersion), } updates = append(updates, api.OutputEvent{ Type: api.OutputTypeNewInviteEvent, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 976dd8e8f..5aaff5627 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -98,7 +98,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onNewRoomEvent( ctx context.Context, msg api.OutputNewRoomEvent, ) error { - ev := msg.Event + ev := msg.Event.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), @@ -153,7 +153,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( func (s *OutputRoomEventConsumer) onNewInviteEvent( ctx context.Context, msg api.OutputNewInviteEvent, ) error { - pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) + pduPos, err := s.db.AddInviteEvent(ctx, msg.Event.Event) if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ @@ -163,7 +163,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( }).Panicf("roomserver output log: write invite failure") return nil } - s.notifier.OnNewEvent(&msg.Event, "", nil, types.PaginationToken{PDUPosition: pduPos}) + s.notifier.OnNewEvent(&msg.Event.Event, "", nil, types.PaginationToken{PDUPosition: pduPos}) return nil }