mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 18:43:10 -06:00
Add StartTask
This commit is contained in:
parent
208b77d339
commit
96432760b3
|
|
@ -51,7 +51,7 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
|
||||||
response *api.RoomAliasExistsResponse,
|
response *api.RoomAliasExistsResponse,
|
||||||
) error {
|
) error {
|
||||||
trace, ctx := internal.StartRegion(ctx, "ApplicationServiceRoomAlias")
|
trace, ctx := internal.StartRegion(ctx, "ApplicationServiceRoomAlias")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// Determine which application service should handle this request
|
// Determine which application service should handle this request
|
||||||
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
||||||
|
|
@ -118,7 +118,7 @@ func (a *AppServiceQueryAPI) UserIDExists(
|
||||||
response *api.UserIDExistsResponse,
|
response *api.UserIDExistsResponse,
|
||||||
) error {
|
) error {
|
||||||
trace, ctx := internal.StartRegion(ctx, "ApplicationServiceUserID")
|
trace, ctx := internal.StartRegion(ctx, "ApplicationServiceUserID")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// Determine which application service should handle this request
|
// Determine which application service should handle this request
|
||||||
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/http/httputil"
|
"net/http/httputil"
|
||||||
"os"
|
"os"
|
||||||
"runtime/trace"
|
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/getsentry/sentry-go"
|
"github.com/getsentry/sentry-go"
|
||||||
|
|
@ -186,10 +185,8 @@ func MakeExternalAPI(metricsName string, f func(*http.Request) util.JSONResponse
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, task := trace.NewTask(req.Context(), metricsName)
|
trace, ctx := internal.StartTask(req.Context(), metricsName)
|
||||||
defer task.End()
|
defer trace.EndTask()
|
||||||
trace, ctx := internal.StartRegion(ctx, metricsName)
|
|
||||||
defer trace.End()
|
|
||||||
req = req.WithContext(ctx)
|
req = req.WithContext(ctx)
|
||||||
h.ServeHTTP(nextWriter, req)
|
h.ServeHTTP(nextWriter, req)
|
||||||
|
|
||||||
|
|
@ -202,8 +199,8 @@ func MakeExternalAPI(metricsName string, f func(*http.Request) util.JSONResponse
|
||||||
// This is used to serve HTML alongside JSON error messages
|
// This is used to serve HTML alongside JSON error messages
|
||||||
func MakeHTMLAPI(metricsName string, enableMetrics bool, f func(http.ResponseWriter, *http.Request)) http.Handler {
|
func MakeHTMLAPI(metricsName string, enableMetrics bool, f func(http.ResponseWriter, *http.Request)) http.Handler {
|
||||||
withSpan := func(w http.ResponseWriter, req *http.Request) {
|
withSpan := func(w http.ResponseWriter, req *http.Request) {
|
||||||
trace, ctx := internal.StartRegion(req.Context(), metricsName)
|
trace, ctx := internal.StartTask(req.Context(), metricsName)
|
||||||
defer trace.End()
|
defer trace.EndTask()
|
||||||
req = req.WithContext(ctx)
|
req = req.WithContext(ctx)
|
||||||
f(w, req)
|
f(w, req)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ func NewHTTPClient(disableTLSValidation bool) Client {
|
||||||
|
|
||||||
func (h *httpClient) Notify(ctx context.Context, url string, req *NotifyRequest, resp *NotifyResponse) error {
|
func (h *httpClient) Notify(ctx context.Context, url string, req *NotifyRequest, resp *NotifyResponse) error {
|
||||||
trace, ctx := internal.StartRegion(ctx, "Notify")
|
trace, ctx := internal.StartRegion(ctx, "Notify")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
body, err := json.Marshal(req)
|
body, err := json.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,16 @@ import (
|
||||||
type Trace struct {
|
type Trace struct {
|
||||||
span opentracing.Span
|
span opentracing.Span
|
||||||
region *trace.Region
|
region *trace.Region
|
||||||
|
task *trace.Task
|
||||||
|
}
|
||||||
|
|
||||||
|
func StartTask(inCtx context.Context, name string) (Trace, context.Context) {
|
||||||
|
ctx, task := trace.NewTask(inCtx, name)
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, name)
|
||||||
|
return Trace{
|
||||||
|
span: span,
|
||||||
|
task: task,
|
||||||
|
}, ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func StartRegion(inCtx context.Context, name string) (Trace, context.Context) {
|
func StartRegion(inCtx context.Context, name string) (Trace, context.Context) {
|
||||||
|
|
@ -35,9 +45,18 @@ func StartRegion(inCtx context.Context, name string) (Trace, context.Context) {
|
||||||
}, ctx
|
}, ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t Trace) End() {
|
func (t Trace) EndRegion() {
|
||||||
t.span.Finish()
|
t.span.Finish()
|
||||||
t.region.End()
|
if t.region != nil {
|
||||||
|
t.region.End()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t Trace) EndTask() {
|
||||||
|
t.span.Finish()
|
||||||
|
if t.task != nil {
|
||||||
|
t.task.End()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t Trace) SetTag(key string, value any) {
|
func (t Trace) SetTag(key string, value any) {
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,16 @@ import (
|
||||||
func TestTracing(t *testing.T) {
|
func TestTracing(t *testing.T) {
|
||||||
inCtx := context.Background()
|
inCtx := context.Background()
|
||||||
|
|
||||||
tr, ctx := StartRegion(inCtx, "testing")
|
task, ctx := StartTask(inCtx, "testing")
|
||||||
assert.NotNil(t, ctx)
|
assert.NotNil(t, ctx)
|
||||||
|
assert.NotNil(t, task)
|
||||||
assert.NotEqual(t, inCtx, ctx)
|
assert.NotEqual(t, inCtx, ctx)
|
||||||
assert.NotNil(t, tr)
|
task.SetTag("key", "value")
|
||||||
tr.SetTag("key", "value")
|
|
||||||
defer tr.End()
|
region, ctx2 := StartRegion(ctx, "testing")
|
||||||
|
assert.NotNil(t, ctx)
|
||||||
|
assert.NotNil(t, region)
|
||||||
|
assert.NotEqual(t, ctx, ctx2)
|
||||||
|
defer task.EndTask()
|
||||||
|
defer region.EndRegion()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -87,7 +87,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
trace, ctx := internal.StartRegion(ctx, "processRoomEvent")
|
trace, ctx := internal.StartRegion(ctx, "processRoomEvent")
|
||||||
trace.SetTag("room_id", input.Event.RoomID())
|
trace.SetTag("room_id", input.Event.RoomID())
|
||||||
trace.SetTag("event_id", input.Event.EventID())
|
trace.SetTag("event_id", input.Event.EventID())
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// Measure how long it takes to process this event.
|
// Measure how long it takes to process this event.
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
|
|
@ -608,7 +608,7 @@ func (r *Inputer) fetchAuthEvents(
|
||||||
servers []gomatrixserverlib.ServerName,
|
servers []gomatrixserverlib.ServerName,
|
||||||
) error {
|
) error {
|
||||||
trace, ctx := internal.StartRegion(ctx, "fetchAuthEvents")
|
trace, ctx := internal.StartRegion(ctx, "fetchAuthEvents")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
unknown := map[string]struct{}{}
|
unknown := map[string]struct{}{}
|
||||||
authEventIDs := event.AuthEventIDs()
|
authEventIDs := event.AuthEventIDs()
|
||||||
|
|
@ -753,7 +753,7 @@ func (r *Inputer) calculateAndSetState(
|
||||||
isRejected bool,
|
isRejected bool,
|
||||||
) error {
|
) error {
|
||||||
trace, ctx := internal.StartRegion(ctx, "calculateAndSetState")
|
trace, ctx := internal.StartRegion(ctx, "calculateAndSetState")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
var succeeded bool
|
var succeeded bool
|
||||||
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
|
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,7 @@ func (r *Inputer) updateLatestEvents(
|
||||||
historyVisibility gomatrixserverlib.HistoryVisibility,
|
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "updateLatestEvents")
|
trace, ctx := internal.StartRegion(ctx, "updateLatestEvents")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
var succeeded bool
|
var succeeded bool
|
||||||
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
|
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
|
||||||
|
|
@ -210,7 +210,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
|
|
||||||
func (u *latestEventsUpdater) latestState() error {
|
func (u *latestEventsUpdater) latestState() error {
|
||||||
trace, ctx := internal.StartRegion(u.ctx, "processEventWithMissingState")
|
trace, ctx := internal.StartRegion(u.ctx, "processEventWithMissingState")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
roomState := state.NewStateResolution(u.updater, u.roomInfo)
|
roomState := state.NewStateResolution(u.updater, u.roomInfo)
|
||||||
|
|
@ -330,7 +330,7 @@ func (u *latestEventsUpdater) calculateLatest(
|
||||||
newStateAndRef types.StateAtEventAndReference,
|
newStateAndRef types.StateAtEventAndReference,
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
trace, _ := internal.StartRegion(u.ctx, "calculateLatest")
|
trace, _ := internal.StartRegion(u.ctx, "calculateLatest")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// First of all, get a list of all of the events in our current
|
// First of all, get a list of all of the events in our current
|
||||||
// set of forward extremities.
|
// set of forward extremities.
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ func (r *Inputer) updateMemberships(
|
||||||
removed, added []types.StateEntry,
|
removed, added []types.StateEntry,
|
||||||
) ([]api.OutputEvent, error) {
|
) ([]api.OutputEvent, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "updateMemberships")
|
trace, ctx := internal.StartRegion(ctx, "updateMemberships")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
changes := membershipChanges(removed, added)
|
changes := membershipChanges(removed, added)
|
||||||
var eventNIDs []types.EventNID
|
var eventNIDs []types.EventNID
|
||||||
|
|
|
||||||
|
|
@ -63,7 +63,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
|
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
|
||||||
) (*parsedRespState, error) {
|
) (*parsedRespState, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "processEventWithMissingState")
|
trace, ctx := internal.StartRegion(ctx, "processEventWithMissingState")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// We are missing the previous events for this events.
|
// We are missing the previous events for this events.
|
||||||
// This means that there is a gap in our view of the history of the
|
// This means that there is a gap in our view of the history of the
|
||||||
|
|
@ -242,7 +242,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
|
|
||||||
func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*parsedRespState, error) {
|
func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*parsedRespState, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "lookupResolvedStateBeforeEvent")
|
trace, ctx := internal.StartRegion(ctx, "lookupResolvedStateBeforeEvent")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
type respState struct {
|
type respState struct {
|
||||||
// A snapshot is considered trustworthy if it came from our own roomserver.
|
// A snapshot is considered trustworthy if it came from our own roomserver.
|
||||||
|
|
@ -320,7 +320,7 @@ func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e
|
||||||
// added into the mix.
|
// added into the mix.
|
||||||
func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) {
|
func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "lookupStateAfterEvent")
|
trace, ctx := internal.StartRegion(ctx, "lookupStateAfterEvent")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// try doing all this locally before we resort to querying federation
|
// try doing all this locally before we resort to querying federation
|
||||||
respState := t.lookupStateAfterEventLocally(ctx, eventID)
|
respState := t.lookupStateAfterEventLocally(ctx, eventID)
|
||||||
|
|
@ -377,7 +377,7 @@ func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.Event) *gomatrixs
|
||||||
|
|
||||||
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, eventID string) *parsedRespState {
|
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, eventID string) *parsedRespState {
|
||||||
trace, ctx := internal.StartRegion(ctx, "lookupStateAfterEventLocally")
|
trace, ctx := internal.StartRegion(ctx, "lookupStateAfterEventLocally")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
var res parsedRespState
|
var res parsedRespState
|
||||||
roomState := state.NewStateResolution(t.db, t.roomInfo)
|
roomState := state.NewStateResolution(t.db, t.roomInfo)
|
||||||
|
|
@ -450,7 +450,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
|
||||||
func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
|
func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
|
||||||
*parsedRespState, error) {
|
*parsedRespState, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "lookupStateBeforeEvent")
|
trace, ctx := internal.StartRegion(ctx, "lookupStateBeforeEvent")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// Attempt to fetch the missing state using /state_ids and /events
|
// Attempt to fetch the missing state using /state_ids and /events
|
||||||
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
|
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
|
||||||
|
|
@ -458,7 +458,7 @@ func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersio
|
||||||
|
|
||||||
func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) {
|
func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "resolveStatesAndCheck")
|
trace, ctx := internal.StartRegion(ctx, "resolveStatesAndCheck")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
var authEventList []*gomatrixserverlib.Event
|
var authEventList []*gomatrixserverlib.Event
|
||||||
var stateEventList []*gomatrixserverlib.Event
|
var stateEventList []*gomatrixserverlib.Event
|
||||||
|
|
@ -504,7 +504,7 @@ retryAllowedState:
|
||||||
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
|
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
|
||||||
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
|
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "getMissingEvents")
|
trace, ctx := internal.StartRegion(ctx, "getMissingEvents")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
logger := t.log.WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
logger := t.log.WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
||||||
latest, _, _, err := t.db.LatestEventIDs(ctx, t.roomInfo.RoomNID)
|
latest, _, _, err := t.db.LatestEventIDs(ctx, t.roomInfo.RoomNID)
|
||||||
|
|
@ -634,7 +634,7 @@ func (t *missingStateReq) lookupMissingStateViaState(
|
||||||
ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
|
ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
|
||||||
) (respState *parsedRespState, err error) {
|
) (respState *parsedRespState, err error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "lookupMissingStateViaState")
|
trace, ctx := internal.StartRegion(ctx, "lookupMissingStateViaState")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
state, err := t.federation.LookupState(ctx, t.virtualHost, t.origin, roomID, eventID, roomVersion)
|
state, err := t.federation.LookupState(ctx, t.virtualHost, t.origin, roomID, eventID, roomVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -666,7 +666,7 @@ func (t *missingStateReq) lookupMissingStateViaState(
|
||||||
func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
||||||
*parsedRespState, error) {
|
*parsedRespState, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "lookupMissingStateViaStateIDs")
|
trace, ctx := internal.StartRegion(ctx, "lookupMissingStateViaStateIDs")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
t.log.Infof("lookupMissingStateViaStateIDs %s", eventID)
|
t.log.Infof("lookupMissingStateViaStateIDs %s", eventID)
|
||||||
// fetch the state event IDs at the time of the event
|
// fetch the state event IDs at the time of the event
|
||||||
|
|
@ -840,7 +840,7 @@ func (t *missingStateReq) createRespStateFromStateIDs(
|
||||||
|
|
||||||
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.Event, error) {
|
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.Event, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "lookupEvent")
|
trace, ctx := internal.StartRegion(ctx, "lookupEvent")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
if localFirst {
|
if localFirst {
|
||||||
// fetch from the roomserver
|
// fetch from the roomserver
|
||||||
|
|
|
||||||
|
|
@ -107,7 +107,7 @@ func (v *StateResolution) LoadStateAtSnapshot(
|
||||||
ctx context.Context, stateNID types.StateSnapshotNID,
|
ctx context.Context, stateNID types.StateSnapshotNID,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtSnapshot")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtSnapshot")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
|
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -148,7 +148,7 @@ func (v *StateResolution) LoadStateAtEvent(
|
||||||
ctx context.Context, eventID string,
|
ctx context.Context, eventID string,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtEvent")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtEvent")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -170,7 +170,7 @@ func (v *StateResolution) LoadMembershipAtEvent(
|
||||||
ctx context.Context, eventIDs []string, stateKeyNID types.EventStateKeyNID,
|
ctx context.Context, eventIDs []string, stateKeyNID types.EventStateKeyNID,
|
||||||
) (map[string][]types.StateEntry, error) {
|
) (map[string][]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadMembershipAtEvent")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadMembershipAtEvent")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// Get a mapping from snapshotNID -> eventIDs
|
// Get a mapping from snapshotNID -> eventIDs
|
||||||
snapshotNIDMap, err := v.db.BulkSelectSnapshotsFromEventIDs(ctx, eventIDs)
|
snapshotNIDMap, err := v.db.BulkSelectSnapshotsFromEventIDs(ctx, eventIDs)
|
||||||
|
|
@ -239,7 +239,7 @@ func (v *StateResolution) LoadStateAtEventForHistoryVisibility(
|
||||||
ctx context.Context, eventID string,
|
ctx context.Context, eventID string,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtEventForHistoryVisibility")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtEventForHistoryVisibility")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -264,7 +264,7 @@ func (v *StateResolution) LoadCombinedStateAfterEvents(
|
||||||
ctx context.Context, prevStates []types.StateAtEvent,
|
ctx context.Context, prevStates []types.StateAtEvent,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadCombinedStateAfterEvents")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadCombinedStateAfterEvents")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
stateNIDs := make([]types.StateSnapshotNID, len(prevStates))
|
stateNIDs := make([]types.StateSnapshotNID, len(prevStates))
|
||||||
for i, state := range prevStates {
|
for i, state := range prevStates {
|
||||||
|
|
@ -339,7 +339,7 @@ func (v *StateResolution) DifferenceBetweeenStateSnapshots(
|
||||||
ctx context.Context, oldStateNID, newStateNID types.StateSnapshotNID,
|
ctx context.Context, oldStateNID, newStateNID types.StateSnapshotNID,
|
||||||
) (removed, added []types.StateEntry, err error) {
|
) (removed, added []types.StateEntry, err error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.DifferenceBetweeenStateSnapshots")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.DifferenceBetweeenStateSnapshots")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
if oldStateNID == newStateNID {
|
if oldStateNID == newStateNID {
|
||||||
// If the snapshot NIDs are the same then nothing has changed
|
// If the snapshot NIDs are the same then nothing has changed
|
||||||
|
|
@ -403,7 +403,7 @@ func (v *StateResolution) LoadStateAtSnapshotForStringTuples(
|
||||||
stateKeyTuples []gomatrixserverlib.StateKeyTuple,
|
stateKeyTuples []gomatrixserverlib.StateKeyTuple,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtSnapshotForStringTuples")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtSnapshotForStringTuples")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
|
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -420,7 +420,7 @@ func (v *StateResolution) stringTuplesToNumericTuples(
|
||||||
stringTuples []gomatrixserverlib.StateKeyTuple,
|
stringTuples []gomatrixserverlib.StateKeyTuple,
|
||||||
) ([]types.StateKeyTuple, error) {
|
) ([]types.StateKeyTuple, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.stringTuplesToNumericTuples")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.stringTuplesToNumericTuples")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
eventTypes := make([]string, len(stringTuples))
|
eventTypes := make([]string, len(stringTuples))
|
||||||
stateKeys := make([]string, len(stringTuples))
|
stateKeys := make([]string, len(stringTuples))
|
||||||
|
|
@ -465,7 +465,7 @@ func (v *StateResolution) loadStateAtSnapshotForNumericTuples(
|
||||||
stateKeyTuples []types.StateKeyTuple,
|
stateKeyTuples []types.StateKeyTuple,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.loadStateAtSnapshotForNumericTuples")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.loadStateAtSnapshotForNumericTuples")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
|
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -516,7 +516,7 @@ func (v *StateResolution) LoadStateAfterEventsForStringTuples(
|
||||||
stateKeyTuples []gomatrixserverlib.StateKeyTuple,
|
stateKeyTuples []gomatrixserverlib.StateKeyTuple,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAfterEventsForStringTuples")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAfterEventsForStringTuples")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
|
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -531,7 +531,7 @@ func (v *StateResolution) loadStateAfterEventsForNumericTuples(
|
||||||
stateKeyTuples []types.StateKeyTuple,
|
stateKeyTuples []types.StateKeyTuple,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.loadStateAfterEventsForNumericTuples")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.loadStateAfterEventsForNumericTuples")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
if len(prevStates) == 1 {
|
if len(prevStates) == 1 {
|
||||||
// Fast path for a single event.
|
// Fast path for a single event.
|
||||||
|
|
@ -706,7 +706,7 @@ func (v *StateResolution) CalculateAndStoreStateBeforeEvent(
|
||||||
isRejected bool,
|
isRejected bool,
|
||||||
) (types.StateSnapshotNID, error) {
|
) (types.StateSnapshotNID, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.CalculateAndStoreStateBeforeEvent")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.CalculateAndStoreStateBeforeEvent")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// Load the state at the prev events.
|
// Load the state at the prev events.
|
||||||
prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs())
|
prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs())
|
||||||
|
|
@ -725,7 +725,7 @@ func (v *StateResolution) CalculateAndStoreStateAfterEvents(
|
||||||
prevStates []types.StateAtEvent,
|
prevStates []types.StateAtEvent,
|
||||||
) (types.StateSnapshotNID, error) {
|
) (types.StateSnapshotNID, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.CalculateAndStoreStateAfterEvents")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.CalculateAndStoreStateAfterEvents")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
metrics := calculateStateMetrics{startTime: time.Now(), prevEventLength: len(prevStates)}
|
metrics := calculateStateMetrics{startTime: time.Now(), prevEventLength: len(prevStates)}
|
||||||
|
|
||||||
|
|
@ -800,7 +800,7 @@ func (v *StateResolution) calculateAndStoreStateAfterManyEvents(
|
||||||
metrics calculateStateMetrics,
|
metrics calculateStateMetrics,
|
||||||
) (types.StateSnapshotNID, error) {
|
) (types.StateSnapshotNID, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.calculateAndStoreStateAfterManyEvents")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.calculateAndStoreStateAfterManyEvents")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
state, algorithm, conflictLength, err :=
|
state, algorithm, conflictLength, err :=
|
||||||
v.calculateStateAfterManyEvents(ctx, v.roomInfo.RoomVersion, prevStates)
|
v.calculateStateAfterManyEvents(ctx, v.roomInfo.RoomVersion, prevStates)
|
||||||
|
|
@ -821,7 +821,7 @@ func (v *StateResolution) calculateStateAfterManyEvents(
|
||||||
prevStates []types.StateAtEvent,
|
prevStates []types.StateAtEvent,
|
||||||
) (state []types.StateEntry, algorithm string, conflictLength int, err error) {
|
) (state []types.StateEntry, algorithm string, conflictLength int, err error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.calculateStateAfterManyEvents")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.calculateStateAfterManyEvents")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
var combined []types.StateEntry
|
var combined []types.StateEntry
|
||||||
// Conflict resolution.
|
// Conflict resolution.
|
||||||
|
|
@ -876,7 +876,7 @@ func (v *StateResolution) resolveConflicts(
|
||||||
notConflicted, conflicted []types.StateEntry,
|
notConflicted, conflicted []types.StateEntry,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.resolveConflicts")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.resolveConflicts")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
stateResAlgo, err := version.StateResAlgorithm()
|
stateResAlgo, err := version.StateResAlgorithm()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -903,7 +903,7 @@ func (v *StateResolution) resolveConflictsV1(
|
||||||
notConflicted, conflicted []types.StateEntry,
|
notConflicted, conflicted []types.StateEntry,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.resolveConflictsV1")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.resolveConflictsV1")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
// Load the conflicted events
|
// Load the conflicted events
|
||||||
conflictedEvents, eventIDMap, err := v.loadStateEvents(ctx, conflicted)
|
conflictedEvents, eventIDMap, err := v.loadStateEvents(ctx, conflicted)
|
||||||
|
|
@ -968,7 +968,7 @@ func (v *StateResolution) resolveConflictsV2(
|
||||||
notConflicted, conflicted []types.StateEntry,
|
notConflicted, conflicted []types.StateEntry,
|
||||||
) ([]types.StateEntry, error) {
|
) ([]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.resolveConflictsV2")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.resolveConflictsV2")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
estimate := len(conflicted) + len(notConflicted)
|
estimate := len(conflicted) + len(notConflicted)
|
||||||
eventIDMap := make(map[string]types.StateEntry, estimate)
|
eventIDMap := make(map[string]types.StateEntry, estimate)
|
||||||
|
|
@ -1001,7 +1001,7 @@ func (v *StateResolution) resolveConflictsV2(
|
||||||
// For each conflicted event, let's try and get the needed auth events.
|
// For each conflicted event, let's try and get the needed auth events.
|
||||||
if err = func() error {
|
if err = func() error {
|
||||||
loadAuthEventsTrace, sctx := internal.StartRegion(ctx, "StateResolution.loadAuthEvents")
|
loadAuthEventsTrace, sctx := internal.StartRegion(ctx, "StateResolution.loadAuthEvents")
|
||||||
defer loadAuthEventsTrace.End()
|
defer loadAuthEventsTrace.EndRegion()
|
||||||
|
|
||||||
loader := authEventLoader{
|
loader := authEventLoader{
|
||||||
v: v,
|
v: v,
|
||||||
|
|
@ -1046,7 +1046,7 @@ func (v *StateResolution) resolveConflictsV2(
|
||||||
// Resolve the conflicts.
|
// Resolve the conflicts.
|
||||||
resolvedEvents := func() []*gomatrixserverlib.Event {
|
resolvedEvents := func() []*gomatrixserverlib.Event {
|
||||||
resolvedTrace, _ := internal.StartRegion(ctx, "StateResolution.ResolveStateConflictsV2")
|
resolvedTrace, _ := internal.StartRegion(ctx, "StateResolution.ResolveStateConflictsV2")
|
||||||
defer resolvedTrace.End()
|
defer resolvedTrace.EndRegion()
|
||||||
|
|
||||||
return gomatrixserverlib.ResolveStateConflictsV2(
|
return gomatrixserverlib.ResolveStateConflictsV2(
|
||||||
conflictedEvents,
|
conflictedEvents,
|
||||||
|
|
@ -1119,7 +1119,7 @@ func (v *StateResolution) loadStateEvents(
|
||||||
ctx context.Context, entries []types.StateEntry,
|
ctx context.Context, entries []types.StateEntry,
|
||||||
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
|
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
|
||||||
trace, ctx := internal.StartRegion(ctx, "StateResolution.loadStateEvents")
|
trace, ctx := internal.StartRegion(ctx, "StateResolution.loadStateEvents")
|
||||||
defer trace.End()
|
defer trace.EndRegion()
|
||||||
|
|
||||||
result := make([]*gomatrixserverlib.Event, 0, len(entries))
|
result := make([]*gomatrixserverlib.Event, 0, len(entries))
|
||||||
eventEntries := make([]types.StateEntry, 0, len(entries))
|
eventEntries := make([]types.StateEntry, 0, len(entries))
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue