mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-28 09:13:09 -06:00
Optimise memory usage when calling /g_m_e
This commit is contained in:
parent
d27607af78
commit
65e85e3da1
|
|
@ -620,7 +620,9 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
|
||||||
return gomatrixserverlib.Allowed(e, &authUsingState)
|
return gomatrixserverlib.Allowed(e, &authUsingState)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) processEventWithMissingState(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
|
func (t *txnReq) processEventWithMissingState(
|
||||||
|
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
|
||||||
|
) error {
|
||||||
// Do this with a fresh context, so that we keep working even if the
|
// Do this with a fresh context, so that we keep working even if the
|
||||||
// original request times out. With any luck, by the time the remote
|
// original request times out. With any luck, by the time the remote
|
||||||
// side retries, we'll have fetched the missing state.
|
// side retries, we'll have fetched the missing state.
|
||||||
|
|
@ -803,6 +805,14 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
|
||||||
return respState, false, nil
|
return respState, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *txnReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *gomatrixserverlib.HeaderedEvent {
|
||||||
|
if cached, exists := t.haveEvents[ev.EventID()]; exists {
|
||||||
|
return cached
|
||||||
|
}
|
||||||
|
t.haveEvents[ev.EventID()] = ev
|
||||||
|
return ev
|
||||||
|
}
|
||||||
|
|
||||||
func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState {
|
func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState {
|
||||||
var res api.QueryStateAfterEventsResponse
|
var res api.QueryStateAfterEventsResponse
|
||||||
err := t.rsAPI.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{
|
err := t.rsAPI.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{
|
||||||
|
|
@ -810,15 +820,21 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
|
||||||
PrevEventIDs: []string{eventID},
|
PrevEventIDs: []string{eventID},
|
||||||
}, &res)
|
}, &res)
|
||||||
if err != nil || !res.PrevEventsExist {
|
if err != nil || !res.PrevEventsExist {
|
||||||
util.GetLogger(ctx).WithError(err).Warnf("failed to query state after %s locally", eventID)
|
util.GetLogger(ctx).WithField("room_id", roomID).WithError(err).Warnf("failed to query state after %s locally", eventID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
stateEvents := make([]*gomatrixserverlib.HeaderedEvent, len(res.StateEvents))
|
||||||
for i, ev := range res.StateEvents {
|
for i, ev := range res.StateEvents {
|
||||||
t.haveEvents[ev.EventID()] = res.StateEvents[i]
|
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
|
||||||
|
// processEvent request, which is better for memory.
|
||||||
|
stateEvents[i] = t.cacheAndReturn(ev)
|
||||||
}
|
}
|
||||||
|
// we should never access res.StateEvents again so we delete it here to make GC faster
|
||||||
|
res.StateEvents = nil
|
||||||
|
|
||||||
var authEvents []*gomatrixserverlib.Event
|
var authEvents []*gomatrixserverlib.Event
|
||||||
missingAuthEvents := map[string]bool{}
|
missingAuthEvents := map[string]bool{}
|
||||||
for _, ev := range res.StateEvents {
|
for _, ev := range stateEvents {
|
||||||
for _, ae := range ev.AuthEventIDs() {
|
for _, ae := range ev.AuthEventIDs() {
|
||||||
if aev, ok := t.haveEvents[ae]; ok {
|
if aev, ok := t.haveEvents[ae]; ok {
|
||||||
authEvents = append(authEvents, aev.Unwrap())
|
authEvents = append(authEvents, aev.Unwrap())
|
||||||
|
|
@ -843,14 +859,13 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
for i := range queryRes.Events {
|
for i := range queryRes.Events {
|
||||||
evID := queryRes.Events[i].EventID()
|
authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
|
||||||
t.haveEvents[evID] = queryRes.Events[i]
|
|
||||||
authEvents = append(authEvents, queryRes.Events[i].Unwrap())
|
|
||||||
}
|
}
|
||||||
|
queryRes.Events = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return &gomatrixserverlib.RespState{
|
return &gomatrixserverlib.RespState{
|
||||||
StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents),
|
StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
|
||||||
AuthEvents: authEvents,
|
AuthEvents: authEvents,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -860,8 +875,6 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
|
||||||
func (t *txnReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
|
func (t *txnReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
|
||||||
*gomatrixserverlib.RespState, error) {
|
*gomatrixserverlib.RespState, error) {
|
||||||
|
|
||||||
util.GetLogger(ctx).Infof("lookupStateBeforeEvent %s", eventID)
|
|
||||||
|
|
||||||
// Attempt to fetch the missing state using /state_ids and /events
|
// Attempt to fetch the missing state using /state_ids and /events
|
||||||
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
|
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
|
||||||
}
|
}
|
||||||
|
|
@ -992,7 +1005,6 @@ Event:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// we processed everything!
|
|
||||||
return newEvents, nil
|
return newEvents, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1011,7 +1023,7 @@ func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID
|
||||||
|
|
||||||
func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
||||||
*gomatrixserverlib.RespState, error) {
|
*gomatrixserverlib.RespState, error) {
|
||||||
util.GetLogger(ctx).Infof("lookupMissingStateViaStateIDs %s", eventID)
|
util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID)
|
||||||
// fetch the state event IDs at the time of the event
|
// fetch the state event IDs at the time of the event
|
||||||
stateIDs, err := t.federation.LookupStateIDs(ctx, t.Origin, roomID, eventID)
|
stateIDs, err := t.federation.LookupStateIDs(ctx, t.Origin, roomID, eventID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue