diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go index 698581520..caa028ba3 100644 --- a/clientapi/consumers/roomserver.go +++ b/clientapi/consumers/roomserver.go @@ -84,8 +84,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - events := []gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} - events = append(events, output.NewRoomEvent.AddStateEvents...) - - return s.db.UpdateMemberships(context.TODO(), gomatrixserverlib.UnwrapEventHeaders(events), output.NewRoomEvent.RemovesStateEventIDs) + return s.db.UpdateMemberships( + context.TODO(), + gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()), + output.NewRoomEvent.RemovesStateEventIDs, + ) } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index ff5230161..a15937f9e 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -131,7 +131,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddStateEvents)) + addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState())) if err != nil { return err } diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index 8bfa53569..ba187cb11 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -86,7 +86,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } var addQueryEvents, remQueryEvents []gomatrixserverlib.Event - for _, headeredEvent := range output.NewRoomEvent.AddStateEvents { + for _, headeredEvent := range output.NewRoomEvent.AddsState() { addQueryEvents = append(addQueryEvents, headeredEvent.Event) } addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap()) diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 4157c19b7..2bbd97af8 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -119,6 +119,26 @@ type OutputNewRoomEvent struct { TransactionID *TransactionID `json:"transaction_id"` } +// AddsState returns all added state events from this event. +// +// This function is needed because `AddStateEvents` will not include a copy of +// the original event to save space, so you cannot use that slice alone. +// Instead, use this function which will add the original event if it is present +// in `AddsStateEventIDs`. +func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent { + includeOutputEvent := false + for _, id := range ore.AddsStateEventIDs { + if id == ore.Event.EventID() { + includeOutputEvent = true + break + } + } + if !includeOutputEvent { + return ore.AddStateEvents + } + return append(ore.AddStateEvents, ore.Event) +} + // An OutputNewInviteEvent is written whenever an invite becomes active. // Invite events can be received outside of an existing room so have to be // tracked separately from the room events themselves. diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index d6f8ad73e..135976823 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -104,8 +104,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( "room_version": ev.RoomVersion, }).Info("received event from roomserver") - addsStateEvents := []gomatrixserverlib.HeaderedEvent{ev} - addsStateEvents = append(addsStateEvents, msg.AddStateEvents...) + addsStateEvents := msg.AddsState() ev, err := s.updateStateEvent(ev) if err != nil {