Check missing event count

This commit is contained in:
Neil Alexander 2021-01-04 13:03:43 +00:00
parent 597350a67f
commit 04417d7083
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -1005,79 +1005,82 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
return t.lookupMissingStateViaState(ctx, roomID, eventID, roomVersion) return t.lookupMissingStateViaState(ctx, roomID, eventID, roomVersion)
} }
util.GetLogger(ctx).WithFields(logrus.Fields{ if missingCount > 0 {
"missing": missingCount, util.GetLogger(ctx).WithFields(logrus.Fields{
"event_id": eventID, "missing": missingCount,
"room_id": roomID, "event_id": eventID,
"total_state": len(stateIDs.StateEventIDs), "room_id": roomID,
"total_auth_events": len(stateIDs.AuthEventIDs), "total_state": len(stateIDs.StateEventIDs),
"concurrent_requests": concurrentRequests, "total_auth_events": len(stateIDs.AuthEventIDs),
}).Info("Fetching missing state at event") "concurrent_requests": concurrentRequests,
}).Info("Fetching missing state at event")
// Get a list of servers to fetch from. // Get a list of servers to fetch from.
servers := t.getServers(ctx, roomID) servers := t.getServers(ctx, roomID)
if len(servers) > 5 { if len(servers) > 5 {
servers = servers[:5] servers = servers[:5]
}
// Create a queue containing all of the missing event IDs that we want
// to retrieve.
pending := make(chan string, missingCount)
for missingEventID := range missing {
pending <- missingEventID
}
close(pending)
// Define how many workers we should start to do this.
if missingCount < concurrentRequests {
concurrentRequests = missingCount
}
// Create the wait group.
var fetchgroup sync.WaitGroup
fetchgroup.Add(concurrentRequests)
// This is the only place where we'll write to t.haveEvents from
// multiple goroutines, and everywhere else is blocked on this
// synchronous function anyway.
var haveEventsMutex sync.Mutex
// Define what we'll do in order to fetch the missing event ID.
fetch := func(missingEventID string) {
var h *gomatrixserverlib.HeaderedEvent
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
switch err.(type) {
case verifySigError:
return
case nil:
break
default:
util.GetLogger(ctx).WithFields(logrus.Fields{
"event_id": missingEventID,
"room_id": roomID,
}).Info("Failed to fetch missing event")
return
} }
haveEventsMutex.Lock()
t.haveEvents[h.EventID()] = h
haveEventsMutex.Unlock()
}
// Create the worker. // Create a queue containing all of the missing event IDs that we want
worker := func(ch <-chan string) { // to retrieve.
defer fetchgroup.Done() pending := make(chan string, missingCount)
for missingEventID := range ch { for missingEventID := range missing {
fetch(missingEventID) pending <- missingEventID
} }
close(pending)
// Define how many workers we should start to do this.
if missingCount < concurrentRequests {
concurrentRequests = missingCount
}
// Create the wait group.
var fetchgroup sync.WaitGroup
fetchgroup.Add(concurrentRequests)
// This is the only place where we'll write to t.haveEvents from
// multiple goroutines, and everywhere else is blocked on this
// synchronous function anyway.
var haveEventsMutex sync.Mutex
// Define what we'll do in order to fetch the missing event ID.
fetch := func(missingEventID string) {
var h *gomatrixserverlib.HeaderedEvent
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
switch err.(type) {
case verifySigError:
return
case nil:
break
default:
util.GetLogger(ctx).WithFields(logrus.Fields{
"event_id": missingEventID,
"room_id": roomID,
}).Info("Failed to fetch missing event")
return
}
haveEventsMutex.Lock()
t.haveEvents[h.EventID()] = h
haveEventsMutex.Unlock()
}
// Create the worker.
worker := func(ch <-chan string) {
defer fetchgroup.Done()
for missingEventID := range ch {
fetch(missingEventID)
}
}
// Start the workers.
for i := 0; i < concurrentRequests; i++ {
go worker(pending)
}
// Wait for the workers to finish.
fetchgroup.Wait()
} }
// Start the workers.
for i := 0; i < concurrentRequests; i++ {
go worker(pending)
}
// Wait for the workers to finish.
fetchgroup.Wait()
resp, err := t.createRespStateFromStateIDs(stateIDs) resp, err := t.createRespStateFromStateIDs(stateIDs)
return resp, err return resp, err
} }