mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-23 14:53:10 -06:00
Initial FIFOing of roomserver inputs
This commit is contained in:
parent
74743ac8ae
commit
51dc349e48
|
|
@ -19,12 +19,15 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Inputer struct {
|
type Inputer struct {
|
||||||
|
|
@ -33,7 +36,43 @@ type Inputer struct {
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
OutputRoomEventTopic string
|
OutputRoomEventTopic string
|
||||||
|
|
||||||
mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent
|
workers sync.Map // room ID -> *inputWorker
|
||||||
|
}
|
||||||
|
|
||||||
|
type inputTask struct {
|
||||||
|
event api.InputRoomEvent
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
eventID string // written back by worker
|
||||||
|
err error // written back by worker
|
||||||
|
}
|
||||||
|
|
||||||
|
type inputWorker struct {
|
||||||
|
r *Inputer
|
||||||
|
running atomic.Bool
|
||||||
|
input chan *inputTask
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *inputWorker) start() {
|
||||||
|
if !w.running.CAS(false, true) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer w.running.Store(false)
|
||||||
|
|
||||||
|
logrus.Warn("STARTING WORKER")
|
||||||
|
defer logrus.Warn("SHUTTING DOWN WORKER")
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case task := <-w.input:
|
||||||
|
logrus.Warn("WORKER DOING TASK")
|
||||||
|
task.eventID, task.err = w.r.processRoomEvent(context.TODO(), task.event)
|
||||||
|
logrus.Warn("WORKER FINISHING TASK")
|
||||||
|
task.wg.Done()
|
||||||
|
logrus.Warn("WORKER FINISHED TASK")
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteOutputEvents implements OutputRoomEventWriter
|
// WriteOutputEvents implements OutputRoomEventWriter
|
||||||
|
|
@ -74,18 +113,50 @@ func (r *Inputer) InputRoomEvents(
|
||||||
request *api.InputRoomEventsRequest,
|
request *api.InputRoomEventsRequest,
|
||||||
response *api.InputRoomEventsResponse,
|
response *api.InputRoomEventsResponse,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(len(request.InputRoomEvents))
|
||||||
|
tasks := make([]*inputTask, len(request.InputRoomEvents))
|
||||||
|
logrus.Warnf("Received %d input events", len(tasks))
|
||||||
|
|
||||||
for i, e := range request.InputRoomEvents {
|
for i, e := range request.InputRoomEvents {
|
||||||
|
// Work out if we are running per-room workers or if we're just doing
|
||||||
|
// it on a global basis (e.g. SQLite).
|
||||||
roomID := "global"
|
roomID := "global"
|
||||||
if r.DB.SupportsConcurrentRoomInputs() {
|
if r.DB.SupportsConcurrentRoomInputs() {
|
||||||
roomID = e.Event.RoomID()
|
roomID = e.Event.RoomID()
|
||||||
}
|
}
|
||||||
mutex, _ := r.mutexes.LoadOrStore(roomID, &sync.Mutex{})
|
|
||||||
mutex.(*sync.Mutex).Lock()
|
// Look up the worker, or create it if it doesn't exist.
|
||||||
if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil {
|
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
|
||||||
mutex.(*sync.Mutex).Unlock()
|
r: r,
|
||||||
return err
|
input: make(chan *inputTask),
|
||||||
|
})
|
||||||
|
worker := w.(*inputWorker)
|
||||||
|
|
||||||
|
// Create a task. This contains the input event and a reference to
|
||||||
|
// the wait group, so that the worker can notify us when this specific
|
||||||
|
// task has been finished.
|
||||||
|
tasks[i] = &inputTask{
|
||||||
|
event: e,
|
||||||
|
wg: wg,
|
||||||
}
|
}
|
||||||
mutex.(*sync.Mutex).Unlock()
|
|
||||||
|
// Send the task to the worker.
|
||||||
|
go func(task *inputTask) { worker.input <- task }(tasks[i])
|
||||||
|
go worker.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logrus.Warnf("Waiting for %d task(s)", len(tasks))
|
||||||
|
wg.Wait()
|
||||||
|
logrus.Warnf("Tasks finished")
|
||||||
|
|
||||||
|
for _, task := range tasks {
|
||||||
|
if task.err != nil {
|
||||||
|
logrus.Warnf("Error: %w", task.err)
|
||||||
|
} else {
|
||||||
|
logrus.Warnf("Event ID: %s", task.eventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue