From 7f02eab47d9f62132c4fee50f190088b70763e83 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 5 Jul 2021 09:14:24 +0100 Subject: [PATCH 1/3] Remove processEventWithMissingStateMutex --- federationapi/routing/send.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index ae9a63fc2..978eafd4d 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -703,14 +703,9 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv return gomatrixserverlib.Allowed(e, &authUsingState) } -var processEventWithMissingStateMutexes = internal.NewMutexByRoom() - func (t *txnReq) processEventWithMissingState( ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ) error { - processEventWithMissingStateMutexes.Lock(e.RoomID()) - defer processEventWithMissingStateMutexes.Unlock(e.RoomID()) - // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the // room. There two ways that we can handle such a gap: From 99d8e1c107c6c328d3b4b33d2704b3fcddd2421b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 5 Jul 2021 12:14:31 +0100 Subject: [PATCH 2/3] Federation API fixes (#1899) * Ensure worker has work before starting goroutine * Revert "Remove processEventWithMissingStateMutex" This reverts commit 7f02eab47d9f62132c4fee50f190088b70763e83. * Use request context when processing transactions * Keep goroutine count down by not starting work for things where the caller gave up * Remove mutex, start workers at correct time --- federationapi/routing/send.go | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 978eafd4d..dde07701a 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -205,7 +205,7 @@ func Send( util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) - resp, jsonErr := t.processTransaction(context.Background()) + resp, jsonErr := t.processTransaction(httpReq.Context()) if jsonErr != nil { util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") return *jsonErr @@ -253,11 +253,8 @@ type txnFederationClient interface { func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { results := make(map[string]gomatrixserverlib.PDUResult) - //var resultsMutex sync.Mutex - var wg sync.WaitGroup var tasks []*inputTask - wg.Add(1) // for processEDUs for _, pdu := range t.PDUs { pduCountTotal.WithLabelValues("total").Inc() @@ -313,9 +310,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res input: newSendFIFOQueue(), }) worker := v.(*inputWorker) - if !worker.running.Load() { - go worker.run() - } wg.Add(1) task := &inputTask{ ctx: ctx, @@ -325,13 +319,12 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res } tasks = append(tasks, task) worker.input.push(task) + if worker.running.CAS(false, true) { + go worker.run() + } } - go func() { - defer wg.Done() - t.processEDUs(ctx) - }() - + t.processEDUs(ctx) wg.Wait() for _, task := range tasks { @@ -351,9 +344,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res } func (t *inputWorker) run() { - if !t.running.CAS(false, true) { - return - } defer t.running.Store(false) for { task, ok := t.input.pop() @@ -371,7 +361,10 @@ func (t *inputWorker) run() { return default: evStart := time.Now() - task.err = task.t.processEvent(task.ctx, task.event) + // TODO: Is 5 minutes too long? + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + task.err = task.t.processEvent(ctx, task.event) + cancel() task.duration = time.Since(evStart) if err := task.err; err != nil { switch err.(type) { From bcd3ef38d0c288ebbad54e8cd05f78ff22376c02 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 5 Jul 2021 13:47:37 +0100 Subject: [PATCH 3/3] Track expiry rate on pduCountTotal --- federationapi/routing/send.go | 1 + 1 file changed, 1 insertion(+) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index dde07701a..d5e44f72c 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -358,6 +358,7 @@ func (t *inputWorker) run() { select { case <-task.ctx.Done(): task.err = context.DeadlineExceeded + pduCountTotal.WithLabelValues("expired").Inc() return default: evStart := time.Now()