From c81f7b6be2d0c3edb37d564bd813daf13cae8621 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 11 Jun 2020 17:59:25 +0100 Subject: [PATCH] OutputEvent now includes AddStateEvents which contain the full event of extra state events --- appservice/consumers/roomserver.go | 51 +------------ clientapi/consumers/roomserver.go | 61 +--------------- federationapi/routing/send_test.go | 6 +- federationsender/consumers/roomserver.go | 8 +- publicroomsapi/consumers/roomserver.go | 17 +---- roomserver/api/output.go | 7 ++ roomserver/internal/input_latest_events.go | 66 +++++++++++++---- syncapi/consumers/roomserver.go | 85 +--------------------- 8 files changed, 74 insertions(+), 227 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index bb4df7906..1657fe542 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -91,60 +91,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "type": ev.Type(), - }).Info("appservice received an event from roomserver") - - missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) - if err != nil { - return err - } - events := append(missingEvents, ev) + events := []gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} + events = append(events, output.NewRoomEvent.AddStateEvents...) // Send event to any relevant application services return s.filterRoomserverEvents(context.TODO(), events) } -// lookupMissingStateEvents looks up the state events that are added by a new event, -// and returns any not already present. -func (s *OutputRoomEventConsumer) lookupMissingStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent, -) ([]gomatrixserverlib.HeaderedEvent, error) { - // Fast path if there aren't any new state events. - if len(addsStateEventIDs) == 0 { - return []gomatrixserverlib.HeaderedEvent{}, nil - } - - // Fast path if the only state event added is the event itself. - if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.HeaderedEvent{}, nil - } - - result := []gomatrixserverlib.HeaderedEvent{} - missing := []string{} - for _, id := range addsStateEventIDs { - if id != event.EventID() { - // If the event isn't the current one, add it to the list of events - // to retrieve from the roomserver - missing = append(missing, id) - } - } - - // Request the missing events from the roomserver - eventReq := api.QueryEventsByIDRequest{EventIDs: missing} - var eventResp api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { - return nil, err - } - - result = append(result, eventResp.Events...) - - return result, nil -} - // filterRoomserverEvents takes in events and decides whether any of them need // to be passed on to an external application service. It does this by checking // each namespace of each registered application service, and if there is a diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go index bd8ac1dcd..698581520 100644 --- a/clientapi/consumers/roomserver.go +++ b/clientapi/consumers/roomserver.go @@ -84,63 +84,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "type": ev.Type(), - }).Info("received event from roomserver") + events := []gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} + events = append(events, output.NewRoomEvent.AddStateEvents...) - events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event) - if err != nil { - return err - } - - return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs) -} - -// lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEventConsumer) lookupStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.Event, -) ([]gomatrixserverlib.Event, error) { - // Fast path if there aren't any new state events. - if len(addsStateEventIDs) == 0 { - // If the event is a membership update (e.g. for a profile update), it won't - // show up in AddsStateEventIDs, so we need to add it manually - if event.Type() == "m.room.member" { - return []gomatrixserverlib.Event{event}, nil - } - return nil, nil - } - - // Fast path if the only state event added is the event itself. - if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.Event{event}, nil - } - - result := []gomatrixserverlib.Event{} - missing := []string{} - for _, id := range addsStateEventIDs { - // Append the current event in the results if its ID is in the events list - if id == event.EventID() { - result = append(result, event) - } else { - // If the event isn't the current one, add it to the list of events - // to retrieve from the roomserver - missing = append(missing, id) - } - } - - // Request the missing events from the roomserver - eventReq := api.QueryEventsByIDRequest{EventIDs: missing} - var eventResp api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { - return nil, err - } - - for _, headeredEvent := range eventResp.Events { - result = append(result, headeredEvent.Event) - } - - return result, nil + return s.db.UpdateMemberships(context.TODO(), gomatrixserverlib.UnwrapEventHeaders(events), output.NewRoomEvent.RemovesStateEventIDs) } diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 9081a8700..adae7c221 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -211,10 +211,10 @@ func (t *testRoomserverAPI) QueryStateAndAuthChain( } // Query a given amount (or less) of events prior to a given set of events. -func (t *testRoomserverAPI) QueryBackfill( +func (t *testRoomserverAPI) PerformBackfill( ctx context.Context, - request *api.QueryBackfillRequest, - response *api.QueryBackfillResponse, + request *api.PerformBackfillRequest, + response *api.PerformBackfillResponse, ) error { return fmt.Errorf("not implemented") } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 5f8a555bd..adca255f7 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -131,11 +131,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event.Event) - if err != nil { - return err - } - addsJoinedHosts, err := joinedHostsFromEvents(addsStateEvents) + addStateEvents := []gomatrixserverlib.HeaderedEvent{ore.Event} + addStateEvents = append(addStateEvents, ore.AddStateEvents...) + addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addStateEvents)) if err != nil { return err } diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index c513d3b24..8bfa53569 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -78,20 +78,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "type": ev.Type(), - }).Info("received event from roomserver") - - addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs} - var addQueryRes api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil { - log.Warn(err) - return err - } - remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs} var remQueryRes api.QueryEventsByIDResponse if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil { @@ -100,9 +86,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } var addQueryEvents, remQueryEvents []gomatrixserverlib.Event - for _, headeredEvent := range addQueryRes.Events { + for _, headeredEvent := range output.NewRoomEvent.AddStateEvents { addQueryEvents = append(addQueryEvents, headeredEvent.Event) } + addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap()) for _, headeredEvent := range remQueryRes.Events { remQueryEvents = append(remQueryEvents, headeredEvent.Event) } diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 92a468a96..4157c19b7 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -63,6 +63,13 @@ type OutputNewRoomEvent struct { // Together with RemovesStateEventIDs this allows the receiver to keep an up to date // view of the current state of the room. AddsStateEventIDs []string `json:"adds_state_event_ids"` + // All extra newly added state events. This is only set if there are *extra* events + // other than `Event`. This can happen when forks get merged because state resolution + // may decide a bunch of state events on one branch are now valid, so they will be + // present in this list. This is useful when trying to maintain the current state of a room + // as to do so you need to include both these events and `Event`. + AddStateEvents []gomatrixserverlib.HeaderedEvent `json:"adds_state_events"` + // The state event IDs that were removed from the state of the room by this event. RemovesStateEventIDs []string `json:"removes_state_event_ids"` // The ID of the event that was output before this event. diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go index aea85ca95..e69307ada 100644 --- a/roomserver/internal/input_latest_events.go +++ b/roomserver/internal/input_latest_events.go @@ -19,6 +19,7 @@ package internal import ( "bytes" "context" + "fmt" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" @@ -310,24 +311,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) TransactionID: u.transactionID, } - var stateEventNIDs []types.EventNID - for _, entry := range u.added { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - for _, entry := range u.removed { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - for _, entry := range u.stateBeforeEventRemoves { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - for _, entry := range u.stateBeforeEventAdds { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] - eventIDMap, err := u.api.DB.EventIDs(u.ctx, stateEventNIDs) + eventIDMap, err := u.stateEventMap() if err != nil { return nil, err } + for _, entry := range u.added { ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) } @@ -342,12 +330,60 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) } ore.SendAsServer = u.sendAsServer + // include extra state events if they were added as nearly every downstream component will care about it + // and we'd rather not have them all hit QueryEventsByID at the same time! + if len(ore.AddsStateEventIDs) > 0 { + ore.AddStateEvents, err = u.extraEventsForIDs(roomVersion, ore.AddsStateEventIDs) + if err != nil { + return nil, fmt.Errorf("failed to load add_state_events from db: %w", err) + } + } + return &api.OutputEvent{ Type: api.OutputTypeNewRoomEvent, NewRoomEvent: &ore, }, nil } +// extraEventsForIDs returns the full events for the event IDs given, but does not include the current event being +// updated. +func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) { + var extraEventIDs []string + for _, e := range eventIDs { + if e == u.event.EventID() { + continue + } + extraEventIDs = append(extraEventIDs, e) + } + if len(extraEventIDs) == 0 { + return nil, nil + } + extraEvents, err := u.api.DB.EventsFromIDs(u.ctx, extraEventIDs) + if err != nil { + return nil, err + } + var h []gomatrixserverlib.HeaderedEvent + for _, e := range extraEvents { + h = append(h, e.Headered(roomVersion)) + } + return h, nil +} + +// retrieve an event nid -> event ID map for all events that need updating +func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) { + var stateEventNIDs []types.EventNID + var allStateEntries []types.StateEntry + allStateEntries = append(allStateEntries, u.added...) + allStateEntries = append(allStateEntries, u.removed...) + allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...) + allStateEntries = append(allStateEntries, u.stateBeforeEventAdds...) + for _, entry := range allStateEntries { + stateEventNIDs = append(stateEventNIDs, entry.EventNID) + } + stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] + return u.api.DB.EventIDs(u.ctx, stateEventNIDs) +} + type eventNIDSorter []types.EventNID func (s eventNIDSorter) Len() int { return len(s) } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 055f76603..d6f8ad73e 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -17,7 +17,6 @@ package consumers import ( "context" "encoding/json" - "fmt" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" @@ -105,17 +104,10 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( "room_version": ev.RoomVersion, }).Info("received event from roomserver") - addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev) - if err != nil { - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": msg.AddsStateEventIDs, - "del": msg.RemovesStateEventIDs, - }).Panicf("roomserver output log: state event lookup failure") - } + addsStateEvents := []gomatrixserverlib.HeaderedEvent{ev} + addsStateEvents = append(addsStateEvents, msg.AddStateEvents...) - ev, err = s.updateStateEvent(ev) + ev, err := s.updateStateEvent(ev) if err != nil { return err } @@ -185,63 +177,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( return nil } -// lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEventConsumer) lookupStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent, -) ([]gomatrixserverlib.HeaderedEvent, error) { - // Fast path if there aren't any new state events. - if len(addsStateEventIDs) == 0 { - return nil, nil - } - - // Fast path if the only state event added is the event itself. - if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.HeaderedEvent{event}, nil - } - - // Check if this is re-adding a state events that we previously processed - // If we have previously received a state event it may still be in - // our event database. - result, err := s.db.Events(context.TODO(), addsStateEventIDs) - if err != nil { - return nil, err - } - missing := missingEventsFrom(result, addsStateEventIDs) - - // Check if event itself is being added. - for _, eventID := range missing { - if eventID == event.EventID() { - result = append(result, event) - break - } - } - missing = missingEventsFrom(result, addsStateEventIDs) - - if len(missing) == 0 { - return result, nil - } - - // At this point the missing events are neither the event itself nor are - // they present in our local database. Our only option is to fetch them - // from the roomserver using the query API. - eventReq := api.QueryEventsByIDRequest{EventIDs: missing} - var eventResp api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { - return nil, err - } - - result = append(result, eventResp.Events...) - missing = missingEventsFrom(result, addsStateEventIDs) - - if len(missing) != 0 { - return nil, fmt.Errorf( - "missing %d state events IDs at event %q", len(missing), event.EventID(), - ) - } - - return result, nil -} - func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) { var stateKey string if event.StateKey() == nil { @@ -270,17 +205,3 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Heade event.Event, err = event.SetUnsigned(prev) return event, err } - -func missingEventsFrom(events []gomatrixserverlib.HeaderedEvent, required []string) []string { - have := map[string]bool{} - for _, event := range events { - have[event.EventID()] = true - } - var missing []string - for _, eventID := range required { - if !have[eventID] { - missing = append(missing, eventID) - } - } - return missing -}