Implement /get_missing_events (#1022)

* WIP get_missing_events work

* More WIP get_missing_events work

* First working /get_missing_events implementation

Flakey currently due to racing between /sync and /send

* Final tweaks

* Remove log lines

* Linting

* go mod tidy

* Clamp min depth to 0

* sort events by depth because sytest makes me sad

Specifically I think it's
4172585c25/lib/SyTest/Federation/Client.pm (L265)
to blame here.
This commit is contained in:
Kegsay 2020-05-12 16:24:28 +01:00 committed by GitHub
parent 32624697fd
commit ce5dfbebf9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 675 additions and 198 deletions

View file

@ -17,6 +17,7 @@ package routing
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"sort"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -275,6 +276,12 @@ func SendJoin(
} }
} }
// sort events deterministically by depth (lower is earlier)
// We also do this because sytest's basic federation server isn't good at using the correct
// state if these lists are randomised, resulting in flakey tests. :(
sort.Sort(eventsByDepth(stateAndAuthChainResponse.StateEvents))
sort.Sort(eventsByDepth(stateAndAuthChainResponse.AuthChainEvents))
// https://matrix.org/docs/spec/server_server/latest#put-matrix-federation-v1-send-join-roomid-eventid // https://matrix.org/docs/spec/server_server/latest#put-matrix-federation-v1-send-join-roomid-eventid
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
@ -285,3 +292,15 @@ func SendJoin(
}, },
} }
} }
type eventsByDepth []gomatrixserverlib.HeaderedEvent
func (e eventsByDepth) Len() int {
return len(e)
}
func (e eventsByDepth) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
func (e eventsByDepth) Less(i, j int) bool {
return e[i].Depth() < e[j].Depth()
}

View file

