mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-28 17:23:09 -06:00
Remove mutex, start workers at correct time
This commit is contained in:
parent
59b1df09e5
commit
5e6fa5596a
|
|
@ -253,11 +253,8 @@ type txnFederationClient interface {
|
||||||
|
|
||||||
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
|
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
|
||||||
results := make(map[string]gomatrixserverlib.PDUResult)
|
results := make(map[string]gomatrixserverlib.PDUResult)
|
||||||
//var resultsMutex sync.Mutex
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var tasks []*inputTask
|
var tasks []*inputTask
|
||||||
wg.Add(1) // for processEDUs
|
|
||||||
|
|
||||||
for _, pdu := range t.PDUs {
|
for _, pdu := range t.PDUs {
|
||||||
pduCountTotal.WithLabelValues("total").Inc()
|
pduCountTotal.WithLabelValues("total").Inc()
|
||||||
|
|
@ -322,16 +319,12 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
|
||||||
}
|
}
|
||||||
tasks = append(tasks, task)
|
tasks = append(tasks, task)
|
||||||
worker.input.push(task)
|
worker.input.push(task)
|
||||||
if !worker.running.CAS(false, true) {
|
if worker.running.CAS(false, true) {
|
||||||
go worker.run()
|
go worker.run()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
t.processEDUs(ctx)
|
||||||
defer wg.Done()
|
|
||||||
t.processEDUs(ctx)
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
for _, task := range tasks {
|
for _, task := range tasks {
|
||||||
|
|
@ -703,14 +696,9 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
|
||||||
return gomatrixserverlib.Allowed(e, &authUsingState)
|
return gomatrixserverlib.Allowed(e, &authUsingState)
|
||||||
}
|
}
|
||||||
|
|
||||||
var processEventWithMissingStateMutexes = internal.NewMutexByRoom()
|
|
||||||
|
|
||||||
func (t *txnReq) processEventWithMissingState(
|
func (t *txnReq) processEventWithMissingState(
|
||||||
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
|
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
|
||||||
) error {
|
) error {
|
||||||
processEventWithMissingStateMutexes.Lock(e.RoomID())
|
|
||||||
defer processEventWithMissingStateMutexes.Unlock(e.RoomID())
|
|
||||||
|
|
||||||
// We are missing the previous events for this events.
|
// We are missing the previous events for this events.
|
||||||
// This means that there is a gap in our view of the history of the
|
// This means that there is a gap in our view of the history of the
|
||||||
// room. There two ways that we can handle such a gap:
|
// room. There two ways that we can handle such a gap:
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue