Get adding state right

This commit is contained in:
Kegan Dougal 2020-06-11 19:24:28 +01:00
parent 942491cef4
commit 11d8b10c10
5 changed files with 28 additions and 8 deletions

View file

@ -84,8 +84,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
events := []gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} return s.db.UpdateMemberships(
events = append(events, output.NewRoomEvent.AddStateEvents...) context.TODO(),
gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()),
return s.db.UpdateMemberships(context.TODO(), gomatrixserverlib.UnwrapEventHeaders(events), output.NewRoomEvent.RemovesStateEventIDs) output.NewRoomEvent.RemovesStateEventIDs,
)
} }

View file

@ -131,7 +131,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddStateEvents)) addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState()))
if err != nil { if err != nil {
return err return err
} }

View file

@ -86,7 +86,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
} }
var addQueryEvents, remQueryEvents []gomatrixserverlib.Event var addQueryEvents, remQueryEvents []gomatrixserverlib.Event
for _, headeredEvent := range output.NewRoomEvent.AddStateEvents { for _, headeredEvent := range output.NewRoomEvent.AddsState() {
addQueryEvents = append(addQueryEvents, headeredEvent.Event) addQueryEvents = append(addQueryEvents, headeredEvent.Event)
} }
addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap()) addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap())

View file

@ -119,6 +119,26 @@ type OutputNewRoomEvent struct {
TransactionID *TransactionID `json:"transaction_id"` TransactionID *TransactionID `json:"transaction_id"`
} }
// AddsState returns all added state events from this event.
//
// This function is needed because `AddStateEvents` will not include a copy of
// the original event to save space, so you cannot use that slice alone.
// Instead, use this function which will add the original event if it is present
// in `AddsStateEventIDs`.
func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent {
includeOutputEvent := false
for _, id := range ore.AddsStateEventIDs {
if id == ore.Event.EventID() {
includeOutputEvent = true
break
}
}
if !includeOutputEvent {
return ore.AddStateEvents
}
return append(ore.AddStateEvents, ore.Event)
}
// An OutputNewInviteEvent is written whenever an invite becomes active. // An OutputNewInviteEvent is written whenever an invite becomes active.
// Invite events can be received outside of an existing room so have to be // Invite events can be received outside of an existing room so have to be
// tracked separately from the room events themselves. // tracked separately from the room events themselves.

View file

@ -104,8 +104,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
"room_version": ev.RoomVersion, "room_version": ev.RoomVersion,
}).Info("received event from roomserver") }).Info("received event from roomserver")
addsStateEvents := []gomatrixserverlib.HeaderedEvent{ev} addsStateEvents := msg.AddsState()
addsStateEvents = append(addsStateEvents, msg.AddStateEvents...)
ev, err := s.updateStateEvent(ev) ev, err := s.updateStateEvent(ev)
if err != nil { if err != nil {