OutputEvent now includes AddStateEvents which contain the full event of extra state events

This commit is contained in:
Kegan Dougal 2020-06-11 17:59:25 +01:00
parent 0459285e33
commit c81f7b6be2
8 changed files with 74 additions and 227 deletions

View file

@ -91,60 +91,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
ev := output.NewRoomEvent.Event events := []gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
log.WithFields(log.Fields{ events = append(events, output.NewRoomEvent.AddStateEvents...)
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"type": ev.Type(),
}).Info("appservice received an event from roomserver")
missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
if err != nil {
return err
}
events := append(missingEvents, ev)
// Send event to any relevant application services // Send event to any relevant application services
return s.filterRoomserverEvents(context.TODO(), events) return s.filterRoomserverEvents(context.TODO(), events)
} }
// lookupMissingStateEvents looks up the state events that are added by a new event,
// and returns any not already present.
func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
) ([]gomatrixserverlib.HeaderedEvent, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
return []gomatrixserverlib.HeaderedEvent{}, nil
}
// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []gomatrixserverlib.HeaderedEvent{}, nil
}
result := []gomatrixserverlib.HeaderedEvent{}
missing := []string{}
for _, id := range addsStateEventIDs {
if id != event.EventID() {
// If the event isn't the current one, add it to the list of events
// to retrieve from the roomserver
missing = append(missing, id)
}
}
// Request the missing events from the roomserver
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse
if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
return nil, err
}
result = append(result, eventResp.Events...)
return result, nil
}
// filterRoomserverEvents takes in events and decides whether any of them need // filterRoomserverEvents takes in events and decides whether any of them need
// to be passed on to an external application service. It does this by checking // to be passed on to an external application service. It does this by checking
// each namespace of each registered application service, and if there is a // each namespace of each registered application service, and if there is a

View file

@ -84,63 +84,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
ev := output.NewRoomEvent.Event events := []gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
log.WithFields(log.Fields{ events = append(events, output.NewRoomEvent.AddStateEvents...)
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"type": ev.Type(),
}).Info("received event from roomserver")
events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event) return s.db.UpdateMemberships(context.TODO(), gomatrixserverlib.UnwrapEventHeaders(events), output.NewRoomEvent.RemovesStateEventIDs)
if err != nil {
return err
}
return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs)
}
// lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEventConsumer) lookupStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event,
) ([]gomatrixserverlib.Event, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
// If the event is a membership update (e.g. for a profile update), it won't
// show up in AddsStateEventIDs, so we need to add it manually
if event.Type() == "m.room.member" {
return []gomatrixserverlib.Event{event}, nil
}
return nil, nil
}
// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []gomatrixserverlib.Event{event}, nil
}
result := []gomatrixserverlib.Event{}
missing := []string{}
for _, id := range addsStateEventIDs {
// Append the current event in the results if its ID is in the events list
if id == event.EventID() {
result = append(result, event)
} else {
// If the event isn't the current one, add it to the list of events
// to retrieve from the roomserver
missing = append(missing, id)
}
}
// Request the missing events from the roomserver
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse
if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
return nil, err
}
for _, headeredEvent := range eventResp.Events {
result = append(result, headeredEvent.Event)
}
return result, nil
} }

View file

@ -211,10 +211,10 @@ func (t *testRoomserverAPI) QueryStateAndAuthChain(
} }
// Query a given amount (or less) of events prior to a given set of events. // Query a given amount (or less) of events prior to a given set of events.
func (t *testRoomserverAPI) QueryBackfill( func (t *testRoomserverAPI) PerformBackfill(
ctx context.Context, ctx context.Context,
request *api.QueryBackfillRequest, request *api.PerformBackfillRequest,
response *api.QueryBackfillResponse, response *api.PerformBackfillResponse,
) error { ) error {
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }

View file

@ -131,11 +131,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event.Event) addStateEvents := []gomatrixserverlib.HeaderedEvent{ore.Event}
if err != nil { addStateEvents = append(addStateEvents, ore.AddStateEvents...)
return err addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addStateEvents))
}
addsJoinedHosts, err := joinedHostsFromEvents(addsStateEvents)
if err != nil { if err != nil {
return err return err
} }

View file

@ -78,20 +78,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"type": ev.Type(),
}).Info("received event from roomserver")
addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs}
var addQueryRes api.QueryEventsByIDResponse
if err := s.rsAPI.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil {
log.Warn(err)
return err
}
remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs} remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
var remQueryRes api.QueryEventsByIDResponse var remQueryRes api.QueryEventsByIDResponse
if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil { if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
@ -100,9 +86,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
} }
var addQueryEvents, remQueryEvents []gomatrixserverlib.Event var addQueryEvents, remQueryEvents []gomatrixserverlib.Event
for _, headeredEvent := range addQueryRes.Events { for _, headeredEvent := range output.NewRoomEvent.AddStateEvents {
addQueryEvents = append(addQueryEvents, headeredEvent.Event) addQueryEvents = append(addQueryEvents, headeredEvent.Event)
} }
addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap())
for _, headeredEvent := range remQueryRes.Events { for _, headeredEvent := range remQueryRes.Events {
remQueryEvents = append(remQueryEvents, headeredEvent.Event) remQueryEvents = append(remQueryEvents, headeredEvent.Event)
} }

View file

