|
|
|
|
@ -183,7 +183,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
|
|
|
|
|
|
|
|
|
|
// Process the events.
|
|
|
|
|
for _, e := range pdus {
|
|
|
|
|
if err := t.processEvent(ctx, e.Unwrap(), true); err != nil {
|
|
|
|
|
if err := t.processEvent(ctx, e.Unwrap()); err != nil {
|
|
|
|
|
// If the error is due to the event itself being bad then we skip
|
|
|
|
|
// it and move onto the next event. We report an error so that the
|
|
|
|
|
// sender knows that we have skipped processing it.
|
|
|
|
|
@ -246,9 +246,6 @@ func isProcessingErrorFatal(err error) bool {
|
|
|
|
|
type roomNotFoundError struct {
|
|
|
|
|
roomID string
|
|
|
|
|
}
|
|
|
|
|
type unmarshalError struct {
|
|
|
|
|
err error
|
|
|
|
|
}
|
|
|
|
|
type verifySigError struct {
|
|
|
|
|
eventID string
|
|
|
|
|
err error
|
|
|
|
|
@ -259,7 +256,6 @@ type missingPrevEventsError struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) }
|
|
|
|
|
func (e unmarshalError) Error() string { return fmt.Sprintf("unable to parse event: %s", e.err) }
|
|
|
|
|
func (e verifySigError) Error() string {
|
|
|
|
|
return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err)
|
|
|
|
|
}
|
|
|
|
|
@ -338,11 +334,28 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, isInboundTxn bool) error {
|
|
|
|
|
func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
|
|
|
|
|
servers := []gomatrixserverlib.ServerName{t.Origin}
|
|
|
|
|
serverReq := &api.QueryServerJoinedToRoomRequest{
|
|
|
|
|
RoomID: roomID,
|
|
|
|
|
}
|
|
|
|
|
serverRes := &api.QueryServerJoinedToRoomResponse{}
|
|
|
|
|
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
|
|
|
|
|
servers = append(servers, serverRes.ServerNames...)
|
|
|
|
|
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(servers), roomID)
|
|
|
|
|
}
|
|
|
|
|
return servers
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) error {
|
|
|
|
|
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
|
|
|
|
|
|
|
|
|
// Work out if the roomserver knows everything it needs to know to auth
|
|
|
|
|
// the event.
|
|
|
|
|
// the event. This includes the prev_events and auth_events.
|
|
|
|
|
// NOTE! This is going to include prev_events that have an empty state
|
|
|
|
|
// snapshot. This is because we will need to re-request the event, and
|
|
|
|
|
// it's /state_ids, in order for it to exist in the roomserver correctly
|
|
|
|
|
// before the roomserver tries to work out
|
|
|
|
|
stateReq := api.QueryMissingAuthPrevEventsRequest{
|
|
|
|
|
RoomID: e.RoomID(),
|
|
|
|
|
AuthEventIDs: e.AuthEventIDs(),
|
|
|
|
|
@ -350,7 +363,7 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is
|
|
|
|
|
}
|
|
|
|
|
var stateResp api.QueryMissingAuthPrevEventsResponse
|
|
|
|
|
if err := t.rsAPI.QueryMissingAuthPrevEvents(ctx, &stateReq, &stateResp); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !stateResp.RoomExists {
|
|
|
|
|
@ -365,52 +378,14 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is
|
|
|
|
|
|
|
|
|
|
if len(stateResp.MissingAuthEventIDs) > 0 {
|
|
|
|
|
logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs))
|
|
|
|
|
|
|
|
|
|
servers := []gomatrixserverlib.ServerName{t.Origin}
|
|
|
|
|
serverReq := &api.QueryServerJoinedToRoomRequest{
|
|
|
|
|
RoomID: e.RoomID(),
|
|
|
|
|
}
|
|
|
|
|
serverRes := &api.QueryServerJoinedToRoomResponse{}
|
|
|
|
|
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
|
|
|
|
|
servers = append(servers, serverRes.ServerNames...)
|
|
|
|
|
logger.Infof("Found %d server(s) to query for missing events", len(servers))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
getAuthEvent:
|
|
|
|
|
for _, missingAuthEventID := range stateResp.MissingAuthEventIDs {
|
|
|
|
|
for _, server := range servers {
|
|
|
|
|
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
|
|
|
|
|
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
continue // try the next server
|
|
|
|
|
}
|
|
|
|
|
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.WithError(err).Errorf("Failed to unmarshal auth event %q", missingAuthEventID)
|
|
|
|
|
continue // try the next server
|
|
|
|
|
}
|
|
|
|
|
if err = api.SendInputRoomEvents(
|
|
|
|
|
context.Background(),
|
|
|
|
|
t.rsAPI,
|
|
|
|
|
[]api.InputRoomEvent{
|
|
|
|
|
{
|
|
|
|
|
Kind: api.KindOutlier,
|
|
|
|
|
Event: ev.Headered(stateResp.RoomVersion),
|
|
|
|
|
AuthEventIDs: ev.AuthEventIDs(),
|
|
|
|
|
SendAsServer: api.DoNotSendToOtherServers,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
); err != nil {
|
|
|
|
|
logger.WithError(err).Errorf("Failed to send auth event %q to roomserver", missingAuthEventID)
|
|
|
|
|
continue getAuthEvent // move onto the next event
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err := t.retrieveMissingAuthEvents(ctx, e, &stateResp); err != nil {
|
|
|
|
|
return fmt.Errorf("t.retrieveMissingAuthEvents: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(stateResp.MissingPrevEventIDs) > 0 {
|
|
|
|
|
logger.Infof("Event refers to %d unknown prev_events", len(stateResp.MissingPrevEventIDs))
|
|
|
|
|
return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion, isInboundTxn)
|
|
|
|
|
return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// pass the event to the roomserver which will do auth checks
|
|
|
|
|
@ -427,6 +402,60 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *txnReq) retrieveMissingAuthEvents(
|
|
|
|
|
ctx context.Context, e gomatrixserverlib.Event, stateResp *api.QueryMissingAuthPrevEventsResponse,
|
|
|
|
|
) error {
|
|
|
|
|
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
|
|
|
|
|
|
|
|
|
missingAuthEvents := make(map[string]struct{})
|
|
|
|
|
for _, missingAuthEventID := range stateResp.MissingAuthEventIDs {
|
|
|
|
|
missingAuthEvents[missingAuthEventID] = struct{}{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
servers := t.getServers(ctx, e.RoomID())
|
|
|
|
|
if len(servers) > 5 {
|
|
|
|
|
servers = servers[:5]
|
|
|
|
|
}
|
|
|
|
|
withNextEvent:
|
|
|
|
|
for missingAuthEventID := range missingAuthEvents {
|
|
|
|
|
withNextServer:
|
|
|
|
|
for _, server := range servers {
|
|
|
|
|
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
|
|
|
|
|
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.WithError(err).Warnf("Failed to retrieve auth event %q", missingAuthEventID)
|
|
|
|
|
continue withNextServer
|
|
|
|
|
}
|
|
|
|
|
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.WithError(err).Warnf("Failed to unmarshal auth event %q", missingAuthEventID)
|
|
|
|
|
continue withNextServer
|
|
|
|
|
}
|
|
|
|
|
if err = api.SendInputRoomEvents(
|
|
|
|
|
context.Background(),
|
|
|
|
|
t.rsAPI,
|
|
|
|
|
[]api.InputRoomEvent{
|
|
|
|
|
{
|
|
|
|
|
Kind: api.KindOutlier,
|
|
|
|
|
Event: ev.Headered(stateResp.RoomVersion),
|
|
|
|
|
AuthEventIDs: ev.AuthEventIDs(),
|
|
|
|
|
SendAsServer: api.DoNotSendToOtherServers,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
); err != nil {
|
|
|
|
|
return fmt.Errorf("api.SendEvents: %w", err)
|
|
|
|
|
}
|
|
|
|
|
delete(missingAuthEvents, missingAuthEventID)
|
|
|
|
|
continue withNextEvent
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if missing := len(missingAuthEvents); missing > 0 {
|
|
|
|
|
return fmt.Errorf("Event refers to %d auth_events which we failed to fetch", missing)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {
|
|
|
|
|
authUsingState := gomatrixserverlib.NewAuthEvents(nil)
|
|
|
|
|
for i := range stateEvents {
|
|
|
|
|
@ -438,7 +467,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
|
|
|
|
|
return gomatrixserverlib.Allowed(e, &authUsingState)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) error {
|
|
|
|
|
func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
|
|
|
|
|
// Do this with a fresh context, so that we keep working even if the
|
|
|
|
|
// original request times out. With any luck, by the time the remote
|
|
|
|
|
// side retries, we'll have fetched the missing state.
|
|
|
|
|
@ -464,39 +493,82 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
|
|
|
|
|
// - fill in the gap completely then process event `e` returning no backwards extremity
|
|
|
|
|
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
|
|
|
|
|
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
|
|
|
|
|
backwardsExtremity, err := t.getMissingEvents(gmectx, e, roomVersion, isInboundTxn)
|
|
|
|
|
newEvents, err := t.getMissingEvents(gmectx, e, roomVersion)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if backwardsExtremity == nil {
|
|
|
|
|
// we filled in the gap!
|
|
|
|
|
if len(newEvents) == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
backwardsExtremity := &newEvents[0]
|
|
|
|
|
newEvents = newEvents[1:]
|
|
|
|
|
|
|
|
|
|
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
|
|
|
|
|
// security: we have to do state resolution on the new backwards extremity (TODO: WHY)
|
|
|
|
|
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
|
|
|
|
|
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
|
|
|
|
|
var states []*gomatrixserverlib.RespState
|
|
|
|
|
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples()
|
|
|
|
|
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
|
|
|
|
|
// Look up what the state is after the backward extremity. This will either
|
|
|
|
|
// come from the roomserver, if we know all the required events, or it will
|
|
|
|
|
// come from a remote server via /state_ids if not.
|
|
|
|
|
var prevState *gomatrixserverlib.RespState
|
|
|
|
|
prevState, err = t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
|
|
|
|
|
if err != nil {
|
|
|
|
|
util.GetLogger(ctx).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
// Append the state onto the collected state. We'll run this through the
|
|
|
|
|
// state resolution next.
|
|
|
|
|
states = append(states, prevState)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Now that we have collected all of the state from the prev_events, we'll
|
|
|
|
|
// run the state through the appropriate state resolution algorithm for the
|
|
|
|
|
// room. This does a couple of things:
|
|
|
|
|
// 1. Ensures that the state is deduplicated fully for each state-key tuple
|
|
|
|
|
// 2. Ensures that we pick the latest events from both sets, in the case that
|
|
|
|
|
// one of the prev_events is quite a bit older than the others
|
|
|
|
|
resolvedState, err := t.resolveStatesAndCheck(gmectx, roomVersion, states, backwardsExtremity)
|
|
|
|
|
if err != nil {
|
|
|
|
|
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// pass the event along with the state to the roomserver using a background context so we don't
|
|
|
|
|
// needlessly expire
|
|
|
|
|
return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, e.Headered(roomVersion), t.haveEventIDs())
|
|
|
|
|
// First of all, send the backward extremity into the roomserver with the
|
|
|
|
|
// newly resolved state. This marks the "oldest" point in the backfill and
|
|
|
|
|
// sets the baseline state for any new events after this.
|
|
|
|
|
err = api.SendEventWithState(
|
|
|
|
|
context.Background(),
|
|
|
|
|
t.rsAPI,
|
|
|
|
|
resolvedState,
|
|
|
|
|
backwardsExtremity.Headered(roomVersion),
|
|
|
|
|
t.haveEventIDs(),
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("api.SendEventWithState: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Then send all of the newer backfilled events, of which will all be newer
|
|
|
|
|
// than the backward extremity, into the roomserver without state. This way
|
|
|
|
|
// they will automatically fast-forward based on the room state at the
|
|
|
|
|
// extremity in the last step.
|
|
|
|
|
headeredNewEvents := make([]gomatrixserverlib.HeaderedEvent, len(newEvents))
|
|
|
|
|
for i, newEvent := range newEvents {
|
|
|
|
|
headeredNewEvents[i] = newEvent.Headered(roomVersion)
|
|
|
|
|
}
|
|
|
|
|
if err = api.SendEvents(
|
|
|
|
|
context.Background(),
|
|
|
|
|
t.rsAPI,
|
|
|
|
|
append(headeredNewEvents, e.Headered(roomVersion)),
|
|
|
|
|
api.DoNotSendToOtherServers,
|
|
|
|
|
nil,
|
|
|
|
|
); err != nil {
|
|
|
|
|
return fmt.Errorf("api.SendEvents: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
|
|
|
|
|
@ -510,18 +582,23 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
|
|
|
|
|
|
|
|
|
|
respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
return nil, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
servers := t.getServers(ctx, roomID)
|
|
|
|
|
if len(servers) > 5 {
|
|
|
|
|
servers = servers[:5]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// fetch the event we're missing and add it to the pile
|
|
|
|
|
h, err := t.lookupEvent(ctx, roomVersion, eventID, false)
|
|
|
|
|
h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers)
|
|
|
|
|
switch err.(type) {
|
|
|
|
|
case verifySigError:
|
|
|
|
|
return respState, nil
|
|
|
|
|
case nil:
|
|
|
|
|
// do nothing
|
|
|
|
|
default:
|
|
|
|
|
return nil, err
|
|
|
|
|
return nil, fmt.Errorf("t.lookupEvent: %w", err)
|
|
|
|
|
}
|
|
|
|
|
t.haveEvents[h.EventID()] = h
|
|
|
|
|
if h.StateKey() != nil {
|
|
|
|
|
@ -622,7 +699,11 @@ retryAllowedState:
|
|
|
|
|
if err = checkAllowedByState(*backwardsExtremity, resolvedStateEvents); err != nil {
|
|
|
|
|
switch missing := err.(type) {
|
|
|
|
|
case gomatrixserverlib.MissingAuthEventError:
|
|
|
|
|
h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true)
|
|
|
|
|
servers := t.getServers(ctx, backwardsExtremity.RoomID())
|
|
|
|
|
if len(servers) > 5 {
|
|
|
|
|
servers = servers[:5]
|
|
|
|
|
}
|
|
|
|
|
h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true, servers)
|
|
|
|
|
switch err2.(type) {
|
|
|
|
|
case verifySigError:
|
|
|
|
|
return &gomatrixserverlib.RespState{
|
|
|
|
|
@ -652,11 +733,7 @@ retryAllowedState:
|
|
|
|
|
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
|
|
|
|
|
// This means that we may recursively call this function, as we spider back up prev_events.
|
|
|
|
|
// nolint:gocyclo
|
|
|
|
|
func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) {
|
|
|
|
|
if !isInboundTxn {
|
|
|
|
|
// we've recursed here, so just take a state snapshot please!
|
|
|
|
|
return &e, nil
|
|
|
|
|
}
|
|
|
|
|
func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []gomatrixserverlib.Event, err error) {
|
|
|
|
|
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
|
|
|
|
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
|
|
|
|
|
// query latest events (our trusted forward extremities)
|
|
|
|
|
@ -667,7 +744,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
|
|
|
|
|
var res api.QueryLatestEventsAndStateResponse
|
|
|
|
|
if err = t.rsAPI.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
|
|
|
|
|
logger.WithError(err).Warn("Failed to query latest events")
|
|
|
|
|
return &e, nil
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
latestEvents := make([]string, len(res.LatestEvents))
|
|
|
|
|
for i := range res.LatestEvents {
|
|
|
|
|
@ -726,7 +803,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
|
|
|
|
|
logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
|
|
|
|
|
|
|
|
|
|
// topologically sort and sanity check that we are making forward progress
|
|
|
|
|
newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
|
|
|
|
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
|
|
|
|
shouldHaveSomeEventIDs := e.PrevEventIDs()
|
|
|
|
|
hasPrevEvent := false
|
|
|
|
|
Event:
|
|
|
|
|
@ -749,16 +826,9 @@ Event:
|
|
|
|
|
err: err,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// process the missing events then the event which started this whole thing
|
|
|
|
|
for _, ev := range append(newEvents, e) {
|
|
|
|
|
err := t.processEvent(ctx, ev, false)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// we processed everything!
|
|
|
|
|
return nil, nil
|
|
|
|
|
return newEvents, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
|
|
|
|
@ -838,6 +908,12 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
|
|
|
|
|
"concurrent_requests": concurrentRequests,
|
|
|
|
|
}).Info("Fetching missing state at event")
|
|
|
|
|
|
|
|
|
|
// Get a list of servers to fetch from.
|
|
|
|
|
servers := t.getServers(ctx, roomID)
|
|
|
|
|
if len(servers) > 5 {
|
|
|
|
|
servers = servers[:5]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create a queue containing all of the missing event IDs that we want
|
|
|
|
|
// to retrieve.
|
|
|
|
|
pending := make(chan string, missingCount)
|
|
|
|
|
@ -863,7 +939,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
|
|
|
|
|
// Define what we'll do in order to fetch the missing event ID.
|
|
|
|
|
fetch := func(missingEventID string) {
|
|
|
|
|
var h *gomatrixserverlib.HeaderedEvent
|
|
|
|
|
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false)
|
|
|
|
|
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
|
|
|
|
|
switch err.(type) {
|
|
|
|
|
case verifySigError:
|
|
|
|
|
return
|
|
|
|
|
@ -901,26 +977,25 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
|
|
|
|
|
*gomatrixserverlib.RespState, error) {
|
|
|
|
|
*gomatrixserverlib.RespState, error) { // nolint:unparam
|
|
|
|
|
// create a RespState response using the response to /state_ids as a guide
|
|
|
|
|
respState := gomatrixserverlib.RespState{
|
|
|
|
|
AuthEvents: make([]gomatrixserverlib.Event, len(stateIDs.AuthEventIDs)),
|
|
|
|
|
StateEvents: make([]gomatrixserverlib.Event, len(stateIDs.StateEventIDs)),
|
|
|
|
|
}
|
|
|
|
|
respState := gomatrixserverlib.RespState{}
|
|
|
|
|
|
|
|
|
|
for i := range stateIDs.StateEventIDs {
|
|
|
|
|
ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]]
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i])
|
|
|
|
|
logrus.Warnf("Missing state event in createRespStateFromStateIDs: %s", stateIDs.StateEventIDs[i])
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
respState.StateEvents[i] = ev.Unwrap()
|
|
|
|
|
respState.StateEvents = append(respState.StateEvents, ev.Unwrap())
|
|
|
|
|
}
|
|
|
|
|
for i := range stateIDs.AuthEventIDs {
|
|
|
|
|
ev, ok := t.haveEvents[stateIDs.AuthEventIDs[i]]
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, fmt.Errorf("missing auth event %s", stateIDs.AuthEventIDs[i])
|
|
|
|
|
logrus.Warnf("Missing auth event in createRespStateFromStateIDs: %s", stateIDs.AuthEventIDs[i])
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
respState.AuthEvents[i] = ev.Unwrap()
|
|
|
|
|
respState.AuthEvents = append(respState.AuthEvents, ev.Unwrap())
|
|
|
|
|
}
|
|
|
|
|
// We purposefully do not do auth checks on the returned events, as they will still
|
|
|
|
|
// be processed in the exact same way, just as a 'rejected' event
|
|
|
|
|
@ -928,7 +1003,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
|
|
|
|
|
return &respState, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
|
|
|
|
|
func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool, servers []gomatrixserverlib.ServerName) (*gomatrixserverlib.HeaderedEvent, error) {
|
|
|
|
|
if localFirst {
|
|
|
|
|
// fetch from the roomserver
|
|
|
|
|
queryReq := api.QueryEventsByIDRequest{
|
|
|
|
|
@ -941,19 +1016,27 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
|
|
|
|
|
return &queryRes.Events[0], nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
txn, err := t.federation.GetEvent(ctx, t.Origin, missingEventID)
|
|
|
|
|
if err != nil || len(txn.PDUs) == 0 {
|
|
|
|
|
util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
pdu := txn.PDUs[0]
|
|
|
|
|
var event gomatrixserverlib.Event
|
|
|
|
|
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
|
|
|
|
|
if err != nil {
|
|
|
|
|
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
|
|
|
|
|
return nil, unmarshalError{err}
|
|
|
|
|
found := false
|
|
|
|
|
for _, serverName := range servers {
|
|
|
|
|
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
|
|
|
|
|
if err != nil || len(txn.PDUs) == 0 {
|
|
|
|
|
util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("Failed to get missing /event for event ID")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(txn.PDUs[0], roomVersion)
|
|
|
|
|
if err != nil {
|
|
|
|
|
util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warnf("Transaction: Failed to parse event JSON of event")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
found = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
if err = gomatrixserverlib.VerifyAllEventSignatures(ctx, []gomatrixserverlib.Event{event}, t.keys); err != nil {
|
|
|
|
|
if !found {
|
|
|
|
|
util.GetLogger(ctx).WithField("event_id", missingEventID).Warnf("Failed to get missing /event for event ID from %d server(s)", len(servers))
|
|
|
|
|
return nil, fmt.Errorf("wasn't able to find event via %d server(s)", len(servers))
|
|
|
|
|
}
|
|
|
|
|
if err := gomatrixserverlib.VerifyAllEventSignatures(ctx, []gomatrixserverlib.Event{event}, t.keys); err != nil {
|
|
|
|
|
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
|
|
|
|
|
return nil, verifySigError{event.EventID(), err}
|
|
|
|
|
}
|
|
|
|
|
|