Optimise memory usage when calling /g_m_e (#1819)

* Optimise memory usage when calling /g_m_e
* cache more events
* refactor handling of device list update pokes
* Sigh

parent 5ade348d14
commit b769d5a25e

@@ -620,7 +620,9 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
 	return gomatrixserverlib.Allowed(e, &authUsingState)
 }
 
-func (t *txnReq) processEventWithMissingState(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
+func (t *txnReq) processEventWithMissingState(
+	ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
+) error {
 	// Do this with a fresh context, so that we keep working even if the
 	// original request times out. With any luck, by the time the remote
 	// side retries, we'll have fetched the missing state.

@@ -784,7 +786,7 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
 	default:
 		return nil, false, fmt.Errorf("t.lookupEvent: %w", err)
 	}
-	t.haveEvents[h.EventID()] = h
+	t.cacheAndReturn(h)
 	if h.StateKey() != nil {
 		addedToState := false
 		for i := range respState.StateEvents {

@@ -803,6 +805,14 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
 	return respState, false, nil
 }
 
+func (t *txnReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *gomatrixserverlib.HeaderedEvent {
+	if cached, exists := t.haveEvents[ev.EventID()]; exists {
+		return cached
+	}
+	t.haveEvents[ev.EventID()] = ev
+	return ev
+}
+
 func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState {
 	var res api.QueryStateAfterEventsResponse
 	err := t.rsAPI.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{

@@ -810,15 +820,21 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
 		PrevEventIDs: []string{eventID},
 	}, &res)
 	if err != nil || !res.PrevEventsExist {
-		util.GetLogger(ctx).WithError(err).Warnf("failed to query state after %s locally", eventID)
+		util.GetLogger(ctx).WithField("room_id", roomID).WithError(err).Warnf("failed to query state after %s locally, prev exists=%v", eventID, res.PrevEventsExist)
 		return nil
 	}
+	stateEvents := make([]*gomatrixserverlib.HeaderedEvent, len(res.StateEvents))
 	for i, ev := range res.StateEvents {
-		t.haveEvents[ev.EventID()] = res.StateEvents[i]
+		// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
+		// processEvent request, which is better for memory.
+		stateEvents[i] = t.cacheAndReturn(ev)
 	}
+	// we should never access res.StateEvents again so we delete it here to make GC faster
+	res.StateEvents = nil
 
 	var authEvents []*gomatrixserverlib.Event
 	missingAuthEvents := map[string]bool{}
-	for _, ev := range res.StateEvents {
+	for _, ev := range stateEvents {
 		for _, ae := range ev.AuthEventIDs() {
 			if aev, ok := t.haveEvents[ae]; ok {
 				authEvents = append(authEvents, aev.Unwrap())

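The change above is plain pointer interning: the first copy of an event wins and later copies are dropped in favour of the cached pointer, so every prev_event branch of one processEvent request shares a single allocation per event. A minimal standalone sketch of the idea (illustrative names, not code from the patch):

package main

import "fmt"

type event struct{ id string }

// cache interns events by ID, mirroring what the patch's cacheAndReturn
// helper does with t.haveEvents.
type cache map[string]*event

func (c cache) intern(ev *event) *event {
	if cached, ok := c[ev.id]; ok {
		return cached // drop the duplicate, share the existing pointer
	}
	c[ev.id] = ev
	return ev
}

func main() {
	c := cache{}
	branchA := c.intern(&event{id: "$e1"}) // first copy is cached
	branchB := c.intern(&event{id: "$e1"}) // a second copy deduplicates
	fmt.Println(branchA == branchB)        // true: one shared allocation
}

With many prev_event branches resolving largely the same room state, this turns N copies of each state event into one.
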
@@ -843,14 +859,13 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
 			return nil
 		}
 		for i := range queryRes.Events {
-			evID := queryRes.Events[i].EventID()
-			t.haveEvents[evID] = queryRes.Events[i]
-			authEvents = append(authEvents, queryRes.Events[i].Unwrap())
+			authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
 		}
+		queryRes.Events = nil
 	}
 
 	return &gomatrixserverlib.RespState{
-		StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents),
+		StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
 		AuthEvents:  authEvents,
 	}
 }

@@ -860,8 +875,6 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
 func (t *txnReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
 	*gomatrixserverlib.RespState, error) {
 
-	util.GetLogger(ctx).Infof("lookupStateBeforeEvent %s", eventID)
-
 	// Attempt to fetch the missing state using /state_ids and /events
 	return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
 }

@@ -992,7 +1005,6 @@ Event:
 		}
 	}
 
-	// we processed everything!
 	return newEvents, nil
 }
 

@@ -1011,7 +1023,7 @@ func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID
 
 func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
 	*gomatrixserverlib.RespState, error) {
-	util.GetLogger(ctx).Infof("lookupMissingStateViaStateIDs %s", eventID)
+	util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID)
 	// fetch the state event IDs at the time of the event
 	stateIDs, err := t.federation.LookupStateIDs(ctx, t.Origin, roomID, eventID)
 	if err != nil {

@@ -1040,14 +1052,16 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
 	}
 	for i := range queryRes.Events {
 		evID := queryRes.Events[i].EventID()
-		t.haveEvents[evID] = queryRes.Events[i]
+		t.cacheAndReturn(queryRes.Events[i])
 		if missing[evID] {
 			delete(missing, evID)
 		}
 	}
+	queryRes.Events = nil // allow it to be GCed
 
 	concurrentRequests := 8
 	missingCount := len(missing)
+	util.GetLogger(ctx).WithField("room_id", roomID).WithField("event_id", eventID).Infof("lookupMissingStateViaStateIDs missing %d/%d events", missingCount, len(wantIDs))
 
 	// If over 50% of the auth/state events from /state_ids are missing
 	// then we'll just call /state instead, otherwise we'll just end up

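Nil-ing queryRes.Events here (and res.StateEvents earlier) drops the last reference to the response's backing array once everything useful has been interned, so the garbage collector can reclaim it mid-request rather than when the response struct finally goes out of scope. A standalone sketch of the idea (illustrative names, not code from the patch):

package main

import "fmt"

type event struct{ id string }

func main() {
	cache := map[string]*event{}
	resp := struct{ Events []*event }{
		Events: []*event{{id: "$a"}, {id: "$b"}},
	}
	for _, ev := range resp.Events {
		if _, ok := cache[ev.id]; !ok {
			cache[ev.id] = ev // the cache keeps each event alive
		}
	}
	// The slice header and backing array are the only things resp still
	// holds; clearing it makes them unreachable and eligible for collection.
	resp.Events = nil
	fmt.Println(len(cache)) // 2
}
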
@@ -1112,7 +1126,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
 			return
 		}
 		haveEventsMutex.Lock()
-		t.haveEvents[h.EventID()] = h
+		t.cacheAndReturn(h)
 		haveEventsMutex.Unlock()
 	}
 

@@ -72,10 +72,8 @@ func init() {
 // we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests
 // (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse
 // than being stuck behind foo.bar
-// In the event that the query fails, the worker spins up a short-lived goroutine whose sole purpose is to inject the server
-// name back into the channel after a certain amount of time. If in the interim the device lists have been updated, then
-// the database query will return no stale lists. Reinjection into the channel continues until success or the server terminates,
-// when it will be reloaded on startup.
+// In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is
+// set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried.
 type DeviceListUpdater struct {
 	// A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1
 	// request to the remote server and race.

@@ -297,42 +295,38 @@ func (u *DeviceListUpdater) clearChannel(userID string) {
 }
 
 func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
-	// It's possible to get many of the same server name in the channel, so in order
-	// to prevent processing the same server over and over we keep track of when we
-	// last made a request to the server. If we get the server name during the cooloff
-	// period, we'll ignore the poke.
-	lastProcessed := make(map[gomatrixserverlib.ServerName]time.Time)
-	// this can't be too long else sytest will give up trying to do a test
-	cooloffPeriod := 500 * time.Millisecond
-	shouldProcess := func(srv gomatrixserverlib.ServerName) bool {
-		// we should process requests when now is after the last process time + cooloff
-		return time.Now().After(lastProcessed[srv].Add(cooloffPeriod))
-	}
-	// on failure, spin up a short-lived goroutine to inject the server name again.
-	scheduledRetries := make(map[gomatrixserverlib.ServerName]time.Time)
-	inject := func(srv gomatrixserverlib.ServerName, duration time.Duration) {
-		time.Sleep(duration)
-		ch <- srv
-	}
+	retries := make(map[gomatrixserverlib.ServerName]time.Time)
+	retriesMu := &sync.Mutex{}
+	// restarter goroutine which will inject failed servers into ch when it is time
+	go func() {
+		for {
+			var serversToRetry []gomatrixserverlib.ServerName
+			time.Sleep(time.Second)
+			retriesMu.Lock()
+			now := time.Now()
+			for srv, retryAt := range retries {
+				if now.After(retryAt) {
+					serversToRetry = append(serversToRetry, srv)
+				}
+			}
+			for _, srv := range serversToRetry {
+				delete(retries, srv)
+			}
+			retriesMu.Unlock()
+			for _, srv := range serversToRetry {
+				ch <- srv
+			}
+		}
+	}()
 	for serverName := range ch {
-		if !shouldProcess(serverName) {
-			if time.Now().Before(scheduledRetries[serverName]) {
-				// do not inject into the channel as we know there will be a sleeping goroutine
-				// which will do it after the cooloff period expires
-				continue
-			} else {
-				scheduledRetries[serverName] = time.Now().Add(cooloffPeriod)
-				go inject(serverName, cooloffPeriod)
-				continue
-			}
-		}
-		lastProcessed[serverName] = time.Now()
 		waitTime, shouldRetry := u.processServer(serverName)
 		if shouldRetry {
-			scheduledRetries[serverName] = time.Now().Add(waitTime)
-			go inject(serverName, waitTime)
+			retriesMu.Lock()
+			_, exists := retries[serverName]
+			if !exists {
+				retries[serverName] = time.Now().Add(waitTime)
+			}
+			retriesMu.Unlock()
 		}
 	}
 }

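The refactored worker replaces one sleeping goroutine per failed server with a single restarter goroutine scanning a deadline map under a mutex, which bounds the goroutine count and coalesces repeated pokes for the same server (the `if !exists` check keeps the earliest pending deadline). A standalone sketch of the pattern (illustrative names and a plain string key, not code from the patch):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	ch := make(chan string, 16)
	retries := map[string]time.Time{}
	var mu sync.Mutex

	// restarter: probe the map periodically and re-inject expired entries.
	go func() {
		for {
			time.Sleep(time.Second)
			var ready []string
			mu.Lock()
			now := time.Now()
			for srv, retryAt := range retries {
				if now.After(retryAt) {
					ready = append(ready, srv)
					delete(retries, srv)
				}
			}
			mu.Unlock()
			for _, srv := range ready {
				ch <- srv // send outside the lock so holders are never blocked
			}
		}
	}()

	// recordFailure schedules a retry, keeping the earliest deadline if one
	// is already pending - the same dedup the patch performs.
	recordFailure := func(srv string, wait time.Duration) {
		mu.Lock()
		if _, exists := retries[srv]; !exists {
			retries[srv] = time.Now().Add(wait)
		}
		mu.Unlock()
	}

	recordFailure("remote.example", 2*time.Second)
	recordFailure("remote.example", 10*time.Second) // ignored: retry already pending
	fmt.Println("retrying:", <-ch)                  // fires after ~2-3 seconds
}
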
@@ -380,7 +374,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
 		}
 	}
 	if failCount > 0 {
-		logger.WithField("total", len(userIDs)).WithField("failed", failCount).Error("failed to query device keys for some users")
+		logger.WithField("total", len(userIDs)).WithField("failed", failCount).WithField("wait", waitTime).Error("failed to query device keys for some users")
 	}
 	for _, userID := range userIDs {
 		// always clear the channel to unblock Update calls regardless of success/failure