diff --git a/federationapi/routing/publicrooms.go b/federationapi/routing/publicrooms.go index 5b9be8807..a253f86eb 100644 --- a/federationapi/routing/publicrooms.go +++ b/federationapi/routing/publicrooms.go @@ -133,8 +133,6 @@ func fillInRooms(ctx context.Context, roomIDs []string, rsAPI roomserverAPI.Room util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed") return nil, err } - util.GetLogger(ctx).Infof("room IDs: %+v", roomIDs) - util.GetLogger(ctx).Infof("State res: %+v", stateRes.Rooms) chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs)) i := 0 for roomID, data := range stateRes.Rooms { diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 8678f2240..10ce8f840 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -136,7 +136,7 @@ func (q *sendFIFOQueue) pop() (*inputTask, bool) { type inputTask struct { ctx context.Context t *txnReq - event *gomatrixserverlib.Event + event *gomatrixserverlib.HeaderedEvent wg *sync.WaitGroup err error // written back by worker, only safe to read when all tasks are done duration time.Duration // written back by worker, only safe to read when all tasks are done @@ -338,7 +338,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res task := &inputTask{ ctx: ctx, t: t, - event: event, + event: event.Headered(verRes.RoomVersion), wg: &wg, } tasks = append(tasks, task) @@ -420,12 +420,6 @@ func (t *inputWorker) run() { } } -type roomNotFoundError struct { - roomID string -} - -func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) } - func (t *txnReq) processEDUs(ctx context.Context) { for _, e := range t.EDUs { eduCountTotal.Inc() @@ -568,41 +562,43 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli } } -func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error { +func (t *txnReq) processEvent(_ context.Context, e *gomatrixserverlib.HeaderedEvent) error { t.work = "" // reset from previous event - // Ask the roomserver if we know about the room and/or if we're joined - // to it. If we aren't then we won't bother processing the event. - joinedReq := api.QueryServerJoinedToRoomRequest{ - RoomID: e.RoomID(), - } - var joinedRes api.QueryServerJoinedToRoomResponse - if err := t.rsAPI.QueryServerJoinedToRoom(ctx, &joinedReq, &joinedRes); err != nil { - return fmt.Errorf("t.rsAPI.QueryServerJoinedToRoom: %w", err) - } + /* + // Ask the roomserver if we know about the room and/or if we're joined + // to it. If we aren't then we won't bother processing the event. + joinedReq := api.QueryServerJoinedToRoomRequest{ + RoomID: e.RoomID(), + } + var joinedRes api.QueryServerJoinedToRoomResponse + if err := t.rsAPI.QueryServerJoinedToRoom(ctx, &joinedReq, &joinedRes); err != nil { + return fmt.Errorf("t.rsAPI.QueryServerJoinedToRoom: %w", err) + } - if !joinedRes.RoomExists || !joinedRes.IsInRoom { - // We don't believe we're a member of this room, therefore there's - // no point in wasting work trying to figure out what to do with - // missing auth or prev events. Drop the event. - return roomNotFoundError{e.RoomID()} - } + if !joinedRes.RoomExists || !joinedRes.IsInRoom { + // We don't believe we're a member of this room, therefore there's + // no point in wasting work trying to figure out what to do with + // missing auth or prev events. Drop the event. + return roomNotFoundError{e.RoomID()} + } - // Work out if the roomserver knows everything it needs to know to auth - // the event. This includes the prev_events and auth_events. - // NOTE! This is going to include prev_events that have an empty state - // snapshot. This is because we will need to re-request the event, and - // it's /state_ids, in order for it to exist in the roomserver correctly - // before the roomserver tries to work out - stateReq := api.QueryMissingAuthPrevEventsRequest{ - RoomID: e.RoomID(), - AuthEventIDs: nil, //e.AuthEventIDs(), - PrevEventIDs: nil, //e.PrevEventIDs(), - } - var stateResp api.QueryMissingAuthPrevEventsResponse - if err := t.rsAPI.QueryMissingAuthPrevEvents(ctx, &stateReq, &stateResp); err != nil { - return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err) - } + // Work out if the roomserver knows everything it needs to know to auth + // the event. This includes the prev_events and auth_events. + // NOTE! This is going to include prev_events that have an empty state + // snapshot. This is because we will need to re-request the event, and + // it's /state_ids, in order for it to exist in the roomserver correctly + // before the roomserver tries to work out + stateReq := api.QueryMissingAuthPrevEventsRequest{ + RoomID: e.RoomID(), + AuthEventIDs: nil, //e.AuthEventIDs(), + PrevEventIDs: nil, //e.PrevEventIDs(), + } + var stateResp api.QueryMissingAuthPrevEventsResponse + if err := t.rsAPI.QueryMissingAuthPrevEvents(ctx, &stateReq, &stateResp); err != nil { + return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err) + } + */ // pass the event to the roomserver which will do auth checks // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently @@ -611,9 +607,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e context.Background(), t.rsAPI, api.KindNew, - []*gomatrixserverlib.HeaderedEvent{ - e.Headered(stateResp.RoomVersion), - }, + []*gomatrixserverlib.HeaderedEvent{e}, t.Origin, api.DoNotSendToOtherServers, nil, diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 8e6b9ed89..12577516e 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -64,7 +64,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec( func (r *Inputer) processRoomEvent( ctx context.Context, input *api.InputRoomEvent, -) (eventID string, err error) { +) (string, error) { // Measure how long it takes to process this event. started := time.Now() defer func() { @@ -105,26 +105,37 @@ func (r *Inputer) processRoomEvent( } } - missingReq := &api.QueryMissingAuthPrevEventsRequest{ - RoomID: event.RoomID(), - AuthEventIDs: event.AuthEventIDs(), - PrevEventIDs: event.PrevEventIDs(), - } missingRes := &api.QueryMissingAuthPrevEventsResponse{} + serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{} if event.Type() != gomatrixserverlib.MRoomCreate { - if err = r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil { + missingReq := &api.QueryMissingAuthPrevEventsRequest{ + RoomID: event.RoomID(), + AuthEventIDs: event.AuthEventIDs(), + PrevEventIDs: event.PrevEventIDs(), + } + if err := r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil { return "", fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err) } } + if len(missingRes.MissingAuthEventIDs) > 0 || len(missingRes.MissingPrevEventIDs) > 0 { + serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{ + RoomID: event.RoomID(), + } + if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil { + return "", fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) + } + } // First of all, check that the auth events of the event are known. // If they aren't then we will ask the federation API for them. isRejected := false authEvents := gomatrixserverlib.NewAuthEvents(nil) knownEvents := map[string]*types.Event{} - if err = r.checkForMissingAuthEvents(ctx, logger, input.Event, &authEvents, knownEvents); err != nil { + logger.Println("Starting to check for missing auth events") + if err := r.checkForMissingAuthEvents(ctx, logger, input.Event, &authEvents, knownEvents, serverRes.ServerNames); err != nil { return "", fmt.Errorf("r.checkForMissingAuthEvents: %w", err) } + logger.Println("Checked for missing auth events") // Check if the event is allowed by its auth events. If it isn't then // we consider the event to be "rejected" — it will still be persisted. @@ -138,6 +149,9 @@ func (r *Inputer) processRoomEvent( authEventIDs := event.AuthEventIDs() authEventNIDs := make([]types.EventNID, 0, len(authEventIDs)) for _, authEventID := range authEventIDs { + if _, ok := knownEvents[authEventID]; !ok { + return "", fmt.Errorf("missing auth event %s", authEventID) + } authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID) } @@ -145,6 +159,7 @@ func (r *Inputer) processRoomEvent( if input.Kind == api.KindNew { // Check that the event passes authentication checks based on the // current room state. + var err error softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs) if err != nil { logger.WithError(err).Info("Error authing soft-failed event") @@ -187,6 +202,7 @@ func (r *Inputer) processRoomEvent( } if len(missingRes.MissingPrevEventIDs) > 0 { + logger.Println("Starting to check for missing prev events") missingState := missingStateReq{ origin: input.Origin, inputer: r, @@ -201,6 +217,7 @@ func (r *Inputer) processRoomEvent( if err = missingState.processEventWithMissingState(ctx, input.Event.Unwrap(), roomInfo.RoomVersion); err != nil { return "", fmt.Errorf("r.checkForMissingPrevEvents: %w", err) } + logger.Println("Checked for missing prev events") } if stateAtEvent.BeforeStateSnapshotNID == 0 { @@ -274,6 +291,7 @@ func (r *Inputer) checkForMissingAuthEvents( event *gomatrixserverlib.HeaderedEvent, auth *gomatrixserverlib.AuthEvents, known map[string]*types.Event, + servers []gomatrixserverlib.ServerName, ) error { authEventIDs := event.AuthEventIDs() if len(authEventIDs) == 0 { @@ -300,20 +318,11 @@ func (r *Inputer) checkForMissingAuthEvents( if len(unknown) > 0 { logger.Printf("XXX: There are %d missing auth events", len(unknown)) - - serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{ - RoomID: event.RoomID(), - } - serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{} - if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil { - return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) - } - - logger.Printf("XXX: Asking servers %+v", serverRes.ServerNames) + logger.Printf("XXX: Asking servers %+v", servers) var res gomatrixserverlib.RespEventAuth var found bool - for _, serverName := range serverRes.ServerNames { + for _, serverName := range servers { res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomID(), event.EventID()) if err != nil { logger.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err) @@ -324,7 +333,7 @@ func (r *Inputer) checkForMissingAuthEvents( break } if !found { - logger.Printf("XXX: None of the %d servers provided us with auth events", len(serverRes.ServerNames)) + logger.Printf("XXX: None of the %d servers provided us with auth events", len(servers)) return fmt.Errorf("no servers provided event auth") } @@ -334,7 +343,7 @@ func (r *Inputer) checkForMissingAuthEvents( ) { // If we already know about this event then we don't need to store // it or do anything further with it. - if _, ok := known[event.EventID()]; ok { + if ev, ok := known[event.EventID()]; ok && ev != nil { continue } @@ -358,7 +367,6 @@ func (r *Inputer) checkForMissingAuthEvents( } // Let's take a note of the fact that we now know about this event. - known[event.EventID()] = nil if err := auth.AddEvent(event); err != nil { return fmt.Errorf("auth.AddEvent: %w", err) } @@ -387,228 +395,6 @@ func (r *Inputer) checkForMissingAuthEvents( return nil } -/* -func (r *Inputer) checkForMissingPrevEvents( - ctx context.Context, - logger *logrus.Entry, - event *gomatrixserverlib.HeaderedEvent, - roomInfo *types.RoomInfo, - known map[string]*types.Event, -) error { - prevStates := map[string]*types.StateAtEvent{} - prevEventIDs := event.PrevEventIDs() - if len(prevEventIDs) == 0 && event.Type() != gomatrixserverlib.MRoomCreate { - return fmt.Errorf("expected to find some prev events for event type %q", event.Type()) - } - - for _, eventID := range prevEventIDs { - state, err := r.DB.StateAtEventIDs(ctx, []string{eventID}) - if err != nil { - if _, ok := err.(types.MissingEventError); ok { - continue - } - return fmt.Errorf("r.DB.StateAtEventIDs: %w", err) - } - if len(state) == 1 { - prevStates[eventID] = &state[0] - continue - } - } - - // If we know all of the states of the previous events then there is nothing more to - // do here, as the state across them will be resolved later. - if len(prevStates) == len(prevEventIDs) { - return nil - } - if r.FSAPI == nil { - return fmt.Errorf("cannot satisfy missing events without federation") - } - - // Ask the federation API which servers we should ask. In theory the roomserver - // doesn't need the help of the federation API to do this because we already know - // all of the membership states, it's just that the federation API tracks this in - // a table for this purpose. TODO: Work out what makes most sense here. - serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{ - RoomID: event.RoomID(), - } - serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{} - if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil { - return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) - } - - // Attempt to fill in the gap using /get_missing_events - // This will either: - // - fill in the gap completely then process event `e` returning no backwards extremity - // - fail to fill in the gap and tell us to terminate the transaction err=not nil - // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction - newEvents, err := r.getMissingEvents(ctx, logger, event, roomInfo, serverRes.ServerNames, known) - if err != nil { - return err - } - if len(newEvents) == 0 { - return fmt.Errorf("/get_missing_events returned no new events") - } - - return nil -} - -func (r *Inputer) getMissingEvents( - ctx context.Context, - logger *logrus.Entry, - event *gomatrixserverlib.HeaderedEvent, - roomInfo *types.RoomInfo, - servers []gomatrixserverlib.ServerName, - known map[string]*types.Event, -) (newEvents []*gomatrixserverlib.Event, err error) { - logger.Printf("XXX: get_missing_events called") - needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()}) - - // Ask the roomserver for our current forward extremities. These will form - // the "earliest" part of the `/get_missing_events` request. - req := &api.QueryLatestEventsAndStateRequest{ - RoomID: event.RoomID(), - StateToFetch: needed.Tuples(), - } - res := &api.QueryLatestEventsAndStateResponse{} - if err = r.Queryer.QueryLatestEventsAndState(ctx, req, res); err != nil { - logger.WithError(err).Warn("Failed to query latest events") - return nil, err - } - - // Accumulate the event IDs of our forward extremities for use in the request. - latestEvents := make([]string, len(res.LatestEvents)) - for i := range res.LatestEvents { - latestEvents[i] = res.LatestEvents[i].EventID - } - - var missingResp *gomatrixserverlib.RespMissingEvents - for _, server := range servers { - logger.Printf("XXX: Calling /get_missing_events via %q", server) - var m gomatrixserverlib.RespMissingEvents - if m, err = r.FSAPI.LookupMissingEvents(ctx, server, event.RoomID(), gomatrixserverlib.MissingEvents{ - Limit: 20, - EarliestEvents: latestEvents, - LatestEvents: []string{event.EventID()}, - }, event.RoomVersion); err == nil { - missingResp = &m - break - } else if errors.Is(err, context.DeadlineExceeded) { - break - } - } - - if missingResp == nil { - return nil, fmt.Errorf("/get_missing_events failed via all candidate servers") - } - if len(missingResp.Events) == 0 { - return nil, fmt.Errorf("/get_missing_events returned no events") - } - - // security: how we handle failures depends on whether or not this event will become the new forward extremity for the room. - // There's 2 scenarios to consider: - // - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true) - // - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false) - // In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room, - // as it was called in response to an inbound txn which had it as a prev_event. - // In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad - // because it means we would trust the state at that event to be the state for the entire room, and allows rooms to be hijacked. - // https://github.com/matrix-org/synapse/pull/3456 - // https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335 - // For now, we do not allow Case B, so reject the event. - logger.Printf("XXX: get_missing_events returned %d events", len(missingResp.Events)) - - newEvents = gomatrixserverlib.ReverseTopologicalOrdering( - missingResp.Events, - gomatrixserverlib.TopologicalOrderByPrevEvents, - ) - for _, pe := range event.PrevEventIDs() { - hasPrevEvent := false - for _, ev := range newEvents { - if ev.EventID() == pe { - hasPrevEvent = true - break - } - } - if !hasPrevEvent { - logger.Errorf("Prev event %q is still missing after /get_missing_events", pe) - } - } - - backwardExtremity := newEvents[0] - fastForwardEvents := newEvents[1:] - - // Do we know about the state of the backward extremity already? - if _, err := r.DB.StateAtEventIDs(ctx, []string{backwardExtremity.EventID()}); err == nil { - // Yes, we do, so we don't need to store that event. - } else { - // No, we don't, so let's go find it. - // r.FSAPI.LookupStateIDs() - } - - for _, ev := range fastForwardEvents { - if _, err := r.processRoomEvent(ctx, &api.InputRoomEvent{ - Kind: api.KindOld, - Event: ev.Headered(event.RoomVersion), - AuthEventIDs: ev.AuthEventIDs(), - }); err != nil { - return nil, fmt.Errorf("r.processRoomEvent (prev event): %w", err) - } - } - - return newEvents, nil -} - -func (r *Inputer) lookupStateBeforeEvent( - ctx context.Context, - logger *logrus.Entry, - event *gomatrixserverlib.HeaderedEvent, - roomInfo *types.RoomInfo, - servers []gomatrixserverlib.ServerName, -) error { - knownPrevStates := map[string]types.StateAtEvent{} - unknownPrevStates := map[string]struct{}{} - neededStateEvents := map[string]struct{}{} - - for _, prevEventID := range event.PrevEventIDs() { - if state, err := r.DB.StateAtEventIDs(ctx, []string{prevEventID}); err == nil && len(state) == 1 { - knownPrevStates[prevEventID] = state[0] - } else { - unknownPrevStates[prevEventID] = struct{}{} - } - } - - for prevEventID := range unknownPrevStates { - stateIDs, err := r.FSAPI.LookupStateIDs(ctx, "TODO: SERVER", event.RoomID(), prevEventID) - if err != nil { - return fmt.Errorf("r.FSAPI.LookupStateIDs: %w", err) - } - events, err := r.DB.EventsFromIDs(ctx, stateIDs.StateEventIDs) - if err != nil { - return fmt.Errorf("r.DB.EventsFromIDs: %w", err) - } - for i, eventID := range stateIDs.StateEventIDs { - if events[i].Event == nil || events[i].EventNID == 0 { - neededStateEvents[eventID] = struct{}{} - } - } - - if len(neededStateEvents) > (len(stateIDs.StateEventIDs) / 2) { - // More than 50% of the state events are missing, so let's just - // call `/state` instead of fetching the events individually. - state, err := r.FSAPI.LookupState(ctx, "", event.RoomID(), prevEventID, roomInfo.RoomVersion) - if err != nil { - return fmt.Errorf("r.FSAPI.LookupState: %w", err) - } - knownPrevStates[prevEventID] = types.StateAtEvent{ - StateEntry: types.StateEntry{}, - } - } - } - - return nil -} -*/ - func (r *Inputer) calculateAndSetState( ctx context.Context, input *api.InputRoomEvent, diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index 65d17adfd..3fd233ad4 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -146,37 +146,39 @@ func (t *missingStateReq) processEventWithMissingState( } t.hadEventsMutex.Unlock() - err = api.SendEventWithState( - context.Background(), - t.inputer, - api.KindOld, - resolvedState, - backwardsExtremity.Headered(roomVersion), - t.origin, - hadEvents, - ) + stateIDs := make([]string, len(resolvedState.StateEvents)) + for _, event := range resolvedState.StateEvents { + stateIDs = append(stateIDs, event.EventID()) + } + + _, err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{ + Kind: api.KindOld, + Event: backwardsExtremity.Headered(roomVersion), + Origin: t.origin, + AuthEventIDs: backwardsExtremity.AuthEventIDs(), + HasState: true, + StateEventIDs: stateIDs, + SendAsServer: api.DoNotSendToOtherServers, + }) if err != nil { - return fmt.Errorf("api.SendEventWithState: %w", err) + return fmt.Errorf("t.inputer.processRoomEvent: %w", err) } // Then send all of the newer backfilled events, of which will all be newer // than the backward extremity, into the roomserver without state. This way // they will automatically fast-forward based on the room state at the // extremity in the last step. - headeredNewEvents := make([]*gomatrixserverlib.HeaderedEvent, len(newEvents)) - for i, newEvent := range newEvents { - headeredNewEvents[i] = newEvent.Headered(roomVersion) - } - if err = api.SendEvents( - context.Background(), - t.inputer, - api.KindOld, - append(headeredNewEvents, e.Headered(roomVersion)), - t.origin, - api.DoNotSendToOtherServers, - nil, - ); err != nil { - return fmt.Errorf("api.SendEvents: %w", err) + for _, newEvent := range newEvents { + _, err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{ + Kind: api.KindOld, + Event: newEvent.Headered(roomVersion), + Origin: t.origin, + AuthEventIDs: backwardsExtremity.AuthEventIDs(), + SendAsServer: api.DoNotSendToOtherServers, + }) + if err != nil { + return fmt.Errorf("t.inputer.processRoomEvent: %w", err) + } } return nil @@ -627,7 +629,7 @@ func (t *missingStateReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib return &respState, nil } -func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) { +func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) { if localFirst { // fetch from the roomserver queryReq := api.QueryEventsByIDRequest{