diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index a10827a01..f84e2b3ad 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -40,7 +40,7 @@ type Inputer struct { type inputTask struct { ctx context.Context - event api.InputRoomEvent + event *api.InputRoomEvent wg *sync.WaitGroup err error // written back by worker, only safe to read when all tasks are done } @@ -120,10 +120,12 @@ func (r *Inputer) InputRoomEvents( roomID = e.Event.RoomID() } - // Look up the worker, or create it if it doesn't exist. + // Look up the worker, or create it if it doesn't exist. This channel + // is buffered to reduce the chance that we'll be blocked by another + // room - the channel will be quite small as it's just pointer types. w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ r: r, - input: make(chan *inputTask), + input: make(chan *inputTask, 10), }) worker := w.(*inputWorker) @@ -132,7 +134,7 @@ func (r *Inputer) InputRoomEvents( // task has been finished. tasks[i] = &inputTask{ ctx: ctx, - event: e, + event: &request.InputRoomEvents[i], wg: wg, } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 69f51f4b8..6ee679da6 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -38,7 +38,7 @@ import ( // nolint:gocyclo func (r *Inputer) processRoomEvent( ctx context.Context, - input api.InputRoomEvent, + input *api.InputRoomEvent, ) (eventID string, err error) { // Parse and validate the event JSON headered := input.Event @@ -143,7 +143,7 @@ func (r *Inputer) processRoomEvent( func (r *Inputer) calculateAndSetState( ctx context.Context, - input api.InputRoomEvent, + input *api.InputRoomEvent, roomInfo types.RoomInfo, stateAtEvent *types.StateAtEvent, event gomatrixserverlib.Event,