@ -226,6 +226,28 @@ func Setup(
}, },
)).Methods(http.MethodGet) )).Methods(http.MethodGet)
v1fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI(
"federation_send_join", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))
if err != nil {
return util.ErrorResponse(err)
}
roomID := vars["roomID"]
eventID := vars["eventID"]
res := SendJoin(
httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID,
)
return util.JSONResponse{
Headers: res.Headers,
Code: res.Code,
JSON: []interface{}{
res.Code, res.JSON,
},
}
},
)).Methods(http.MethodPut)
v2fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI( v2fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI(
"federation_send_join", cfg.Matrix.ServerName, keys, "federation_send_join", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {

View file

@ -48,6 +48,8 @@ func Send(
eduProducer: eduProducer, eduProducer: eduProducer,
keys: keys, keys: keys,
federation: federation, federation: federation,
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
newEvents: make(map[string]bool),
} }
var txnEvents struct { var txnEvents struct {
@ -105,6 +107,11 @@ type txnReq struct {
eduProducer *producers.EDUServerProducer eduProducer *producers.EDUServerProducer
keys gomatrixserverlib.JSONVerifier keys gomatrixserverlib.JSONVerifier
federation txnFederationClient federation txnFederationClient
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
// new events which the roomserver does not know about
newEvents map[string]bool
} }
// A subset of FederationClient functionality that txn requires. Useful for testing. // A subset of FederationClient functionality that txn requires. Useful for testing.
@ -114,6 +121,8 @@ type txnFederationClient interface {
) )
LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error) LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error)
GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error) GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error)
LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents,
roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
} }
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
@ -148,7 +157,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
// Process the events. // Process the events.
for _, e := range pdus { for _, e := range pdus {
err := t.processEvent(e.Unwrap()) err := t.processEvent(e.Unwrap(), true)
if err != nil { if err != nil {
// If the error is due to the event itself being bad then we skip // If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the // it and move onto the next event. We report an error so that the
@ -168,7 +177,9 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
switch err.(type) { switch err.(type) {
case roomNotFoundError: case roomNotFoundError:
case *gomatrixserverlib.NotAllowed: case *gomatrixserverlib.NotAllowed:
case missingPrevEventsError:
default: default:
util.GetLogger(t.context).Warnf("Processing %s failed: %s", e.EventID(), err)
// Any other error should be the result of a temporary error in // Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely. // our server so we should bail processing the transaction entirely.
return nil, err return nil, err
@ -197,12 +208,30 @@ type verifySigError struct {
eventID string eventID string
err error err error
} }
type missingPrevEventsError struct {
eventID string
err error
}
func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) } func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) }
func (e unmarshalError) Error() string { return fmt.Sprintf("unable to parse event: %s", e.err) } func (e unmarshalError) Error() string { return fmt.Sprintf("unable to parse event: %s", e.err) }
func (e verifySigError) Error() string { func (e verifySigError) Error() string {
return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err) return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err)
} }
func (e missingPrevEventsError) Error() string {
return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err)
}
func (t *txnReq) haveEventIDs() map[string]bool {
result := make(map[string]bool, len(t.haveEvents))
for eventID := range t.haveEvents {
if t.newEvents[eventID] {
continue
}
result[eventID] = true
}
return result
}
func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
for _, e := range edus { for _, e := range edus {
@ -227,7 +256,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
} }
} }
func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) error {
prevEventIDs := e.PrevEventIDs() prevEventIDs := e.PrevEventIDs()
// Fetch the state needed to authenticate the event. // Fetch the state needed to authenticate the event.
@ -253,21 +282,14 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
} }
if !stateResp.PrevEventsExist { if !stateResp.PrevEventsExist {
return t.processEventWithMissingState(e, stateResp.RoomVersion) return t.processEventWithMissingState(e, stateResp.RoomVersion, isInboundTxn)
} }
// Check that the event is allowed by the state at the event. // Check that the event is allowed by the state at the event.
var events []gomatrixserverlib.Event if err := checkAllowedByState(e, gomatrixserverlib.UnwrapEventHeaders(stateResp.StateEvents)); err != nil {
for _, headeredEvent := range stateResp.StateEvents {
events = append(events, headeredEvent.Unwrap())
}
if err := checkAllowedByState(e, events); err != nil {
return err return err
} }
// TODO: Check that the roomserver has a copy of all of the auth_events.
// TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver // pass the event to the roomserver
_, err := t.producer.SendEvents( _, err := t.producer.SendEvents(
t.context, t.context,
@ -291,7 +313,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
return gomatrixserverlib.Allowed(e, &authUsingState) return gomatrixserverlib.Allowed(e, &authUsingState)
} }
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error { func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) error {
// We are missing the previous events for this events. // We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the // This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap: // room. There two ways that we can handle such a gap:
@ -306,49 +328,315 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// event ids and then use /event to fetch the individual events. // event ids and then use /event to fetch the individual events.
// However not all version of synapse support /state_ids so you may // However not all version of synapse support /state_ids so you may
// need to fallback to /state. // need to fallback to /state.
// TODO: Attempt to fill in the gap using /get_missing_events
// Attempt to fetch the missing state using /state_ids and /events // Attempt to fill in the gap using /get_missing_events
respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion) // This will either:
if err != nil { // - fill in the gap completely then process event `e` returning no backwards extremity
// Fallback to /state // - fail to fill in the gap and tell us to terminate the transaction err=not nil
util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state") // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
respState, err = t.lookupMissingStateViaState(e, roomVersion) backwardsExtremity, err := t.getMissingEvents(e, roomVersion, isInboundTxn)
if err != nil { if err != nil {
return err return err
} }
if backwardsExtremity == nil {
// we filled in the gap!
return nil
} }
// Check that the event is allowed by the state. // at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
retryAllowedState: // security: we have to do state resolution on the new backwards extremity (TODO: WHY)
if err := checkAllowedByState(e, respState.StateEvents); err != nil { // Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
switch missing := err.(type) { // the state AFTER all the prev_events for this event, then mix in our current room state and apply state resolution
case gomatrixserverlib.MissingAuthEventError: // to that to get the state before the event.
// An auth event was missing so let's look up that event over federation var states []*gomatrixserverlib.RespState
for _, s := range respState.StateEvents { needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples()
if s.EventID() != missing.AuthEventID { for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
continue var prevState *gomatrixserverlib.RespState
prevState, err = t.lookupStateAfterEvent(roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
if err != nil {
util.GetLogger(t.context).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return err
} }
err = t.processEventWithMissingState(s, roomVersion) states = append(states, prevState)
// If there was no error retrieving the event from federation then
// we assume that it succeeded, so retry the original state check
if err == nil {
goto retryAllowedState
} }
// mix in the current room state
currState, err := t.lookupCurrentState(backwardsExtremity)
if err != nil {
util.GetLogger(t.context).WithError(err).Errorf("Failed to lookup current room state")
return err
} }
default: states = append(states, currState)
} resolvedState, err := t.resolveStatesAndCheck(roomVersion, states, backwardsExtremity)
if err != nil {
util.GetLogger(t.context).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
return err return err
} }
// pass the event along with the state to the roomserver using a background context so we don't // pass the event along with the state to the roomserver using a background context so we don't
// needlessly expire // needlessly expire
return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs) return t.producer.SendEventWithState(context.Background(), resolvedState, e.Headered(roomVersion), t.haveEventIDs())
} }
func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix.
func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) (*gomatrixserverlib.RespState, error) {
// try doing all this locally before we resort to querying federation
respState := t.lookupStateAfterEventLocally(roomID, eventID, needed)
if respState != nil {
return respState, nil
}
respState, err := t.lookupStateBeforeEvent(roomVersion, roomID, eventID)
if err != nil {
return nil, err
}
// fetch the event we're missing and add it to the pile
h, err := t.lookupEvent(roomVersion, eventID, false)
if err != nil {
return nil, err
}
t.haveEvents[h.EventID()] = h
if h.StateKey() != nil {
addedToState := false
for i := range respState.StateEvents {
se := respState.StateEvents[i]
if se.Type() == h.Type() && se.StateKeyEquals(*h.StateKey()) {
respState.StateEvents[i] = h.Unwrap()
addedToState = true
break
}
}
if !addedToState {
respState.StateEvents = append(respState.StateEvents, h.Unwrap())
}
}
return respState, nil
}
func (t *txnReq) lookupStateAfterEventLocally(roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) *gomatrixserverlib.RespState {
var res api.QueryStateAfterEventsResponse
err := t.rsAPI.QueryStateAfterEvents(t.context, &api.QueryStateAfterEventsRequest{
RoomID: roomID,
PrevEventIDs: []string{eventID},
StateToFetch: needed,
}, &res)
if err != nil || !res.PrevEventsExist {
util.GetLogger(t.context).WithError(err).Warnf("failed to query state after %s locally", eventID)
return nil
}
for i, ev := range res.StateEvents {
t.haveEvents[ev.EventID()] = &res.StateEvents[i]
}
var authEvents []gomatrixserverlib.Event
missingAuthEvents := make(map[string]bool)
for _, ev := range res.StateEvents {
for _, ae := range ev.AuthEventIDs() {
aev, ok := t.haveEvents[ae]
if ok {
authEvents = append(authEvents, aev.Unwrap())
} else {
missingAuthEvents[ae] = true
}
}
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
var missingEventList []string
for evID := range missingAuthEvents {
missingEventList = append(missingEventList, evID)
}
queryReq := api.QueryEventsByIDRequest{
EventIDs: missingEventList,
}
util.GetLogger(t.context).Infof("Fetching missing auth events: %v", missingEventList)
var queryRes api.QueryEventsByIDResponse
if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
return nil
}
for i := range queryRes.Events {
evID := queryRes.Events[i].EventID()
t.haveEvents[evID] = &queryRes.Events[i]
authEvents = append(authEvents, queryRes.Events[i].Unwrap())
}
evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents)
return &gomatrixserverlib.RespState{
StateEvents: evs,
AuthEvents: authEvents,
}
}
func (t *txnReq) lookupCurrentState(newEvent *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) {
// Ask the roomserver for information about this room
queryReq := api.QueryLatestEventsAndStateRequest{
RoomID: newEvent.RoomID(),
StateToFetch: gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*newEvent}).Tuples(),
}
var queryRes api.QueryLatestEventsAndStateResponse
if err := t.rsAPI.QueryLatestEventsAndState(t.context, &queryReq, &queryRes); err != nil {
return nil, fmt.Errorf("lookupCurrentState rsAPI.QueryLatestEventsAndState: %w", err)
}
evs := gomatrixserverlib.UnwrapEventHeaders(queryRes.StateEvents)
return &gomatrixserverlib.RespState{
StateEvents: evs,
AuthEvents: evs,
}, nil
}
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
// the server supports.
func (t *txnReq) lookupStateBeforeEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
respState *gomatrixserverlib.RespState, err error) { respState *gomatrixserverlib.RespState, err error) {
state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion)
util.GetLogger(t.context).Infof("lookupStateBeforeEvent %s", eventID)
// Attempt to fetch the missing state using /state_ids and /events
respState, err = t.lookupMissingStateViaStateIDs(roomID, eventID, roomVersion)
if err != nil {
// Fallback to /state
util.GetLogger(t.context).WithError(err).Warn("lookupStateBeforeEvent failed to /state_ids, falling back to /state")
respState, err = t.lookupMissingStateViaState(roomID, eventID, roomVersion)
}
return
}
func (t *txnReq) resolveStatesAndCheck(roomVersion gomatrixserverlib.RoomVersion, states []*gomatrixserverlib.RespState, backwardsExtremity *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) {
var authEventList []gomatrixserverlib.Event
var stateEventList []gomatrixserverlib.Event
for _, state := range states {
authEventList = append(authEventList, state.AuthEvents...)
stateEventList = append(stateEventList, state.StateEvents...)
}
resolvedStateEvents, err := gomatrixserverlib.ResolveConflicts(roomVersion, stateEventList, authEventList)
if err != nil {
return nil, err
}
// apply the current event
retryAllowedState:
if err = checkAllowedByState(*backwardsExtremity, resolvedStateEvents); err != nil {
switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
h, err2 := t.lookupEvent(roomVersion, missing.AuthEventID, true)
if err2 != nil {
return nil, fmt.Errorf("missing auth event %s and failed to look it up: %w", missing.AuthEventID, err2)
}
util.GetLogger(t.context).Infof("fetched event %s", missing.AuthEventID)
resolvedStateEvents = append(resolvedStateEvents, h.Unwrap())
goto retryAllowedState
default:
}
return nil, err
}
return &gomatrixserverlib.RespState{
AuthEvents: authEventList,
StateEvents: resolvedStateEvents,
}, nil
}
// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should
// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
// This means that we may recursively call this function, as we spider back up prev_events to the min depth.
func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) {
if !isInboundTxn {
// we've recursed here, so just take a state snapshot please!
return &e, nil
}
logger := util.GetLogger(t.context).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
// query latest events (our trusted forward extremities)
req := api.QueryLatestEventsAndStateRequest{
RoomID: e.RoomID(),
StateToFetch: needed.Tuples(),
}
var res api.QueryLatestEventsAndStateResponse
if err = t.rsAPI.QueryLatestEventsAndState(t.context, &req, &res); err != nil {
logger.WithError(err).Warn("Failed to query latest events")
return &e, nil
}
latestEvents := make([]string, len(res.LatestEvents))
for i := range res.LatestEvents {
latestEvents[i] = res.LatestEvents[i].EventID
}
// this server just sent us an event for which we do not know its prev_events - ask that server for those prev_events.
minDepth := int(res.Depth) - 20
if minDepth < 0 {
minDepth = 0
}
missingResp, err := t.federation.LookupMissingEvents(t.context, t.Origin, e.RoomID(), gomatrixserverlib.MissingEvents{
Limit: 20,
// synapse uses the min depth they've ever seen in that room
MinDepth: minDepth,
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
EarliestEvents: latestEvents,
// The event IDs to retrieve the previous events for.
LatestEvents: []string{e.EventID()},
}, roomVersion)
// security: how we handle failures depends on whether or not this event will become the new forward extremity for the room.
// There's 2 scenarios to consider:
// - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true)
// - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false)
// In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room,
// as it was called in response to an inbound txn which had it as a prev_event.
// In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad
// because it means we would trust the state at that event to be the state for the entire room, and allows rooms to be hijacked.
// https://github.com/matrix-org/synapse/pull/3456
// https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335
// For now, we do not allow Case B, so reject the event.
if err != nil {
logger.WithError(err).Errorf(
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.Origin,
)
return nil, missingPrevEventsError{
eventID: e.EventID(),
err: err,
}
}
logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
// topologically sort and sanity check that we are making forward progress
newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
shouldHaveSomeEventIDs := e.PrevEventIDs()
hasPrevEvent := false
Event:
for _, pe := range shouldHaveSomeEventIDs {
for _, ev := range newEvents {
if ev.EventID() == pe {
hasPrevEvent = true
break Event
}
}
}
if !hasPrevEvent {
err = fmt.Errorf("called /get_missing_events but server %s didn't return any prev_events with IDs %v", t.Origin, shouldHaveSomeEventIDs)
logger.WithError(err).Errorf(
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.Origin,
)
return nil, missingPrevEventsError{
eventID: e.EventID(),
err: err,
}
}
// process the missing events then the event which started this whole thing
for _, ev := range append(newEvents, e) {
err := t.processEvent(ev, false)
if err != nil {
return nil, err
}
}
// we processed everything!
return nil, nil
}
func (t *txnReq) lookupMissingStateViaState(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
respState *gomatrixserverlib.RespState, err error) {
state, err := t.federation.LookupState(t.context, t.Origin, roomID, eventID, roomVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -359,78 +647,64 @@ func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersi
return &state, nil return &state, nil
} }
func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( func (t *txnReq) lookupMissingStateViaStateIDs(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
*gomatrixserverlib.RespState, map[string]bool, error) { *gomatrixserverlib.RespState, error) {
util.GetLogger(t.context).Infof("lookupMissingStateViaStateIDs %s", eventID)
// fetch the state event IDs at the time of the event // fetch the state event IDs at the time of the event
stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID()) stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, roomID, eventID)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
// fetch as many as we can from the roomserver, do them as 2 calls rather than
// 1 to try to reduce the number of parameters in the bulk query this will use
haveEventMap := make(map[string]*gomatrixserverlib.HeaderedEvent, len(stateIDs.StateEventIDs))
haveEventIDs := make(map[string]bool)
for _, eventList := range [][]string{stateIDs.StateEventIDs, stateIDs.AuthEventIDs} {
queryReq := api.QueryEventsByIDRequest{
EventIDs: eventList,
}
var queryRes api.QueryEventsByIDResponse
if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
return nil, nil, err
}
// allow indexing of current state by event ID
for i := range queryRes.Events {
haveEventMap[queryRes.Events[i].EventID()] = &queryRes.Events[i]
haveEventIDs[queryRes.Events[i].EventID()] = true
}
}
// work out which auth/state IDs are missing // work out which auth/state IDs are missing
wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...) wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...)
missing := make(map[string]bool) missing := make(map[string]bool)
var missingEventList []string
for _, sid := range wantIDs { for _, sid := range wantIDs {
if _, ok := haveEventMap[sid]; !ok { if _, ok := t.haveEvents[sid]; !ok {
if !missing[sid] {
missing[sid] = true missing[sid] = true
missingEventList = append(missingEventList, sid)
} }
} }
}
// fetch as many as we can from the roomserver
queryReq := api.QueryEventsByIDRequest{
EventIDs: missingEventList,
}
var queryRes api.QueryEventsByIDResponse
if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
return nil, err
}
for i := range queryRes.Events {
evID := queryRes.Events[i].EventID()
t.haveEvents[evID] = &queryRes.Events[i]
if missing[evID] {
delete(missing, evID)
}
}
util.GetLogger(t.context).WithFields(logrus.Fields{ util.GetLogger(t.context).WithFields(logrus.Fields{
"missing": len(missing), "missing": len(missing),
"event_id": e.EventID(), "event_id": eventID,
"room_id": e.RoomID(), "room_id": roomID,
"already_have": len(haveEventMap),
"total_state": len(stateIDs.StateEventIDs), "total_state": len(stateIDs.StateEventIDs),
"total_auth_events": len(stateIDs.AuthEventIDs), "total_auth_events": len(stateIDs.AuthEventIDs),
}).Info("Fetching missing state at event") }).Info("Fetching missing state at event")
for missingEventID := range missing { for missingEventID := range missing {
var txn gomatrixserverlib.Transaction var h *gomatrixserverlib.HeaderedEvent
txn, err = t.federation.GetEvent(t.context, t.Origin, missingEventID) h, err = t.lookupEvent(roomVersion, missingEventID, false)
if err != nil { if err != nil {
util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID") return nil, err
return nil, nil, err
} }
for _, pdu := range txn.PDUs { t.haveEvents[h.EventID()] = h
var event gomatrixserverlib.Event
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
if err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
return nil, nil, unmarshalError{err}
} }
if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { resp, err := t.createRespStateFromStateIDs(stateIDs)
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) return resp, err
return nil, nil, verifySigError{event.EventID(), err}
}
h := event.Headered(roomVersion)
haveEventMap[event.EventID()] = &h
}
}
resp, err := t.createRespStateFromStateIDs(stateIDs, haveEventMap)
return resp, haveEventIDs, err
} }
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) ( func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
*gomatrixserverlib.RespState, error) { *gomatrixserverlib.RespState, error) {
// create a RespState response using the response to /state_ids as a guide // create a RespState response using the response to /state_ids as a guide
respState := gomatrixserverlib.RespState{ respState := gomatrixserverlib.RespState{
@ -439,22 +713,55 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
} }
for i := range stateIDs.StateEventIDs { for i := range stateIDs.StateEventIDs {
ev, ok := haveEventMap[stateIDs.StateEventIDs[i]] ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]]
if !ok { if !ok {
return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i]) return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i])
} }
respState.StateEvents[i] = ev.Unwrap() respState.StateEvents[i] = ev.Unwrap()
} }
for i := range stateIDs.AuthEventIDs { for i := range stateIDs.AuthEventIDs {
ev, ok := haveEventMap[stateIDs.AuthEventIDs[i]] ev, ok := t.haveEvents[stateIDs.AuthEventIDs[i]]
if !ok { if !ok {
return nil, fmt.Errorf("missing auth event %s", stateIDs.AuthEventIDs[i]) return nil, fmt.Errorf("missing auth event %s", stateIDs.AuthEventIDs[i])
} }
respState.AuthEvents[i] = ev.Unwrap() respState.AuthEvents[i] = ev.Unwrap()
} }
// Check that the returned state is valid. // We purposefully do not do auth checks on the returned events, as they will still
if err := respState.Check(t.context, t.keys); err != nil { // be processed in the exact same way, just as a 'rejected' event
return nil, err // TODO: Add a field to HeaderedEvent to indicate if the event is rejected.
}
return &respState, nil return &respState, nil
} }
func (t *txnReq) lookupEvent(roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
if localFirst {
// fetch from the roomserver
queryReq := api.QueryEventsByIDRequest{
EventIDs: []string{missingEventID},
}
var queryRes api.QueryEventsByIDResponse
if err := t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
util.GetLogger(t.context).Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
} else if len(queryRes.Events) == 1 {
return &queryRes.Events[0], nil
}
}
txn, err := t.federation.GetEvent(t.context, t.Origin, missingEventID)
if err != nil || len(txn.PDUs) == 0 {
util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
return nil, err
}
pdu := txn.PDUs[0]
var event gomatrixserverlib.Event
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
if err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
return nil, unmarshalError{err}
}
if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
return nil, verifySigError{event.EventID(), err}
}
h := event.Headered(roomVersion)
t.newEvents[h.EventID()] = true
return &h, nil
}

