mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 03:43:11 -06:00
More WIP get_missing_events work
This commit is contained in:
parent
30dfe0a64b
commit
ec674ef947
|
|
@ -16,6 +16,7 @@ package producers
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -67,6 +68,7 @@ func (c *RoomserverProducer) SendEventWithState(
|
|||
if haveEventIDs[outlier.EventID()] {
|
||||
continue
|
||||
}
|
||||
fmt.Println("append outlier ", outlier.EventID())
|
||||
ires = append(ires, api.InputRoomEvent{
|
||||
Kind: api.KindOutlier,
|
||||
Event: outlier.Headered(event.RoomVersion),
|
||||
|
|
|
|||
|
|
@ -99,13 +99,12 @@ func Send(
|
|||
|
||||
type txnReq struct {
|
||||
gomatrixserverlib.Transaction
|
||||
context context.Context
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
producer *producers.RoomserverProducer
|
||||
eduProducer *producers.EDUServerProducer
|
||||
keys gomatrixserverlib.JSONVerifier
|
||||
federation txnFederationClient
|
||||
getMissingEventRecursionCount int
|
||||
context context.Context
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
producer *producers.RoomserverProducer
|
||||
eduProducer *producers.EDUServerProducer
|
||||
keys gomatrixserverlib.JSONVerifier
|
||||
federation txnFederationClient
|
||||
}
|
||||
|
||||
// A subset of FederationClient functionality that txn requires. Useful for testing.
|
||||
|
|
@ -240,7 +239,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
|
|||
|
||||
func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) error {
|
||||
prevEventIDs := e.PrevEventIDs()
|
||||
util.GetLogger(t.context).Infof("%s processEvent %s with prev_events %v", e.RoomID(), e.EventID(), prevEventIDs)
|
||||
util.GetLogger(t.context).Infof("processEvent %s with prev_events %v", e.EventID(), prevEventIDs)
|
||||
|
||||
// Fetch the state needed to authenticate the event.
|
||||
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
|
||||
|
|
@ -254,7 +253,6 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro
|
|||
return err
|
||||
}
|
||||
util.GetLogger(t.context).Infof("processEvent %s stateResp.PrevEventsExist: %v", e.EventID(), stateResp.PrevEventsExist)
|
||||
util.GetLogger(t.context).Infof("NEEDED TUPLES: %+v", needed.Tuples())
|
||||
|
||||
if !stateResp.RoomExists {
|
||||
// TODO: When synapse receives a message for a room it is not in it
|
||||
|
|
@ -324,68 +322,171 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
|
|||
|
||||
// Attempt to fill in the gap using /get_missing_events
|
||||
// This will either:
|
||||
// - fill in the gap completely then process event `e` returning ok=true err=nil
|
||||
// - fail to fill in the gap and tell us to terminate the transaction ok=false, err=not nil
|
||||
// - fail to fill in the gap and tell us to fetch state, and to not terminate the transaction, ok=false, err=nil
|
||||
ok, err := t.getMissingEvents(e, roomVersion, isInboundTxn)
|
||||
// - fill in the gap completely then process event `e` returning no backwards extremity
|
||||
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
|
||||
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
|
||||
backwardsExtremity, err := t.getMissingEvents(e, roomVersion, isInboundTxn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
if backwardsExtremity == nil {
|
||||
// we filled in the gap!
|
||||
return nil
|
||||
}
|
||||
|
||||
// Attempt to fetch the missing state using /state_ids and /events
|
||||
respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion)
|
||||
// fetch the state BEFORE the event then check that the event is allowed
|
||||
respState, haveEventIDs, err := t.lookupStateAfterEvent(roomVersion, *backwardsExtremity)
|
||||
if err != nil {
|
||||
// Fallback to /state
|
||||
util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state")
|
||||
respState, err = t.lookupMissingStateViaState(e, roomVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// security: every time we fetch state from a remote server we have to do state resolution.
|
||||
// Say you have a forward extremity F with state you trust, and a malicious event M comes in which points to a prev event P,
|
||||
// then if you take the state at M you don't do any state res. If you fetch state for P then to get the new current state of
|
||||
// the room you have to do state res across F and P
|
||||
|
||||
// Check that the event is allowed by the state.
|
||||
retryAllowedState:
|
||||
if err := checkAllowedByState(e, respState.StateEvents); err != nil {
|
||||
switch missing := err.(type) {
|
||||
case gomatrixserverlib.MissingAuthEventError:
|
||||
// An auth event was missing so let's look up that event over federation
|
||||
for _, s := range respState.StateEvents {
|
||||
if s.EventID() != missing.AuthEventID {
|
||||
continue
|
||||
}
|
||||
err = t.processEventWithMissingState(s, roomVersion, isInboundTxn)
|
||||
// If there was no error retrieving the event from federation then
|
||||
// we assume that it succeeded, so retry the original state check
|
||||
if err == nil {
|
||||
goto retryAllowedState
|
||||
}
|
||||
}
|
||||
default:
|
||||
}
|
||||
return err
|
||||
}
|
||||
fmt.Println("Calcuated lookupStateAfterEvent")
|
||||
|
||||
// pass the event along with the state to the roomserver using a background context so we don't
|
||||
// needlessly expire
|
||||
return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs)
|
||||
}
|
||||
|
||||
// getMissingEvents returns ok=true if missing events were fetched and handled, else false. Returns an error only if we should
|
||||
// terminate the transaction which initiated /get_missing_events
|
||||
// lookupStateAfterEvent returns the room state after the event e, which is all the states before e resolved via state resolution
|
||||
// then having e applied to the resulting state.
|
||||
func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion, e gomatrixserverlib.Event) (*gomatrixserverlib.RespState, map[string]bool, error) {
|
||||
// de-dupe all the events
|
||||
authEvents := make(map[string]*gomatrixserverlib.Event)
|
||||
stateEvents := make(map[string]*gomatrixserverlib.Event)
|
||||
haveEventIDs := make(map[string]bool)
|
||||
for _, prevEventID := range e.PrevEventIDs() {
|
||||
// don't do auth checks on this RespState as we're just interested in grabbing state/auth events and putting it into the pot
|
||||
respState, haveIDs, err := t.lookupStateBeforeEvent(roomVersion, false, e.RoomID(), prevEventID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
for i := range respState.StateEvents {
|
||||
stateEvents[respState.StateEvents[i].EventID()] = &respState.StateEvents[i]
|
||||
}
|
||||
for i := range respState.AuthEvents {
|
||||
authEvents[respState.AuthEvents[i].EventID()] = &respState.AuthEvents[i]
|
||||
}
|
||||
for id := range haveIDs {
|
||||
haveEventIDs[id] = true
|
||||
}
|
||||
// fetch the event we're missing and add it to the pile
|
||||
h, err := t.lookupEvent(roomVersion, prevEventID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if h.StateKey() != nil {
|
||||
he := h.Unwrap()
|
||||
stateEvents[h.EventID()] = &he
|
||||
}
|
||||
}
|
||||
authEventList := make([]gomatrixserverlib.Event, len(authEvents))
|
||||
i := 0
|
||||
for _, ev := range authEvents {
|
||||
authEventList[i] = *ev
|
||||
i++
|
||||
}
|
||||
stateEventList := make([]gomatrixserverlib.Event, len(stateEvents))
|
||||
i = 0
|
||||
for _, ev := range stateEvents {
|
||||
stateEventList[i] = *ev
|
||||
i++
|
||||
}
|
||||
resolvedStateEvents, err := gomatrixserverlib.ResolveConflicts(roomVersion, stateEventList, authEventList)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// apply the current event
|
||||
if err = checkAllowedByState(e, resolvedStateEvents); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// roll forward state if this event is a state event
|
||||
if e.StateKey() != nil {
|
||||
for i := range resolvedStateEvents {
|
||||
if resolvedStateEvents[i].Type() == e.Type() && resolvedStateEvents[i].StateKeyEquals(*e.StateKey()) {
|
||||
resolvedStateEvents[i] = e
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, s := range resolvedStateEvents {
|
||||
util.GetLogger(t.context).Infof("resolved: %s -> %s", s.Type(), string(s.Content()))
|
||||
}
|
||||
for _, s := range authEventList {
|
||||
util.GetLogger(t.context).Infof("authEventList: %s -> %s", s.Type(), string(s.Content()))
|
||||
}
|
||||
|
||||
resp := &gomatrixserverlib.RespState{
|
||||
AuthEvents: authEventList,
|
||||
StateEvents: resolvedStateEvents,
|
||||
}
|
||||
if err = resp.Check(t.context, t.keys); err != nil {
|
||||
return nil, nil, fmt.Errorf("lookupStateAfterEvent: resolved state is not valid: %w", err)
|
||||
}
|
||||
|
||||
return resp, haveEventIDs, nil
|
||||
}
|
||||
|
||||
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
|
||||
// the server supports.
|
||||
func (t *txnReq) lookupStateBeforeEvent(roomVersion gomatrixserverlib.RoomVersion, doAuthCheck bool, roomID, eventID string) (
|
||||
respState *gomatrixserverlib.RespState, haveEventIDs map[string]bool, err error) {
|
||||
|
||||
util.GetLogger(t.context).Infof("lookupStateBeforeEvent %s", eventID)
|
||||
// It's entirely possible that we know this state, as QueryStateAfterEventsRequest only returns success if ALL prev_events
|
||||
// exist, so query the roomserver for the state with just this prev event
|
||||
stateReq := api.QueryStateAfterEventsRequest{
|
||||
RoomID: roomID,
|
||||
StateToFetch: nil, // TODO: do we need everything?
|
||||
PrevEventIDs: []string{eventID},
|
||||
}
|
||||
var stateResp api.QueryStateAfterEventsResponse
|
||||
if err = t.rsAPI.QueryStateAfterEvents(t.context, &stateReq, &stateResp); err != nil || stateResp.StateEvents == nil {
|
||||
util.GetLogger(t.context).WithError(err).Warnf("Failed to lookup state before event %s via roomserver - asking remote", eventID)
|
||||
// fallthrough to remote lookup
|
||||
} else {
|
||||
util.GetLogger(t.context).Infof("lookupStateBeforeEvent %s returned locally", eventID)
|
||||
// we have all the events
|
||||
haveEvents := make(map[string]*gomatrixserverlib.HeaderedEvent)
|
||||
haveEventIDs = make(map[string]bool)
|
||||
for i, ev := range stateResp.StateEvents {
|
||||
haveEventIDs[ev.EventID()] = true
|
||||
haveEvents[ev.EventID()] = &stateResp.StateEvents[i]
|
||||
}
|
||||
var authEvents []gomatrixserverlib.Event
|
||||
for _, ev := range stateResp.StateEvents {
|
||||
for _, ae := range ev.AuthEventIDs() {
|
||||
aev, ok := haveEvents[ae]
|
||||
if ok {
|
||||
authEvents = append(authEvents, aev.Unwrap())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
respState = &gomatrixserverlib.RespState{
|
||||
AuthEvents: authEvents,
|
||||
StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateResp.StateEvents),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Attempt to fetch the missing state using /state_ids and /events
|
||||
respState, haveEventIDs, err = t.lookupMissingStateViaStateIDs(roomID, eventID, doAuthCheck, roomVersion)
|
||||
if err != nil {
|
||||
// Fallback to /state
|
||||
util.GetLogger(t.context).WithError(err).Warn("lookupStateBeforeEvent failed to /state_ids, falling back to /state")
|
||||
respState, err = t.lookupMissingStateViaState(roomID, eventID, roomVersion)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should
|
||||
// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events
|
||||
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
|
||||
// This means that we may recursively call this function, as we spider back up prev_events to the min depth.
|
||||
func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (ok bool, err error) {
|
||||
func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) {
|
||||
if !isInboundTxn {
|
||||
// we've recursed here, so just take a state snapshot please!
|
||||
return false, nil
|
||||
fmt.Println("backwards extremity is now ", e.EventID())
|
||||
return &e, nil
|
||||
}
|
||||
logger := util.GetLogger(t.context).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
||||
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
|
||||
|
|
@ -397,7 +498,7 @@ func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatri
|
|||
var res api.QueryLatestEventsAndStateResponse
|
||||
if err = t.rsAPI.QueryLatestEventsAndState(t.context, &req, &res); err != nil {
|
||||
logger.WithError(err).Warn("Failed to query latest events")
|
||||
return false, nil
|
||||
return &e, nil
|
||||
}
|
||||
latestEvents := make([]string, len(res.LatestEvents))
|
||||
for i := range res.LatestEvents {
|
||||
|
|
@ -430,11 +531,12 @@ func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatri
|
|||
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
||||
t.Origin,
|
||||
)
|
||||
return false, missingPrevEventsError{
|
||||
return nil, missingPrevEventsError{
|
||||
eventID: e.EventID(),
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
|
||||
|
||||
// topologically sort and sanity check that we are making forward progress
|
||||
newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
||||
|
|
@ -455,7 +557,7 @@ Event:
|
|||
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
||||
t.Origin,
|
||||
)
|
||||
return false, missingPrevEventsError{
|
||||
return nil, missingPrevEventsError{
|
||||
eventID: e.EventID(),
|
||||
err: err,
|
||||
}
|
||||
|
|
@ -464,16 +566,17 @@ Event:
|
|||
for _, ev := range append(newEvents, e) {
|
||||
err := t.processEvent(ev, false)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
// we processed everything!
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
|
||||
func (t *txnReq) lookupMissingStateViaState(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
||||
respState *gomatrixserverlib.RespState, err error) {
|
||||
state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion)
|
||||
state, err := t.federation.LookupState(t.context, t.Origin, roomID, eventID, roomVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -484,11 +587,11 @@ func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersi
|
|||
return &state, nil
|
||||
}
|
||||
|
||||
func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
|
||||
func (t *txnReq) lookupMissingStateViaStateIDs(roomID, eventID string, doAuthCheck bool, roomVersion gomatrixserverlib.RoomVersion) (
|
||||
*gomatrixserverlib.RespState, map[string]bool, error) {
|
||||
|
||||
util.GetLogger(t.context).Infof("lookupMissingStateViaStateIDs %s", eventID)
|
||||
// fetch the state event IDs at the time of the event
|
||||
stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID())
|
||||
stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, roomID, eventID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
@ -522,46 +625,33 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe
|
|||
}
|
||||
util.GetLogger(t.context).WithFields(logrus.Fields{
|
||||
"missing": len(missing),
|
||||
"event_id": e.EventID(),
|
||||
"room_id": e.RoomID(),
|
||||
"event_id": eventID,
|
||||
"room_id": roomID,
|
||||
"already_have": len(haveEventMap),
|
||||
"total_state": len(stateIDs.StateEventIDs),
|
||||
"total_auth_events": len(stateIDs.AuthEventIDs),
|
||||
}).Info("Fetching missing state at event")
|
||||
|
||||
for missingEventID := range missing {
|
||||
var txn gomatrixserverlib.Transaction
|
||||
txn, err = t.federation.GetEvent(t.context, t.Origin, missingEventID)
|
||||
var h *gomatrixserverlib.HeaderedEvent
|
||||
h, err = t.lookupEvent(roomVersion, missingEventID)
|
||||
if err != nil {
|
||||
util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
|
||||
return nil, nil, err
|
||||
}
|
||||
for _, pdu := range txn.PDUs {
|
||||
var event gomatrixserverlib.Event
|
||||
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
|
||||
if err != nil {
|
||||
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
|
||||
return nil, nil, unmarshalError{err}
|
||||
}
|
||||
if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
|
||||
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
|
||||
return nil, nil, verifySigError{event.EventID(), err}
|
||||
}
|
||||
h := event.Headered(roomVersion)
|
||||
haveEventMap[event.EventID()] = &h
|
||||
}
|
||||
haveEventMap[h.EventID()] = h
|
||||
}
|
||||
resp, err := t.createRespStateFromStateIDs(stateIDs, haveEventMap)
|
||||
resp, err := t.createRespStateFromStateIDs(stateIDs, doAuthCheck, haveEventMap)
|
||||
return resp, haveEventIDs, err
|
||||
}
|
||||
|
||||
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) (
|
||||
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, doAuthCheck bool, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) (
|
||||
*gomatrixserverlib.RespState, error) {
|
||||
// create a RespState response using the response to /state_ids as a guide
|
||||
respState := gomatrixserverlib.RespState{
|
||||
AuthEvents: make([]gomatrixserverlib.Event, len(stateIDs.AuthEventIDs)),
|
||||
StateEvents: make([]gomatrixserverlib.Event, len(stateIDs.StateEventIDs)),
|
||||
}
|
||||
var roomVer gomatrixserverlib.RoomVersion
|
||||
|
||||
for i := range stateIDs.StateEventIDs {
|
||||
ev, ok := haveEventMap[stateIDs.StateEventIDs[i]]
|
||||
|
|
@ -569,6 +659,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
|
|||
return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i])
|
||||
}
|
||||
respState.StateEvents[i] = ev.Unwrap()
|
||||
roomVer = ev.RoomVersion
|
||||
}
|
||||
for i := range stateIDs.AuthEventIDs {
|
||||
ev, ok := haveEventMap[stateIDs.AuthEventIDs[i]]
|
||||
|
|
@ -578,8 +669,46 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
|
|||
respState.AuthEvents[i] = ev.Unwrap()
|
||||
}
|
||||
// Check that the returned state is valid.
|
||||
retryCheck:
|
||||
if err := respState.Check(t.context, t.keys); err != nil {
|
||||
return nil, err
|
||||
switch missing := err.(type) {
|
||||
case gomatrixserverlib.MissingAuthEventError:
|
||||
// An auth event was missing so let's look up that event over federation
|
||||
var newEv *gomatrixserverlib.HeaderedEvent
|
||||
newEv, err = t.lookupEvent(roomVer, missing.AuthEventID)
|
||||
if err != nil {
|
||||
// we can't find this event, fail
|
||||
return nil, fmt.Errorf("missing auth event %s and cannot find it: %w", missing.AuthEventID, err)
|
||||
}
|
||||
respState.AuthEvents = append(respState.AuthEvents, newEv.Unwrap())
|
||||
goto retryCheck
|
||||
}
|
||||
if doAuthCheck {
|
||||
return nil, err
|
||||
} else {
|
||||
return &respState, nil
|
||||
}
|
||||
}
|
||||
return &respState, nil
|
||||
}
|
||||
|
||||
func (t *txnReq) lookupEvent(roomVersion gomatrixserverlib.RoomVersion, missingEventID string) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
txn, err := t.federation.GetEvent(t.context, t.Origin, missingEventID)
|
||||
if err != nil || len(txn.PDUs) == 0 {
|
||||
util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
|
||||
return nil, err
|
||||
}
|
||||
pdu := txn.PDUs[0]
|
||||
var event gomatrixserverlib.Event
|
||||
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
|
||||
if err != nil {
|
||||
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
|
||||
return nil, unmarshalError{err}
|
||||
}
|
||||
if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
|
||||
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
|
||||
return nil, verifySigError{event.EventID(), err}
|
||||
}
|
||||
h := event.Headered(roomVersion)
|
||||
return &h, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -160,7 +159,7 @@ func (t *testRoomserverAPI) QueryMembershipForUser(
|
|||
request *api.QueryMembershipForUserRequest,
|
||||
response *api.QueryMembershipForUserResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Query a list of membership events for a room
|
||||
|
|
@ -169,7 +168,7 @@ func (t *testRoomserverAPI) QueryMembershipsForRoom(
|
|||
request *api.QueryMembershipsForRoomRequest,
|
||||
response *api.QueryMembershipsForRoomResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Query a list of invite event senders for a user in a room.
|
||||
|
|
@ -178,7 +177,7 @@ func (t *testRoomserverAPI) QueryInvitesForUser(
|
|||
request *api.QueryInvitesForUserRequest,
|
||||
response *api.QueryInvitesForUserResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Query whether a server is allowed to see an event
|
||||
|
|
@ -187,7 +186,7 @@ func (t *testRoomserverAPI) QueryServerAllowedToSeeEvent(
|
|||
request *api.QueryServerAllowedToSeeEventRequest,
|
||||
response *api.QueryServerAllowedToSeeEventResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Query missing events for a room from roomserver
|
||||
|
|
@ -196,7 +195,7 @@ func (t *testRoomserverAPI) QueryMissingEvents(
|
|||
request *api.QueryMissingEventsRequest,
|
||||
response *api.QueryMissingEventsResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Query to get state and auth chain for a (potentially hypothetical) event.
|
||||
|
|
@ -207,7 +206,7 @@ func (t *testRoomserverAPI) QueryStateAndAuthChain(
|
|||
request *api.QueryStateAndAuthChainRequest,
|
||||
response *api.QueryStateAndAuthChainResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Query a given amount (or less) of events prior to a given set of events.
|
||||
|
|
@ -216,7 +215,7 @@ func (t *testRoomserverAPI) QueryBackfill(
|
|||
request *api.QueryBackfillRequest,
|
||||
response *api.QueryBackfillResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Asks for the default room version as preferred by the server.
|
||||
|
|
@ -225,7 +224,7 @@ func (t *testRoomserverAPI) QueryRoomVersionCapabilities(
|
|||
request *api.QueryRoomVersionCapabilitiesRequest,
|
||||
response *api.QueryRoomVersionCapabilitiesResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Asks for the room version for a given room.
|
||||
|
|
@ -244,7 +243,7 @@ func (t *testRoomserverAPI) SetRoomAlias(
|
|||
req *api.SetRoomAliasRequest,
|
||||
response *api.SetRoomAliasResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Get the room ID for an alias
|
||||
|
|
@ -253,7 +252,7 @@ func (t *testRoomserverAPI) GetRoomIDForAlias(
|
|||
req *api.GetRoomIDForAliasRequest,
|
||||
response *api.GetRoomIDForAliasResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Get all known aliases for a room ID
|
||||
|
|
@ -262,7 +261,7 @@ func (t *testRoomserverAPI) GetAliasesForRoomID(
|
|||
req *api.GetAliasesForRoomIDRequest,
|
||||
response *api.GetAliasesForRoomIDResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Get the user ID of the creator of an alias
|
||||
|
|
@ -271,7 +270,7 @@ func (t *testRoomserverAPI) GetCreatorIDForAlias(
|
|||
req *api.GetCreatorIDForAliasRequest,
|
||||
response *api.GetCreatorIDForAliasResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Remove a room alias
|
||||
|
|
@ -280,7 +279,7 @@ func (t *testRoomserverAPI) RemoveRoomAlias(
|
|||
req *api.RemoveRoomAliasRequest,
|
||||
response *api.RemoveRoomAliasResponse,
|
||||
) error {
|
||||
return nil
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
type txnFedClient struct {
|
||||
|
|
@ -381,6 +380,9 @@ NextTuple:
|
|||
}
|
||||
|
||||
func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []gomatrixserverlib.HeaderedEvent) {
|
||||
for _, g := range got {
|
||||
fmt.Println("GOT ", g.Event.EventID())
|
||||
}
|
||||
if len(got) != len(want) {
|
||||
t.Errorf("wrong number of InputRoomEvents: got %d want %d", len(got), len(want))
|
||||
return
|
||||
|
|
@ -508,32 +510,91 @@ func TestTransactionFetchMissingPrevEvents(t *testing.T) {
|
|||
assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{prevEvent, inputEvent})
|
||||
}
|
||||
|
||||
// The purpose of this test is to check that when there are missing prev_events that state is fetched via /state_ids
|
||||
// and /event and not /state. It works by setting PrevEventsExist=false in the roomserver query response, resulting in
|
||||
// a call to /state_ids which returns the whole room state. It should attempt to fetch as many of these events from the
|
||||
// roomserver FIRST, resulting in a call to QueryEventsByID. However, this will be missing the m.room.power_levels event which
|
||||
// should then be requested via /event. The net result is that the transaction should succeed and there should be 2
|
||||
// new events, first the m.room.power_levels event we were missing, then the transaction PDU.
|
||||
// The purpose of this test is to check that when there are missing prev_events and we still haven't been able to fill
|
||||
// in the hole with /get_missing_events that the state BEFORE the events we want to persist is fetched via /state_ids
|
||||
// and /event. It works by setting PrevEventsExist=false in the roomserver query response, resulting in
|
||||
// a call to /get_missing_events which returns 1 out of the 2 events it needs to fill in the gap. Synapse and Dendrite
|
||||
// both give up after 1x /get_missing_events call, relying on requesting the state AFTER the missing event in order to
|
||||
// continue. The DAG looks something like:
|
||||
// FE GME TXN
|
||||
// A ---> B ---> C ---> D
|
||||
// TXN=event in the txn, GME=response to /get_missing_events, FE=roomserver's forward extremity. Should result in:
|
||||
// - /state_ids?event=B is requested, then /event/B to get the state AFTER B. B is a state event.
|
||||
// - state resolution is done to check C is allowed.
|
||||
// This results in B being sent as an outlier FIRST, then C,D.
|
||||
func TestTransactionFetchMissingStateByStateIDs(t *testing.T) {
|
||||
missingStateEvent := testStateEvents[gomatrixserverlib.StateKeyTuple{
|
||||
eventA := testEvents[len(testEvents)-5]
|
||||
// this is also len(testEvents)-4
|
||||
eventB := testStateEvents[gomatrixserverlib.StateKeyTuple{
|
||||
EventType: gomatrixserverlib.MRoomPowerLevels,
|
||||
StateKey: "",
|
||||
}]
|
||||
rsAPI := &testRoomserverAPI{
|
||||
eventC := testEvents[len(testEvents)-3]
|
||||
eventD := testEvents[len(testEvents)-2]
|
||||
fmt.Println("a:", eventA.EventID())
|
||||
fmt.Println("b:", eventB.EventID())
|
||||
fmt.Println("c:", eventC.EventID())
|
||||
fmt.Println("d:", eventD.EventID())
|
||||
var rsAPI *testRoomserverAPI
|
||||
rsAPI = &testRoomserverAPI{
|
||||
queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse {
|
||||
// if we have event C from GME, then PrevEventsExist: True, else it is false
|
||||
prevEventExists := false
|
||||
omitTuples := []gomatrixserverlib.StateKeyTuple{
|
||||
gomatrixserverlib.StateKeyTuple{
|
||||
EventType: gomatrixserverlib.MRoomPowerLevels,
|
||||
StateKey: "",
|
||||
},
|
||||
}
|
||||
for _, ev := range rsAPI.inputRoomEvents {
|
||||
if ev.Event.EventID() == eventC.EventID() && len(req.PrevEventIDs) == 1 && req.PrevEventIDs[0] == eventC.EventID() {
|
||||
prevEventExists = true
|
||||
}
|
||||
if ev.Event.EventID() == eventB.EventID() {
|
||||
omitTuples = nil
|
||||
}
|
||||
}
|
||||
var stateEvents []gomatrixserverlib.HeaderedEvent
|
||||
if prevEventExists {
|
||||
stateEvents = fromStateTuples(req.StateToFetch, omitTuples)
|
||||
}
|
||||
return api.QueryStateAfterEventsResponse{
|
||||
// setting this to false should trigger a call to /state_ids
|
||||
PrevEventsExist: false,
|
||||
// setting this to false should trigger a call to /get_missing_events or /state_ids depending
|
||||
// on far back we've gone. The first time should trigger /get_missing_events but we should
|
||||
// give up on subsequent calls and just use the /state_ids
|
||||
PrevEventsExist: prevEventExists,
|
||||
RoomExists: true,
|
||||
StateEvents: nil,
|
||||
StateEvents: stateEvents,
|
||||
}
|
||||
},
|
||||
queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse {
|
||||
omitTuples := []gomatrixserverlib.StateKeyTuple{
|
||||
{EventType: gomatrixserverlib.MRoomPowerLevels, StateKey: ""},
|
||||
}
|
||||
return api.QueryLatestEventsAndStateResponse{
|
||||
RoomExists: true,
|
||||
Depth: eventA.Depth(),
|
||||
LatestEvents: []gomatrixserverlib.EventReference{
|
||||
eventA.EventReference(),
|
||||
},
|
||||
StateEvents: fromStateTuples(req.StateToFetch, omitTuples),
|
||||
}
|
||||
},
|
||||
queryEventsByID: func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse {
|
||||
var res api.QueryEventsByIDResponse
|
||||
fmt.Println("queryEventsByID ", req.EventIDs)
|
||||
for _, wantEventID := range req.EventIDs {
|
||||
for _, ev := range testStateEvents {
|
||||
// roomserver is missing the power levels event
|
||||
if wantEventID == missingStateEvent.EventID() {
|
||||
// roomserver is missing the power levels event unless it's been sent to us recently as an outlier
|
||||
if wantEventID == eventB.EventID() {
|
||||
fmt.Println("Asked for pl event")
|
||||
for _, inEv := range rsAPI.inputRoomEvents {
|
||||
fmt.Println("recv ", inEv.Event.EventID())
|
||||
if inEv.Event.EventID() == wantEventID {
|
||||
res.Events = append(res.Events, inEv.Event)
|
||||
break
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
if ev.EventID() == wantEventID {
|
||||
|
|
@ -545,91 +606,55 @@ func TestTransactionFetchMissingStateByStateIDs(t *testing.T) {
|
|||
return res
|
||||
},
|
||||
}
|
||||
inputEvent := testEvents[len(testEvents)-1]
|
||||
// /state_ids for event B returns every state event but B (it's the state before)
|
||||
var authEventIDs []string
|
||||
var stateEventIDs []string
|
||||
for _, ev := range testStateEvents {
|
||||
if ev.EventID() == eventB.EventID() {
|
||||
continue
|
||||
}
|
||||
// state res checks what auth events you give it, and this isn't a valid auth event
|
||||
if ev.Type() != gomatrixserverlib.MRoomHistoryVisibility {
|
||||
authEventIDs = append(authEventIDs, ev.EventID())
|
||||
}
|
||||
stateEventIDs = append(stateEventIDs, ev.EventID())
|
||||
}
|
||||
cli := &txnFedClient{
|
||||
// /state_ids returns all the state events
|
||||
stateIDs: map[string]gomatrixserverlib.RespStateIDs{
|
||||
inputEvent.EventID(): gomatrixserverlib.RespStateIDs{
|
||||
eventB.EventID(): gomatrixserverlib.RespStateIDs{
|
||||
StateEventIDs: stateEventIDs,
|
||||
AuthEventIDs: stateEventIDs,
|
||||
AuthEventIDs: authEventIDs,
|
||||
},
|
||||
},
|
||||
// /event for the missing state event returns it
|
||||
// /event for event B returns it
|
||||
getEvent: map[string]gomatrixserverlib.Transaction{
|
||||
missingStateEvent.EventID(): gomatrixserverlib.Transaction{
|
||||
eventB.EventID(): gomatrixserverlib.Transaction{
|
||||
PDUs: []json.RawMessage{
|
||||
missingStateEvent.JSON(),
|
||||
eventB.JSON(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
pdus := []json.RawMessage{
|
||||
testData[len(testData)-1], // a message event
|
||||
}
|
||||
txn := mustCreateTransaction(rsAPI, cli, pdus)
|
||||
mustProcessTransaction(t, txn, nil)
|
||||
assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{missingStateEvent, inputEvent})
|
||||
}
|
||||
|
||||
// The purpose of this test is to check that when there are missing prev_events and /state_ids fails, that we fallback to
|
||||
// calling /state which returns the entire room state at that event. It works by setting PrevEventsExist=false in the
|
||||
// roomserver query response, resulting in a call to /state_ids which fails (unset). It should then fetch via /state.
|
||||
func TestTransactionFetchMissingStateByFallbackState(t *testing.T) {
|
||||
rsAPI := &testRoomserverAPI{
|
||||
queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse {
|
||||
return api.QueryStateAfterEventsResponse{
|
||||
// setting this to false should trigger a call to /state_ids
|
||||
PrevEventsExist: false,
|
||||
RoomExists: true,
|
||||
StateEvents: nil,
|
||||
// /get_missing_events should be done exactly once
|
||||
getMissingEvents: func(missing gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) {
|
||||
if !reflect.DeepEqual(missing.EarliestEvents, []string{eventA.EventID()}) {
|
||||
t.Errorf("call to /get_missing_events wrong earliest events: got %v want %v", missing.EarliestEvents, eventA.EventID())
|
||||
}
|
||||
},
|
||||
}
|
||||
inputEvent := testEvents[len(testEvents)-1]
|
||||
// first 5 events are the state events, in auth event order.
|
||||
stateEvents := testEvents[:5]
|
||||
|
||||
cli := &txnFedClient{
|
||||
// /state_ids purposefully unset
|
||||
stateIDs: nil,
|
||||
// /state returns the state at that event (which is the current state)
|
||||
state: map[string]gomatrixserverlib.RespState{
|
||||
inputEvent.EventID(): gomatrixserverlib.RespState{
|
||||
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
|
||||
StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
|
||||
},
|
||||
if !reflect.DeepEqual(missing.LatestEvents, []string{eventD.EventID()}) {
|
||||
t.Errorf("call to /get_missing_events wrong latest events: got %v want %v", missing.LatestEvents, eventD.EventID())
|
||||
}
|
||||
// just return event C, not event B so /state_ids logic kicks in as there will STILL be missing prev_events
|
||||
return gomatrixserverlib.RespMissingEvents{
|
||||
Events: []gomatrixserverlib.Event{
|
||||
eventC.Unwrap(),
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
|
||||
pdus := []json.RawMessage{
|
||||
testData[len(testData)-1], // a message event
|
||||
eventD.JSON(),
|
||||
}
|
||||
txn := mustCreateTransaction(rsAPI, cli, pdus)
|
||||
mustProcessTransaction(t, txn, nil)
|
||||
// the roomserver should get all state events and the new input event
|
||||
// TODO: it should really be only giving the missing ones
|
||||
got := rsAPI.inputRoomEvents
|
||||
if len(got) != len(stateEvents)+1 {
|
||||
t.Fatalf("wrong number of InputRoomEvents: got %d want %d", len(got), len(stateEvents)+1)
|
||||
}
|
||||
last := got[len(got)-1]
|
||||
if last.Event.EventID() != inputEvent.EventID() {
|
||||
t.Errorf("last event should be the input event but it wasn't. got %s want %s", last.Event.EventID(), inputEvent.EventID())
|
||||
}
|
||||
gots := make([]string, len(stateEvents))
|
||||
wants := make([]string, len(stateEvents))
|
||||
for i := range stateEvents {
|
||||
gots[i] = got[i].Event.EventID()
|
||||
wants[i] = stateEvents[i].EventID()
|
||||
}
|
||||
sort.Strings(gots)
|
||||
sort.Strings(wants)
|
||||
if !reflect.DeepEqual(gots, wants) {
|
||||
t.Errorf("state events returned mismatch, got (sorted): %+v want %+v", gots, wants)
|
||||
}
|
||||
assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{eventB, eventC, eventD})
|
||||
}
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -17,7 +17,7 @@ require (
|
|||
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
|
||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10
|
||||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200505092542-ef8abbde3f6b
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200507185533-bc21abd9ca04
|
||||
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
|
||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
|
||||
github.com/mattn/go-sqlite3 v2.0.2+incompatible
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -367,8 +367,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
|
|||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200505092542-ef8abbde3f6b h1:gxLun/noFJ7DplX7rqT8E4v4NkeDJ45tqW7LXC6k4C4=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200505092542-ef8abbde3f6b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200507185533-bc21abd9ca04 h1:8+6bOm9r2TCD6cudtt0zpAY2St8sko2+Xe7fqHYAH0Y=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200507185533-bc21abd9ca04/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
||||
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
|
||||
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
|
||||
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=
|
||||
|
|
|
|||
Loading…
Reference in a new issue