mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-29 01:33:10 -06:00
Add hadEvent function
This commit is contained in:
parent
60e9256af6
commit
f269e43791
|
|
@ -241,6 +241,12 @@ type txnReq struct {
|
||||||
work string // metrics
|
work string // metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *txnReq) hadEvent(eventID string, had bool) {
|
||||||
|
t.hadEventsMutex.Lock()
|
||||||
|
defer t.hadEventsMutex.Unlock()
|
||||||
|
t.hadEvents[eventID] = had
|
||||||
|
}
|
||||||
|
|
||||||
// A subset of FederationClient functionality that txn requires. Useful for testing.
|
// A subset of FederationClient functionality that txn requires. Useful for testing.
|
||||||
type txnFederationClient interface {
|
type txnFederationClient interface {
|
||||||
LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
||||||
|
|
@ -595,14 +601,12 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
|
||||||
|
|
||||||
// Prepare a map of all the events we already had before this point, so
|
// Prepare a map of all the events we already had before this point, so
|
||||||
// that we don't send them to the roomserver again.
|
// that we don't send them to the roomserver again.
|
||||||
t.hadEventsMutex.Lock()
|
|
||||||
for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) {
|
for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) {
|
||||||
t.hadEvents[eventID] = true
|
t.hadEvent(eventID, true)
|
||||||
}
|
}
|
||||||
for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) {
|
for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) {
|
||||||
t.hadEvents[eventID] = false
|
t.hadEvent(eventID, false)
|
||||||
}
|
}
|
||||||
t.hadEventsMutex.Unlock()
|
|
||||||
|
|
||||||
if len(stateResp.MissingAuthEventIDs) > 0 {
|
if len(stateResp.MissingAuthEventIDs) > 0 {
|
||||||
t.work = MetricsWorkMissingAuthEvents
|
t.work = MetricsWorkMissingAuthEvents
|
||||||
|
|
@ -676,9 +680,7 @@ withNextEvent:
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return fmt.Errorf("api.SendEvents: %w", err)
|
return fmt.Errorf("api.SendEvents: %w", err)
|
||||||
}
|
}
|
||||||
t.hadEventsMutex.Lock()
|
t.hadEvent(ev.EventID(), true) // if the roomserver didn't know about the event before, it does now
|
||||||
t.hadEvents[ev.EventID()] = true // if the roomserver didn't know about the event before, it does now
|
|
||||||
t.hadEventsMutex.Unlock()
|
|
||||||
t.cacheAndReturn(ev.Headered(stateResp.RoomVersion))
|
t.cacheAndReturn(ev.Headered(stateResp.RoomVersion))
|
||||||
delete(missingAuthEvents, missingAuthEventID)
|
delete(missingAuthEvents, missingAuthEventID)
|
||||||
continue withNextEvent
|
continue withNextEvent
|
||||||
|
|
@ -918,9 +920,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
|
||||||
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
|
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
|
||||||
// processEvent request, which is better for memory.
|
// processEvent request, which is better for memory.
|
||||||
stateEvents[i] = t.cacheAndReturn(ev)
|
stateEvents[i] = t.cacheAndReturn(ev)
|
||||||
t.hadEventsMutex.Lock()
|
t.hadEvent(ev.EventID(), true)
|
||||||
t.hadEvents[ev.EventID()] = true
|
|
||||||
t.hadEventsMutex.Unlock()
|
|
||||||
}
|
}
|
||||||
// we should never access res.StateEvents again so we delete it here to make GC faster
|
// we should never access res.StateEvents again so we delete it here to make GC faster
|
||||||
res.StateEvents = nil
|
res.StateEvents = nil
|
||||||
|
|
@ -955,9 +955,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
|
||||||
}
|
}
|
||||||
for i, ev := range queryRes.Events {
|
for i, ev := range queryRes.Events {
|
||||||
authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
|
authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
|
||||||
t.hadEventsMutex.Lock()
|
t.hadEvent(ev.EventID(), true)
|
||||||
t.hadEvents[ev.EventID()] = true
|
|
||||||
t.hadEventsMutex.Unlock()
|
|
||||||
}
|
}
|
||||||
queryRes.Events = nil
|
queryRes.Events = nil
|
||||||
}
|
}
|
||||||
|
|
@ -1034,9 +1032,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
|
||||||
latestEvents := make([]string, len(res.LatestEvents))
|
latestEvents := make([]string, len(res.LatestEvents))
|
||||||
for i, ev := range res.LatestEvents {
|
for i, ev := range res.LatestEvents {
|
||||||
latestEvents[i] = res.LatestEvents[i].EventID
|
latestEvents[i] = res.LatestEvents[i].EventID
|
||||||
t.hadEventsMutex.Lock()
|
t.hadEvent(ev.EventID, true)
|
||||||
t.hadEvents[ev.EventID] = true
|
|
||||||
t.hadEventsMutex.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var missingResp *gomatrixserverlib.RespMissingEvents
|
var missingResp *gomatrixserverlib.RespMissingEvents
|
||||||
|
|
@ -1172,9 +1168,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
|
||||||
}
|
}
|
||||||
for i, ev := range queryRes.Events {
|
for i, ev := range queryRes.Events {
|
||||||
queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i])
|
queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i])
|
||||||
t.hadEventsMutex.Lock()
|
t.hadEvent(ev.EventID(), true)
|
||||||
t.hadEvents[ev.EventID()] = true
|
|
||||||
t.hadEventsMutex.Unlock()
|
|
||||||
evID := queryRes.Events[i].EventID()
|
evID := queryRes.Events[i].EventID()
|
||||||
if missing[evID] {
|
if missing[evID] {
|
||||||
delete(missing, evID)
|
delete(missing, evID)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue