Use synchronous contexts, limit time to fetch missing events

This commit is contained in:
Neil Alexander 2022-01-07 11:57:11 +00:00
parent af34b4abe3
commit 10e1d347e2
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 29 additions and 20 deletions

View file

@ -126,7 +126,7 @@ func (r *Inputer) InputRoomEvents(
inputRoomEvent := e inputRoomEvent := e
inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{})
inbox.(*phony.Inbox).Act(nil, func() { inbox.(*phony.Inbox).Act(nil, func() {
err := r.processRoomEvent(context.TODO(), &inputRoomEvent) err := r.processRoomEvent(ctx, &inputRoomEvent)
if err != nil { if err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
} else { } else {
@ -142,6 +142,7 @@ func (r *Inputer) InputRoomEvents(
for i := 0; i < len(request.InputRoomEvents); i++ { for i := 0; i < len(request.InputRoomEvents); i++ {
select { select {
case <-ctx.Done(): case <-ctx.Done():
response.ErrMsg = context.DeadlineExceeded.Error()
return return
case err := <-responses: case err := <-responses:
if err != nil { if err != nil {

View file

@ -84,7 +84,7 @@ func (r *Inputer) processRoomEvent(
}) })
if input.Origin == "" { if input.Origin == "" {
input.Origin = event.Origin() // input.Origin = event.Origin()
} }
logger.Println("XXX: Processing event") logger.Println("XXX: Processing event")
@ -131,17 +131,18 @@ func (r *Inputer) processRoomEvent(
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
} }
} }
if input.Origin != "" {
serverRes.ServerNames = append([]gomatrixserverlib.ServerName{input.Origin}, serverRes.ServerNames...)
}
// First of all, check that the auth events of the event are known. // First of all, check that the auth events of the event are known.
// If they aren't then we will ask the federation API for them. // If they aren't then we will ask the federation API for them.
isRejected := false isRejected := false
authEvents := gomatrixserverlib.NewAuthEvents(nil) authEvents := gomatrixserverlib.NewAuthEvents(nil)
knownEvents := map[string]*types.Event{} knownEvents := map[string]*types.Event{}
logger.Println("Starting to check for missing auth events")
if err = r.checkForMissingAuthEvents(ctx, logger, input.Event, &authEvents, knownEvents, serverRes.ServerNames); err != nil { if err = r.checkForMissingAuthEvents(ctx, logger, input.Event, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
return fmt.Errorf("r.checkForMissingAuthEvents: %w", err) return fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
} }
logger.Println("Checked for missing auth events")
// Check if the event is allowed by its auth events. If it isn't then // Check if the event is allowed by its auth events. If it isn't then
// we consider the event to be "rejected" — it will still be persisted. // we consider the event to be "rejected" — it will still be persisted.
@ -172,6 +173,7 @@ func (r *Inputer) processRoomEvent(
} }
if input.Kind != api.KindOutlier && len(missingRes.MissingPrevEventIDs) > 0 { if input.Kind != api.KindOutlier && len(missingRes.MissingPrevEventIDs) > 0 {
if len(serverRes.ServerNames) > 0 {
missingState := missingStateReq{ missingState := missingStateReq{
origin: input.Origin, origin: input.Origin,
inputer: r, inputer: r,
@ -180,7 +182,7 @@ func (r *Inputer) processRoomEvent(
federation: r.FSAPI, federation: r.FSAPI,
keys: r.KeyRing, keys: r.KeyRing,
roomsMu: internal.NewMutexByRoom(), roomsMu: internal.NewMutexByRoom(),
servers: []gomatrixserverlib.ServerName{input.Origin}, servers: serverRes.ServerNames,
hadEvents: map[string]bool{}, hadEvents: map[string]bool{},
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{}, haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
} }
@ -188,6 +190,10 @@ func (r *Inputer) processRoomEvent(
isRejected = true isRejected = true
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err) rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
} }
} else {
isRejected = true
rejectionErr = fmt.Errorf("missing prev events and no other servers to ask")
}
} }
// Store the event. // Store the event.

View file

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"time"
fedapi "github.com/matrix-org/dendrite/federationapi/api" fedapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -369,9 +370,10 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
var missingResp *gomatrixserverlib.RespMissingEvents var missingResp *gomatrixserverlib.RespMissingEvents
for _, server := range t.servers { for _, server := range t.servers {
logger.Infof("Trying server %q for missing events", server)
var m gomatrixserverlib.RespMissingEvents var m gomatrixserverlib.RespMissingEvents
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{ rctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
if m, err = t.federation.LookupMissingEvents(rctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
Limit: 20, Limit: 20,
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events. // The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
EarliestEvents: latestEvents, EarliestEvents: latestEvents,