From c97e4230aca6bb3e752f099792879164975207e6 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 16 Feb 2021 16:27:53 +0000 Subject: [PATCH] Look up servers less often, don't hit API for missing auth events unless there are actually missing auth events --- federationapi/routing/send.go | 94 ++++++++++++----------------------- 1 file changed, 33 insertions(+), 61 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 96b5355ea..6290ece0a 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -107,6 +107,7 @@ type txnReq struct { keyAPI keyapi.KeyInternalAPI keys gomatrixserverlib.JSONVerifier federation txnFederationClient + servers []gomatrixserverlib.ServerName // local cache of events for auth checks, etc - this may include events // which the roomserver is unaware of. haveEvents map[string]*gomatrixserverlib.HeaderedEvent @@ -482,14 +483,10 @@ func (t *txnReq) retrieveMissingAuthEvents( missingAuthEvents[missingAuthEventID] = struct{}{} } - servers := t.getServers(ctx, e.RoomID()) - if len(servers) > 5 { - servers = servers[:5] - } withNextEvent: for missingAuthEventID := range missingAuthEvents { withNextServer: - for _, server := range servers { + for _, server := range t.servers { logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server) tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID) if err != nil { @@ -558,7 +555,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e *gomatrixse // event ids and then use /event to fetch the individual events. // However not all version of synapse support /state_ids so you may // need to fallback to /state. - + t.servers = t.getServers(ctx, e.RoomID()) // Attempt to fill in the gap using /get_missing_events // This will either: // - fill in the gap completely then process event `e` returning no backwards extremity @@ -692,13 +689,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err) } - servers := t.getServers(ctx, roomID) - if len(servers) > 5 { - servers = servers[:5] - } - // fetch the event we're missing and add it to the pile - h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers) + h, err := t.lookupEvent(ctx, roomVersion, eventID, false) switch err.(type) { case verifySigError: return respState, false, nil @@ -740,11 +732,10 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event t.haveEvents[ev.EventID()] = res.StateEvents[i] } var authEvents []*gomatrixserverlib.Event - missingAuthEvents := make(map[string]bool) + missingAuthEvents := map[string]bool{} for _, ev := range res.StateEvents { for _, ae := range ev.AuthEventIDs() { - aev, ok := t.haveEvents[ae] - if ok { + if aev, ok := t.haveEvents[ae]; ok { authEvents = append(authEvents, aev.Unwrap()) } else { missingAuthEvents[ae] = true @@ -753,27 +744,28 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event } // QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't // have stored the event. - var missingEventList []string - for evID := range missingAuthEvents { - missingEventList = append(missingEventList, evID) - } - queryReq := api.QueryEventsByIDRequest{ - EventIDs: missingEventList, - } - util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList) - var queryRes api.QueryEventsByIDResponse - if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil { - return nil - } - for i := range queryRes.Events { - evID := queryRes.Events[i].EventID() - t.haveEvents[evID] = queryRes.Events[i] - authEvents = append(authEvents, queryRes.Events[i].Unwrap()) + if len(missingAuthEvents) > 0 { + var missingEventList []string + for evID := range missingAuthEvents { + missingEventList = append(missingEventList, evID) + } + queryReq := api.QueryEventsByIDRequest{ + EventIDs: missingEventList, + } + util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList) + var queryRes api.QueryEventsByIDResponse + if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil { + return nil + } + for i := range queryRes.Events { + evID := queryRes.Events[i].EventID() + t.haveEvents[evID] = queryRes.Events[i] + authEvents = append(authEvents, queryRes.Events[i].Unwrap()) + } } - evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents) return &gomatrixserverlib.RespState{ - StateEvents: evs, + StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents), AuthEvents: authEvents, } } @@ -805,11 +797,7 @@ retryAllowedState: if err = checkAllowedByState(backwardsExtremity, resolvedStateEvents); err != nil { switch missing := err.(type) { case gomatrixserverlib.MissingAuthEventError: - servers := t.getServers(ctx, backwardsExtremity.RoomID()) - if len(servers) > 5 { - servers = servers[:5] - } - h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true, servers) + h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true) switch err2.(type) { case verifySigError: return &gomatrixserverlib.RespState{ @@ -857,18 +845,8 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even latestEvents[i] = res.LatestEvents[i].EventID } - servers := []gomatrixserverlib.ServerName{t.Origin} - serverReq := &api.QueryServerJoinedToRoomRequest{ - RoomID: e.RoomID(), - } - serverRes := &api.QueryServerJoinedToRoomResponse{} - if err = t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil { - servers = append(servers, serverRes.ServerNames...) - logger.Infof("Found %d server(s) to query for missing events", len(servers)) - } - var missingResp *gomatrixserverlib.RespMissingEvents - for _, server := range servers { + for _, server := range t.servers { var m gomatrixserverlib.RespMissingEvents if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{ Limit: 20, @@ -887,7 +865,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even if missingResp == nil { logger.WithError(err).Errorf( "%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", - t.Origin, len(servers), + t.Origin, len(t.servers), ) return nil, missingPrevEventsError{ eventID: e.EventID(), @@ -1015,12 +993,6 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even "concurrent_requests": concurrentRequests, }).Info("Fetching missing state at event") - // Get a list of servers to fetch from. - servers := t.getServers(ctx, roomID) - if len(servers) > 5 { - servers = servers[:5] - } - // Create a queue containing all of the missing event IDs that we want // to retrieve. pending := make(chan string, missingCount) @@ -1046,7 +1018,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even // Define what we'll do in order to fetch the missing event ID. fetch := func(missingEventID string) { var h *gomatrixserverlib.HeaderedEvent - h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers) + h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false) switch err.(type) { case verifySigError: return @@ -1112,7 +1084,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat return &respState, nil } -func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool, servers []gomatrixserverlib.ServerName) (*gomatrixserverlib.HeaderedEvent, error) { +func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) { if localFirst { // fetch from the roomserver queryReq := api.QueryEventsByIDRequest{ @@ -1127,7 +1099,7 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib. } var event *gomatrixserverlib.Event found := false - for _, serverName := range servers { + for _, serverName := range t.servers { txn, err := t.federation.GetEvent(ctx, serverName, missingEventID) if err != nil || len(txn.PDUs) == 0 { util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("Failed to get missing /event for event ID") @@ -1142,8 +1114,8 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib. break } if !found { - util.GetLogger(ctx).WithField("event_id", missingEventID).Warnf("Failed to get missing /event for event ID from %d server(s)", len(servers)) - return nil, fmt.Errorf("wasn't able to find event via %d server(s)", len(servers)) + util.GetLogger(ctx).WithField("event_id", missingEventID).Warnf("Failed to get missing /event for event ID from %d server(s)", len(t.servers)) + return nil, fmt.Errorf("wasn't able to find event via %d server(s)", len(t.servers)) } if err := gomatrixserverlib.VerifyAllEventSignatures(ctx, []*gomatrixserverlib.Event{event}, t.keys); err != nil { util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())