@ -63,6 +63,13 @@ type OutputNewRoomEvent struct {
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date // Together with RemovesStateEventIDs this allows the receiver to keep an up to date
// view of the current state of the room. // view of the current state of the room.
AddsStateEventIDs []string `json:"adds_state_event_ids"` AddsStateEventIDs []string `json:"adds_state_event_ids"`
// All extra newly added state events. This is only set if there are *extra* events
// other than `Event`. This can happen when forks get merged because state resolution
// may decide a bunch of state events on one branch are now valid, so they will be
// present in this list. This is useful when trying to maintain the current state of a room
// as to do so you need to include both these events and `Event`.
AddStateEvents []gomatrixserverlib.HeaderedEvent `json:"adds_state_events"`
// The state event IDs that were removed from the state of the room by this event. // The state event IDs that were removed from the state of the room by this event.
RemovesStateEventIDs []string `json:"removes_state_event_ids"` RemovesStateEventIDs []string `json:"removes_state_event_ids"`
// The ID of the event that was output before this event. // The ID of the event that was output before this event.

View file

@ -19,6 +19,7 @@ package internal
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -310,24 +311,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
TransactionID: u.transactionID, TransactionID: u.transactionID,
} }
var stateEventNIDs []types.EventNID eventIDMap, err := u.stateEventMap()
for _, entry := range u.added {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
for _, entry := range u.removed {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
for _, entry := range u.stateBeforeEventRemoves {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
for _, entry := range u.stateBeforeEventAdds {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
eventIDMap, err := u.api.DB.EventIDs(u.ctx, stateEventNIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, entry := range u.added { for _, entry := range u.added {
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
} }
@ -342,12 +330,60 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
} }
ore.SendAsServer = u.sendAsServer ore.SendAsServer = u.sendAsServer
// include extra state events if they were added as nearly every downstream component will care about it
// and we'd rather not have them all hit QueryEventsByID at the same time!
if len(ore.AddsStateEventIDs) > 0 {
ore.AddStateEvents, err = u.extraEventsForIDs(roomVersion, ore.AddsStateEventIDs)
if err != nil {
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
}
}
return &api.OutputEvent{ return &api.OutputEvent{
Type: api.OutputTypeNewRoomEvent, Type: api.OutputTypeNewRoomEvent,
NewRoomEvent: &ore, NewRoomEvent: &ore,
}, nil }, nil
} }
// extraEventsForIDs returns the full events for the event IDs given, but does not include the current event being
// updated.
func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
var extraEventIDs []string
for _, e := range eventIDs {
if e == u.event.EventID() {
continue
}
extraEventIDs = append(extraEventIDs, e)
}
if len(extraEventIDs) == 0 {
return nil, nil
}
extraEvents, err := u.api.DB.EventsFromIDs(u.ctx, extraEventIDs)
if err != nil {
return nil, err
}
var h []gomatrixserverlib.HeaderedEvent
for _, e := range extraEvents {
h = append(h, e.Headered(roomVersion))
}
return h, nil
}
// retrieve an event nid -> event ID map for all events that need updating
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
var stateEventNIDs []types.EventNID
var allStateEntries []types.StateEntry
allStateEntries = append(allStateEntries, u.added...)
allStateEntries = append(allStateEntries, u.removed...)
allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...)
allStateEntries = append(allStateEntries, u.stateBeforeEventAdds...)
for _, entry := range allStateEntries {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
return u.api.DB.EventIDs(u.ctx, stateEventNIDs)
}
type eventNIDSorter []types.EventNID type eventNIDSorter []types.EventNID
func (s eventNIDSorter) Len() int { return len(s) } func (s eventNIDSorter) Len() int { return len(s) }

View file

@ -17,7 +17,6 @@ package consumers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -105,17 +104,10 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
"room_version": ev.RoomVersion, "room_version": ev.RoomVersion,
}).Info("received event from roomserver") }).Info("received event from roomserver")
addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev) addsStateEvents := []gomatrixserverlib.HeaderedEvent{ev}
if err != nil { addsStateEvents = append(addsStateEvents, msg.AddStateEvents...)
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": msg.AddsStateEventIDs,
"del": msg.RemovesStateEventIDs,
}).Panicf("roomserver output log: state event lookup failure")
}
ev, err = s.updateStateEvent(ev) ev, err := s.updateStateEvent(ev)
if err != nil { if err != nil {
return err return err
} }
@ -185,63 +177,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
return nil return nil
} }
// lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEventConsumer) lookupStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
) ([]gomatrixserverlib.HeaderedEvent, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
return nil, nil
}
// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []gomatrixserverlib.HeaderedEvent{event}, nil
}
// Check if this is re-adding a state events that we previously processed
// If we have previously received a state event it may still be in
// our event database.
result, err := s.db.Events(context.TODO(), addsStateEventIDs)
if err != nil {
return nil, err
}
missing := missingEventsFrom(result, addsStateEventIDs)
// Check if event itself is being added.
for _, eventID := range missing {
if eventID == event.EventID() {
result = append(result, event)
break
}
}
missing = missingEventsFrom(result, addsStateEventIDs)
if len(missing) == 0 {
return result, nil
}
// At this point the missing events are neither the event itself nor are
// they present in our local database. Our only option is to fetch them
// from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse
if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
return nil, err
}
result = append(result, eventResp.Events...)
missing = missingEventsFrom(result, addsStateEventIDs)
if len(missing) != 0 {
return nil, fmt.Errorf(
"missing %d state events IDs at event %q", len(missing), event.EventID(),
)
}
return result, nil
}
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) { func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
var stateKey string var stateKey string
if event.StateKey() == nil { if event.StateKey() == nil {
@ -270,17 +205,3 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Heade
event.Event, err = event.SetUnsigned(prev) event.Event, err = event.SetUnsigned(prev)
return event, err return event, err
} }
func missingEventsFrom(events []gomatrixserverlib.HeaderedEvent, required []string) []string {
have := map[string]bool{}
for _, event := range events {
have[event.EventID()] = true
}
var missing []string
for _, eventID := range required {
if !have[eventID] {
missing = append(missing, eventID)
}
}
return missing
}