Reduce copies, use buffered task channel to reduce contention on other rooms

This commit is contained in:
Neil Alexander 2020-09-03 13:08:26 +01:00
parent 3f3b4a3c08
commit 298a14cf33
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 8 additions and 6 deletions

View file

@ -40,7 +40,7 @@ type Inputer struct {
type inputTask struct { type inputTask struct {
ctx context.Context ctx context.Context
event api.InputRoomEvent event *api.InputRoomEvent
wg *sync.WaitGroup wg *sync.WaitGroup
err error // written back by worker, only safe to read when all tasks are done err error // written back by worker, only safe to read when all tasks are done
} }
@ -120,10 +120,12 @@ func (r *Inputer) InputRoomEvents(
roomID = e.Event.RoomID() roomID = e.Event.RoomID()
} }
// Look up the worker, or create it if it doesn't exist. // Look up the worker, or create it if it doesn't exist. This channel
// is buffered to reduce the chance that we'll be blocked by another
// room - the channel will be quite small as it's just pointer types.
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
r: r, r: r,
input: make(chan *inputTask), input: make(chan *inputTask, 10),
}) })
worker := w.(*inputWorker) worker := w.(*inputWorker)
@ -132,7 +134,7 @@ func (r *Inputer) InputRoomEvents(
// task has been finished. // task has been finished.
tasks[i] = &inputTask{ tasks[i] = &inputTask{
ctx: ctx, ctx: ctx,
event: e, event: &request.InputRoomEvents[i],
wg: wg, wg: wg,
} }

View file

@ -38,7 +38,7 @@ import (
// nolint:gocyclo // nolint:gocyclo
func (r *Inputer) processRoomEvent( func (r *Inputer) processRoomEvent(
ctx context.Context, ctx context.Context,
input api.InputRoomEvent, input *api.InputRoomEvent,
) (eventID string, err error) { ) (eventID string, err error) {
// Parse and validate the event JSON // Parse and validate the event JSON
headered := input.Event headered := input.Event
@ -143,7 +143,7 @@ func (r *Inputer) processRoomEvent(
func (r *Inputer) calculateAndSetState( func (r *Inputer) calculateAndSetState(
ctx context.Context, ctx context.Context,
input api.InputRoomEvent, input *api.InputRoomEvent,
roomInfo types.RoomInfo, roomInfo types.RoomInfo,
stateAtEvent *types.StateAtEvent, stateAtEvent *types.StateAtEvent,
event gomatrixserverlib.Event, event gomatrixserverlib.Event,