View file

@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"reflect" "reflect"
"sort"
"testing" "testing"
"time" "time"
@ -82,6 +81,7 @@ type testRoomserverAPI struct {
inputRoomEvents []api.InputRoomEvent inputRoomEvents []api.InputRoomEvent
queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse
queryEventsByID func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse queryEventsByID func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse
queryLatestEventsAndState func(*api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse
} }
func (t *testRoomserverAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {} func (t *testRoomserverAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {}
@ -92,6 +92,9 @@ func (t *testRoomserverAPI) InputRoomEvents(
response *api.InputRoomEventsResponse, response *api.InputRoomEventsResponse,
) error { ) error {
t.inputRoomEvents = append(t.inputRoomEvents, request.InputRoomEvents...) t.inputRoomEvents = append(t.inputRoomEvents, request.InputRoomEvents...)
for _, ire := range request.InputRoomEvents {
fmt.Println("InputRoomEvents: ", ire.Event.EventID())
}
return nil return nil
} }
@ -117,6 +120,13 @@ func (t *testRoomserverAPI) QueryLatestEventsAndState(
request *api.QueryLatestEventsAndStateRequest, request *api.QueryLatestEventsAndStateRequest,
response *api.QueryLatestEventsAndStateResponse, response *api.QueryLatestEventsAndStateResponse,
) error { ) error {
r := t.queryLatestEventsAndState(request)
response.QueryLatestEventsAndStateRequest = *request
response.RoomExists = r.RoomExists
response.RoomVersion = testRoomVersion
response.LatestEvents = r.LatestEvents
response.StateEvents = r.StateEvents
response.Depth = r.Depth
return nil return nil
} }
@ -152,7 +162,7 @@ func (t *testRoomserverAPI) QueryMembershipForUser(
request *api.QueryMembershipForUserRequest, request *api.QueryMembershipForUserRequest,
response *api.QueryMembershipForUserResponse, response *api.QueryMembershipForUserResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Query a list of membership events for a room // Query a list of membership events for a room
@ -161,7 +171,7 @@ func (t *testRoomserverAPI) QueryMembershipsForRoom(
request *api.QueryMembershipsForRoomRequest, request *api.QueryMembershipsForRoomRequest,
response *api.QueryMembershipsForRoomResponse, response *api.QueryMembershipsForRoomResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Query a list of invite event senders for a user in a room. // Query a list of invite event senders for a user in a room.
@ -170,7 +180,7 @@ func (t *testRoomserverAPI) QueryInvitesForUser(
request *api.QueryInvitesForUserRequest, request *api.QueryInvitesForUserRequest,
response *api.QueryInvitesForUserResponse, response *api.QueryInvitesForUserResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Query whether a server is allowed to see an event // Query whether a server is allowed to see an event
@ -179,7 +189,7 @@ func (t *testRoomserverAPI) QueryServerAllowedToSeeEvent(
request *api.QueryServerAllowedToSeeEventRequest, request *api.QueryServerAllowedToSeeEventRequest,
response *api.QueryServerAllowedToSeeEventResponse, response *api.QueryServerAllowedToSeeEventResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Query missing events for a room from roomserver // Query missing events for a room from roomserver
@ -188,7 +198,7 @@ func (t *testRoomserverAPI) QueryMissingEvents(
request *api.QueryMissingEventsRequest, request *api.QueryMissingEventsRequest,
response *api.QueryMissingEventsResponse, response *api.QueryMissingEventsResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Query to get state and auth chain for a (potentially hypothetical) event. // Query to get state and auth chain for a (potentially hypothetical) event.
@ -199,7 +209,7 @@ func (t *testRoomserverAPI) QueryStateAndAuthChain(
request *api.QueryStateAndAuthChainRequest, request *api.QueryStateAndAuthChainRequest,
response *api.QueryStateAndAuthChainResponse, response *api.QueryStateAndAuthChainResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Query a given amount (or less) of events prior to a given set of events. // Query a given amount (or less) of events prior to a given set of events.
@ -208,7 +218,7 @@ func (t *testRoomserverAPI) QueryBackfill(
request *api.QueryBackfillRequest, request *api.QueryBackfillRequest,
response *api.QueryBackfillResponse, response *api.QueryBackfillResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Asks for the default room version as preferred by the server. // Asks for the default room version as preferred by the server.
@ -217,7 +227,7 @@ func (t *testRoomserverAPI) QueryRoomVersionCapabilities(
request *api.QueryRoomVersionCapabilitiesRequest, request *api.QueryRoomVersionCapabilitiesRequest,
response *api.QueryRoomVersionCapabilitiesResponse, response *api.QueryRoomVersionCapabilitiesResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Asks for the room version for a given room. // Asks for the room version for a given room.
@ -236,7 +246,7 @@ func (t *testRoomserverAPI) SetRoomAlias(
req *api.SetRoomAliasRequest, req *api.SetRoomAliasRequest,
response *api.SetRoomAliasResponse, response *api.SetRoomAliasResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Get the room ID for an alias // Get the room ID for an alias
@ -245,7 +255,7 @@ func (t *testRoomserverAPI) GetRoomIDForAlias(
req *api.GetRoomIDForAliasRequest, req *api.GetRoomIDForAliasRequest,
response *api.GetRoomIDForAliasResponse, response *api.GetRoomIDForAliasResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Get all known aliases for a room ID // Get all known aliases for a room ID
@ -254,7 +264,7 @@ func (t *testRoomserverAPI) GetAliasesForRoomID(
req *api.GetAliasesForRoomIDRequest, req *api.GetAliasesForRoomIDRequest,
response *api.GetAliasesForRoomIDResponse, response *api.GetAliasesForRoomIDResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Get the user ID of the creator of an alias // Get the user ID of the creator of an alias
@ -263,7 +273,7 @@ func (t *testRoomserverAPI) GetCreatorIDForAlias(
req *api.GetCreatorIDForAliasRequest, req *api.GetCreatorIDForAliasRequest,
response *api.GetCreatorIDForAliasResponse, response *api.GetCreatorIDForAliasResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
// Remove a room alias // Remove a room alias
@ -272,18 +282,20 @@ func (t *testRoomserverAPI) RemoveRoomAlias(
req *api.RemoveRoomAliasRequest, req *api.RemoveRoomAliasRequest,
response *api.RemoveRoomAliasResponse, response *api.RemoveRoomAliasResponse,
) error { ) error {
return nil return fmt.Errorf("not implemented")
} }
type txnFedClient struct { type txnFedClient struct {
state map[string]gomatrixserverlib.RespState // event_id to response state map[string]gomatrixserverlib.RespState // event_id to response
stateIDs map[string]gomatrixserverlib.RespStateIDs // event_id to response stateIDs map[string]gomatrixserverlib.RespStateIDs // event_id to response
getEvent map[string]gomatrixserverlib.Transaction // event_id to response getEvent map[string]gomatrixserverlib.Transaction // event_id to response
getMissingEvents func(gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error)
} }
func (c *txnFedClient) LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( func (c *txnFedClient) LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
res gomatrixserverlib.RespState, err error, res gomatrixserverlib.RespState, err error,
) { ) {
fmt.Println("testFederationClient.LookupState", eventID)
r, ok := c.state[eventID] r, ok := c.state[eventID]
if !ok { if !ok {
err = fmt.Errorf("txnFedClient: no /state for event %s", eventID) err = fmt.Errorf("txnFedClient: no /state for event %s", eventID)
@ -293,6 +305,7 @@ func (c *txnFedClient) LookupState(ctx context.Context, s gomatrixserverlib.Serv
return return
} }
func (c *txnFedClient) LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error) { func (c *txnFedClient) LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error) {
fmt.Println("testFederationClient.LookupStateIDs", eventID)
r, ok := c.stateIDs[eventID] r, ok := c.stateIDs[eventID]
if !ok { if !ok {
err = fmt.Errorf("txnFedClient: no /state_ids for event %s", eventID) err = fmt.Errorf("txnFedClient: no /state_ids for event %s", eventID)
@ -302,6 +315,7 @@ func (c *txnFedClient) LookupStateIDs(ctx context.Context, s gomatrixserverlib.S
return return
} }
func (c *txnFedClient) GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error) { func (c *txnFedClient) GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error) {
fmt.Println("testFederationClient.GetEvent", eventID)
r, ok := c.getEvent[eventID] r, ok := c.getEvent[eventID]
if !ok { if !ok {
err = fmt.Errorf("txnFedClient: no /event for event ID %s", eventID) err = fmt.Errorf("txnFedClient: no /event for event ID %s", eventID)
@ -310,6 +324,10 @@ func (c *txnFedClient) GetEvent(ctx context.Context, s gomatrixserverlib.ServerN
res = r res = r
return return
} }
func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents,
roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) {
return c.getMissingEvents(missing)
}
func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq { func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq {
t := &txnReq{ t := &txnReq{
@ -319,6 +337,8 @@ func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederat
eduProducer: producers.NewEDUServerProducer(&testEDUProducer{}), eduProducer: producers.NewEDUServerProducer(&testEDUProducer{}),
keys: &testNopJSONVerifier{}, keys: &testNopJSONVerifier{},
federation: fedClient, federation: fedClient,
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
newEvents: make(map[string]bool),
} }
t.PDUs = pdus t.PDUs = pdus
t.Origin = testOrigin t.Origin = testOrigin
@ -368,6 +388,9 @@ NextTuple:
} }
func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []gomatrixserverlib.HeaderedEvent) { func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []gomatrixserverlib.HeaderedEvent) {
for _, g := range got {
fmt.Println("GOT ", g.Event.EventID())
}
if len(got) != len(want) { if len(got) != len(want) {
t.Errorf("wrong number of InputRoomEvents: got %d want %d", len(got), len(want)) t.Errorf("wrong number of InputRoomEvents: got %d want %d", len(got), len(want))
return return
@ -424,32 +447,167 @@ func TestTransactionFailAuthChecks(t *testing.T) {
assertInputRoomEvents(t, rsAPI.inputRoomEvents, nil) // expect no messages to be sent to the roomserver assertInputRoomEvents(t, rsAPI.inputRoomEvents, nil) // expect no messages to be sent to the roomserver
} }
// The purpose of this test is to check that when there are missing prev_events that state is fetched via /state_ids // The purpose of this test is to make sure that when an event is received for which we do not know the prev_events,
// and /event and not /state. It works by setting PrevEventsExist=false in the roomserver query response, resulting in // we request them from /get_missing_events. It works by setting PrevEventsExist=false in the roomserver query response,
// a call to /state_ids which returns the whole room state. It should attempt to fetch as many of these events from the // resulting in a call to /get_missing_events which returns the missing prev event. Both events should be processed in
// roomserver FIRST, resulting in a call to QueryEventsByID. However, this will be missing the m.room.power_levels event which // topological order and sent to the roomserver.
// should then be requested via /event. The net result is that the transaction should succeed and there should be 2 func TestTransactionFetchMissingPrevEvents(t *testing.T) {
// new events, first the m.room.power_levels event we were missing, then the transaction PDU. haveEvent := testEvents[len(testEvents)-3]
prevEvent := testEvents[len(testEvents)-2]
inputEvent := testEvents[len(testEvents)-1]
var rsAPI *testRoomserverAPI // ref here so we can refer to inputRoomEvents inside these functions
rsAPI = &testRoomserverAPI{
queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse {
// we expect this to be called three times:
// - first with input event to realise there's a gap
// - second with the prevEvent to realise there is no gap
// - third with the input event to realise there is no longer a gap
prevEventsExist := false
if len(req.PrevEventIDs) == 1 {
switch req.PrevEventIDs[0] {
case haveEvent.EventID():
prevEventsExist = true
case prevEvent.EventID():
// we only have this event if we've been send prevEvent
if len(rsAPI.inputRoomEvents) == 1 && rsAPI.inputRoomEvents[0].Event.EventID() == prevEvent.EventID() {
prevEventsExist = true
}
}
}
return api.QueryStateAfterEventsResponse{
PrevEventsExist: prevEventsExist,
RoomExists: true,
StateEvents: fromStateTuples(req.StateToFetch, nil),
}
},
queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse {
return api.QueryLatestEventsAndStateResponse{
RoomExists: true,
Depth: haveEvent.Depth(),
LatestEvents: []gomatrixserverlib.EventReference{
haveEvent.EventReference(),
},
StateEvents: fromStateTuples(req.StateToFetch, nil),
}
},
}
cli := &txnFedClient{
getMissingEvents: func(missing gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) {
if !reflect.DeepEqual(missing.EarliestEvents, []string{haveEvent.EventID()}) {
t.Errorf("call to /get_missing_events wrong earliest events: got %v want %v", missing.EarliestEvents, haveEvent.EventID())
}
if !reflect.DeepEqual(missing.LatestEvents, []string{inputEvent.EventID()}) {
t.Errorf("call to /get_missing_events wrong latest events: got %v want %v", missing.LatestEvents, inputEvent.EventID())
}
return gomatrixserverlib.RespMissingEvents{
Events: []gomatrixserverlib.Event{
prevEvent.Unwrap(),
},
}, nil
},
}
pdus := []json.RawMessage{
inputEvent.JSON(),
}
txn := mustCreateTransaction(rsAPI, cli, pdus)
mustProcessTransaction(t, txn, nil)
assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{prevEvent, inputEvent})
}
// The purpose of this test is to check that when there are missing prev_events and we still haven't been able to fill
// in the hole with /get_missing_events that the state BEFORE the events we want to persist is fetched via /state_ids
// and /event. It works by setting PrevEventsExist=false in the roomserver query response, resulting in
// a call to /get_missing_events which returns 1 out of the 2 events it needs to fill in the gap. Synapse and Dendrite
// both give up after 1x /get_missing_events call, relying on requesting the state AFTER the missing event in order to
// continue. The DAG looks something like:
// FE GME TXN
// A ---> B ---> C ---> D
// TXN=event in the txn, GME=response to /get_missing_events, FE=roomserver's forward extremity. Should result in:
// - /state_ids?event=B is requested, then /event/B to get the state AFTER B. B is a state event.
// - state resolution is done to check C is allowed.
// This results in B being sent as an outlier FIRST, then C,D.
func TestTransactionFetchMissingStateByStateIDs(t *testing.T) { func TestTransactionFetchMissingStateByStateIDs(t *testing.T) {
missingStateEvent := testStateEvents[gomatrixserverlib.StateKeyTuple{ eventA := testEvents[len(testEvents)-5]
// this is also len(testEvents)-4
eventB := testStateEvents[gomatrixserverlib.StateKeyTuple{
EventType: gomatrixserverlib.MRoomPowerLevels, EventType: gomatrixserverlib.MRoomPowerLevels,
StateKey: "", StateKey: "",
}] }]
rsAPI := &testRoomserverAPI{ eventC := testEvents[len(testEvents)-3]
eventD := testEvents[len(testEvents)-2]
fmt.Println("a:", eventA.EventID())
fmt.Println("b:", eventB.EventID())
fmt.Println("c:", eventC.EventID())
fmt.Println("d:", eventD.EventID())
var rsAPI *testRoomserverAPI
rsAPI = &testRoomserverAPI{
queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse { queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse {
omitTuples := []gomatrixserverlib.StateKeyTuple{
{
EventType: gomatrixserverlib.MRoomPowerLevels,
StateKey: "",
},
}
askingForEvent := req.PrevEventIDs[0]
haveEventB := false
haveEventC := false
for _, ev := range rsAPI.inputRoomEvents {
switch ev.Event.EventID() {
case eventB.EventID():
haveEventB = true
omitTuples = nil // include event B now
case eventC.EventID():
haveEventC = true
}
}
prevEventExists := false
if askingForEvent == eventC.EventID() {
prevEventExists = haveEventC
} else if askingForEvent == eventB.EventID() {
prevEventExists = haveEventB
}
var stateEvents []gomatrixserverlib.HeaderedEvent
if prevEventExists {
stateEvents = fromStateTuples(req.StateToFetch, omitTuples)
}
return api.QueryStateAfterEventsResponse{ return api.QueryStateAfterEventsResponse{
// setting this to false should trigger a call to /state_ids PrevEventsExist: prevEventExists,
PrevEventsExist: false,
RoomExists: true, RoomExists: true,
StateEvents: nil, StateEvents: stateEvents,
}
},
queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse {
omitTuples := []gomatrixserverlib.StateKeyTuple{
{EventType: gomatrixserverlib.MRoomPowerLevels, StateKey: ""},
}
return api.QueryLatestEventsAndStateResponse{
RoomExists: true,
Depth: eventA.Depth(),
LatestEvents: []gomatrixserverlib.EventReference{
eventA.EventReference(),
},
StateEvents: fromStateTuples(req.StateToFetch, omitTuples),
} }
}, },
queryEventsByID: func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse { queryEventsByID: func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse {
var res api.QueryEventsByIDResponse var res api.QueryEventsByIDResponse
fmt.Println("queryEventsByID ", req.EventIDs)
for _, wantEventID := range req.EventIDs { for _, wantEventID := range req.EventIDs {
for _, ev := range testStateEvents { for _, ev := range testStateEvents {
// roomserver is missing the power levels event // roomserver is missing the power levels event unless it's been sent to us recently as an outlier
if wantEventID == missingStateEvent.EventID() { if wantEventID == eventB.EventID() {
fmt.Println("Asked for pl event")
for _, inEv := range rsAPI.inputRoomEvents {
fmt.Println("recv ", inEv.Event.EventID())
if inEv.Event.EventID() == wantEventID {
res.Events = append(res.Events, inEv.Event)
break
}
}
continue continue
} }
if ev.EventID() == wantEventID { if ev.EventID() == wantEventID {
@ -461,91 +619,55 @@ func TestTransactionFetchMissingStateByStateIDs(t *testing.T) {
return res return res
}, },
} }
inputEvent := testEvents[len(testEvents)-1] // /state_ids for event B returns every state event but B (it's the state before)
var authEventIDs []string
var stateEventIDs []string var stateEventIDs []string
for _, ev := range testStateEvents { for _, ev := range testStateEvents {
if ev.EventID() == eventB.EventID() {
continue
}
// state res checks what auth events you give it, and this isn't a valid auth event
if ev.Type() != gomatrixserverlib.MRoomHistoryVisibility {
authEventIDs = append(authEventIDs, ev.EventID())
}
stateEventIDs = append(stateEventIDs, ev.EventID()) stateEventIDs = append(stateEventIDs, ev.EventID())
} }
cli := &txnFedClient{ cli := &txnFedClient{
// /state_ids returns all the state events
stateIDs: map[string]gomatrixserverlib.RespStateIDs{ stateIDs: map[string]gomatrixserverlib.RespStateIDs{
inputEvent.EventID(): gomatrixserverlib.RespStateIDs{ eventB.EventID(): {
StateEventIDs: stateEventIDs, StateEventIDs: stateEventIDs,
AuthEventIDs: stateEventIDs, AuthEventIDs: authEventIDs,
}, },
}, },
// /event for the missing state event returns it // /event for event B returns it
getEvent: map[string]gomatrixserverlib.Transaction{ getEvent: map[string]gomatrixserverlib.Transaction{
missingStateEvent.EventID(): gomatrixserverlib.Transaction{ eventB.EventID(): {
PDUs: []json.RawMessage{ PDUs: []json.RawMessage{
missingStateEvent.JSON(), eventB.JSON(),
}, },
}, },
}, },
// /get_missing_events should be done exactly once
getMissingEvents: func(missing gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) {
if !reflect.DeepEqual(missing.EarliestEvents, []string{eventA.EventID()}) {
t.Errorf("call to /get_missing_events wrong earliest events: got %v want %v", missing.EarliestEvents, eventA.EventID())
}
if !reflect.DeepEqual(missing.LatestEvents, []string{eventD.EventID()}) {
t.Errorf("call to /get_missing_events wrong latest events: got %v want %v", missing.LatestEvents, eventD.EventID())
}
// just return event C, not event B so /state_ids logic kicks in as there will STILL be missing prev_events
return gomatrixserverlib.RespMissingEvents{
Events: []gomatrixserverlib.Event{
eventC.Unwrap(),
},
}, nil
},
} }
pdus := []json.RawMessage{ pdus := []json.RawMessage{
testData[len(testData)-1], // a message event eventD.JSON(),
} }
txn := mustCreateTransaction(rsAPI, cli, pdus) txn := mustCreateTransaction(rsAPI, cli, pdus)
mustProcessTransaction(t, txn, nil) mustProcessTransaction(t, txn, nil)
assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{missingStateEvent, inputEvent}) assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{eventB, eventC, eventD})
}
// The purpose of this test is to check that when there are missing prev_events and /state_ids fails, that we fallback to
// calling /state which returns the entire room state at that event. It works by setting PrevEventsExist=false in the
// roomserver query response, resulting in a call to /state_ids which fails (unset). It should then fetch via /state.
func TestTransactionFetchMissingStateByFallbackState(t *testing.T) {
rsAPI := &testRoomserverAPI{
queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse {
return api.QueryStateAfterEventsResponse{
// setting this to false should trigger a call to /state_ids
PrevEventsExist: false,
RoomExists: true,
StateEvents: nil,
}
},
}
inputEvent := testEvents[len(testEvents)-1]
// first 5 events are the state events, in auth event order.
stateEvents := testEvents[:5]
cli := &txnFedClient{
// /state_ids purposefully unset
stateIDs: nil,
// /state returns the state at that event (which is the current state)
state: map[string]gomatrixserverlib.RespState{
inputEvent.EventID(): gomatrixserverlib.RespState{
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
},
},
}
pdus := []json.RawMessage{
testData[len(testData)-1], // a message event
}
txn := mustCreateTransaction(rsAPI, cli, pdus)
mustProcessTransaction(t, txn, nil)
// the roomserver should get all state events and the new input event
// TODO: it should really be only giving the missing ones
got := rsAPI.inputRoomEvents
if len(got) != len(stateEvents)+1 {
t.Fatalf("wrong number of InputRoomEvents: got %d want %d", len(got), len(stateEvents)+1)
}
last := got[len(got)-1]
if last.Event.EventID() != inputEvent.EventID() {
t.Errorf("last event should be the input event but it wasn't. got %s want %s", last.Event.EventID(), inputEvent.EventID())
}
gots := make([]string, len(stateEvents))
wants := make([]string, len(stateEvents))
for i := range stateEvents {
gots[i] = got[i].Event.EventID()
wants[i] = stateEvents[i].EventID()
}
sort.Strings(gots)
sort.Strings(wants)
if !reflect.DeepEqual(gots, wants) {
t.Errorf("state events returned mismatch, got (sorted): %+v want %+v", gots, wants)
}
} }

View file

@ -55,7 +55,7 @@ func processRoomEvent(
// Check that the event passes authentication checks and work out the numeric IDs for the auth events. // Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
if err != nil { if err != nil {
logrus.WithError(err).WithField("event_id", event.EventID()).Error("processRoomEvent.checkAuthEvents failed for event") logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event")
return return
} }

View file

@ -264,4 +264,11 @@ User can invite local user to room with version 5
remote user can join room with version 5 remote user can join room with version 5
User can invite remote user to room with version 5 User can invite remote user to room with version 5
Remote user can backfill in a room with version 5 Remote user can backfill in a room with version 5
Inbound federation can receive v1 /send_join
Inbound federation can get state for a room
Inbound federation of state requires event_id as a mandatory paramater
Inbound federation can get state_ids for a room
Inbound federation of state_ids requires event_id as a mandatory paramater
Federation rejects inbound events where the prev_events cannot be found
Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state
Alternative server names do not cause a routing loop Alternative server names do not cause a routing loop