Merge branch 'master' of https://github.com/matrix-org/dendrite into add-nats-support

This commit is contained in:
Till Faelligen 2021-07-06 20:03:51 +02:00
commit 1533c648e3

View file

@ -205,7 +205,7 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
resp, jsonErr := t.processTransaction(context.Background()) resp, jsonErr := t.processTransaction(httpReq.Context())
if jsonErr != nil { if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr return *jsonErr
@ -253,11 +253,8 @@ type txnFederationClient interface {
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
results := make(map[string]gomatrixserverlib.PDUResult) results := make(map[string]gomatrixserverlib.PDUResult)
//var resultsMutex sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup
var tasks []*inputTask var tasks []*inputTask
wg.Add(1) // for processEDUs
for _, pdu := range t.PDUs { for _, pdu := range t.PDUs {
pduCountTotal.WithLabelValues("total").Inc() pduCountTotal.WithLabelValues("total").Inc()
@ -313,9 +310,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
input: newSendFIFOQueue(), input: newSendFIFOQueue(),
}) })
worker := v.(*inputWorker) worker := v.(*inputWorker)
if !worker.running.Load() {
go worker.run()
}
wg.Add(1) wg.Add(1)
task := &inputTask{ task := &inputTask{
ctx: ctx, ctx: ctx,
@ -325,13 +319,12 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
tasks = append(tasks, task) tasks = append(tasks, task)
worker.input.push(task) worker.input.push(task)
if worker.running.CAS(false, true) {
go worker.run()
}
} }
go func() { t.processEDUs(ctx)
defer wg.Done()
t.processEDUs(ctx)
}()
wg.Wait() wg.Wait()
for _, task := range tasks { for _, task := range tasks {
@ -351,9 +344,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
func (t *inputWorker) run() { func (t *inputWorker) run() {
if !t.running.CAS(false, true) {
return
}
defer t.running.Store(false) defer t.running.Store(false)
for { for {
task, ok := t.input.pop() task, ok := t.input.pop()
@ -368,10 +358,14 @@ func (t *inputWorker) run() {
select { select {
case <-task.ctx.Done(): case <-task.ctx.Done():
task.err = context.DeadlineExceeded task.err = context.DeadlineExceeded
pduCountTotal.WithLabelValues("expired").Inc()
return return
default: default:
evStart := time.Now() evStart := time.Now()
task.err = task.t.processEvent(task.ctx, task.event) // TODO: Is 5 minutes too long?
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
task.err = task.t.processEvent(ctx, task.event)
cancel()
task.duration = time.Since(evStart) task.duration = time.Since(evStart)
if err := task.err; err != nil { if err := task.err; err != nil {
switch err.(type) { switch err.(type) {
@ -703,14 +697,9 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
return gomatrixserverlib.Allowed(e, &authUsingState) return gomatrixserverlib.Allowed(e, &authUsingState)
} }
var processEventWithMissingStateMutexes = internal.NewMutexByRoom()
func (t *txnReq) processEventWithMissingState( func (t *txnReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) error { ) error {
processEventWithMissingStateMutexes.Lock(e.RoomID())
defer processEventWithMissingStateMutexes.Unlock(e.RoomID())
// We are missing the previous events for this events. // We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the // This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap: // room. There two ways that we can handle such a gap: