Don't send back event ID unnecessarily

This commit is contained in:
Neil Alexander 2020-09-03 12:17:15 +01:00
parent c38886b8ac
commit 2b653f5edf
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.uber.org/atomic" "go.uber.org/atomic"
) )
@ -40,11 +39,10 @@ type Inputer struct {
} }
type inputTask struct { type inputTask struct {
ctx context.Context ctx context.Context
event api.InputRoomEvent event api.InputRoomEvent
wg *sync.WaitGroup wg *sync.WaitGroup
eventID string // written back by worker err error // written back by worker, only safe to read when all tasks are done
err error // written back by worker
} }
type inputWorker struct { type inputWorker struct {
@ -58,18 +56,11 @@ func (w *inputWorker) start() {
return return
} }
defer w.running.Store(false) defer w.running.Store(false)
logrus.Warn("STARTING WORKER")
defer logrus.Warn("SHUTTING DOWN WORKER")
for { for {
select { select {
case task := <-w.input: case task := <-w.input:
logrus.Warn("WORKER DOING TASK") _, task.err = w.r.processRoomEvent(task.ctx, task.event)
task.eventID, task.err = w.r.processRoomEvent(task.ctx, task.event)
logrus.Warn("WORKER FINISHING TASK")
task.wg.Done() task.wg.Done()
logrus.Warn("WORKER FINISHED TASK")
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
return return
} }
@ -114,15 +105,12 @@ func (r *Inputer) InputRoomEvents(
request *api.InputRoomEventsRequest, request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse, response *api.InputRoomEventsResponse,
) (err error) { ) (err error) {
if len(request.InputRoomEvents) == 0 { // Create a wait group. Each task that we dispatch will call Done on
logrus.Warn("Nothing to do") // this wait group so that we know when all of our events have been
return nil // processed.
}
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(len(request.InputRoomEvents)) wg.Add(len(request.InputRoomEvents))
tasks := make([]*inputTask, len(request.InputRoomEvents)) tasks := make([]*inputTask, len(request.InputRoomEvents))
logrus.Warnf("Received %d input events", len(tasks))
for i, e := range request.InputRoomEvents { for i, e := range request.InputRoomEvents {
// Work out if we are running per-room workers or if we're just doing // Work out if we are running per-room workers or if we're just doing
@ -153,17 +141,15 @@ func (r *Inputer) InputRoomEvents(
go worker.start() go worker.start()
} }
logrus.Warnf("Waiting for %d task(s)", len(tasks)) // Wait for all of the workers to return results about our tasks.
wg.Wait() wg.Wait()
logrus.Warnf("Tasks finished")
// If any of the tasks returned an error, we should probably report
// that back to the caller.
for _, task := range tasks { for _, task := range tasks {
if task.err != nil { if task.err != nil {
logrus.Warnf("Error: %s", task.err.Error()) return err
} else {
logrus.Warnf("Event ID: %s", task.eventID)
} }
} }
return nil return nil
} }