diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 6d3cb908f..a4ac0d48b 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "go.uber.org/atomic" ) @@ -40,11 +39,10 @@ type Inputer struct { } type inputTask struct { - ctx context.Context - event api.InputRoomEvent - wg *sync.WaitGroup - eventID string // written back by worker - err error // written back by worker + ctx context.Context + event api.InputRoomEvent + wg *sync.WaitGroup + err error // written back by worker, only safe to read when all tasks are done } type inputWorker struct { @@ -58,18 +56,11 @@ func (w *inputWorker) start() { return } defer w.running.Store(false) - - logrus.Warn("STARTING WORKER") - defer logrus.Warn("SHUTTING DOWN WORKER") - for { select { case task := <-w.input: - logrus.Warn("WORKER DOING TASK") - task.eventID, task.err = w.r.processRoomEvent(task.ctx, task.event) - logrus.Warn("WORKER FINISHING TASK") + _, task.err = w.r.processRoomEvent(task.ctx, task.event) task.wg.Done() - logrus.Warn("WORKER FINISHED TASK") case <-time.After(time.Second * 5): return } @@ -114,15 +105,12 @@ func (r *Inputer) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) (err error) { - if len(request.InputRoomEvents) == 0 { - logrus.Warn("Nothing to do") - return nil - } - + // Create a wait group. Each task that we dispatch will call Done on + // this wait group so that we know when all of our events have been + // processed. wg := &sync.WaitGroup{} wg.Add(len(request.InputRoomEvents)) tasks := make([]*inputTask, len(request.InputRoomEvents)) - logrus.Warnf("Received %d input events", len(tasks)) for i, e := range request.InputRoomEvents { // Work out if we are running per-room workers or if we're just doing @@ -153,17 +141,15 @@ func (r *Inputer) InputRoomEvents( go worker.start() } - logrus.Warnf("Waiting for %d task(s)", len(tasks)) + // Wait for all of the workers to return results about our tasks. wg.Wait() - logrus.Warnf("Tasks finished") + // If any of the tasks returned an error, we should probably report + // that back to the caller. for _, task := range tasks { if task.err != nil { - logrus.Warnf("Error: %s", task.err.Error()) - } else { - logrus.Warnf("Event ID: %s", task.eventID) + return err } } - return nil }