FIFO ordering of input events (#1386)

* Initial FIFOing of roomserver inputs

* Remove EventID response from api.InputRoomEventsResponse

* Don't send back event ID unnecessarily

* Fix ordering hopefully

* Reduce copies, use buffered task channel to reduce contention on other rooms

* Fix error handling
This commit is contained in:
Neil Alexander 2020-09-03 15:22:16 +01:00 committed by GitHub
parent 74743ac8ae
commit 6150de6cb3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 99 additions and 46 deletions

View file

@ -342,8 +342,7 @@ func createRoom(
} }
// send events to the room server // send events to the room server
_, err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil) if err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil); err != nil {
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -75,13 +75,12 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
_, err = roomserverAPI.SendEvents( if err = roomserverAPI.SendEvents(
ctx, rsAPI, ctx, rsAPI,
[]gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)}, []gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)},
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
nil, nil,
) ); err != nil {
if err != nil {
util.GetLogger(ctx).WithError(err).Error("SendEvents failed") util.GetLogger(ctx).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -171,7 +171,7 @@ func SetAvatarURL(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
@ -289,7 +289,7 @@ func SetDisplayName(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -122,8 +122,7 @@ func SendRedaction(
JSON: jsonerror.NotFound("Room does not exist"), JSON: jsonerror.NotFound("Room does not exist"),
} }
} }
_, err = roomserverAPI.SendEvents(context.Background(), rsAPI, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil) if err = roomserverAPI.SendEvents(context.Background(), rsAPI, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil {
if err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents") util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -90,27 +90,26 @@ func SendEvent(
// pass the new event to the roomserver and receive the correct event ID // pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded // event ID in case of duplicate transaction is discarded
eventID, err := api.SendEvents( if err := api.SendEvents(
req.Context(), rsAPI, req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
e.Headered(verRes.RoomVersion), e.Headered(verRes.RoomVersion),
}, },
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
txnAndSessionID, txnAndSessionID,
) ); err != nil {
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
util.GetLogger(req.Context()).WithFields(logrus.Fields{ util.GetLogger(req.Context()).WithFields(logrus.Fields{
"event_id": eventID, "event_id": e.EventID(),
"room_id": roomID, "room_id": roomID,
"room_version": verRes.RoomVersion, "room_version": verRes.RoomVersion,
}).Info("Sent event to roomserver") }).Info("Sent event to roomserver")
res := util.JSONResponse{ res := util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: sendEventResponse{eventID}, JSON: sendEventResponse{e.EventID()},
} }
// Add response to transactionsCache // Add response to transactionsCache
if txnID != nil { if txnID != nil {

View file

@ -359,7 +359,7 @@ func emit3PIDInviteEvent(
return err return err
} }
_, err = api.SendEvents( return api.SendEvents(
ctx, rsAPI, ctx, rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
(*event).Headered(queryRes.RoomVersion), (*event).Headered(queryRes.RoomVersion),
@ -367,5 +367,4 @@ func emit3PIDInviteEvent(
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
nil, nil,
) )
return err
} }

View file

@ -266,15 +266,14 @@ func SendJoin(
// We are responsible for notifying other servers that the user has joined // We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName // the room, so set SendAsServer to cfg.Matrix.ServerName
if !alreadyJoined { if !alreadyJoined {
_, err = api.SendEvents( if err = api.SendEvents(
httpReq.Context(), rsAPI, httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
event.Headered(stateAndAuthChainResponse.RoomVersion), event.Headered(stateAndAuthChainResponse.RoomVersion),
}, },
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
nil, nil,
) ); err != nil {
if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed") util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -247,15 +247,14 @@ func SendLeave(
// Send the events to the room server. // Send the events to the room server.
// We are responsible for notifying other servers that the user has left // We are responsible for notifying other servers that the user has left
// the room, so set SendAsServer to cfg.Matrix.ServerName // the room, so set SendAsServer to cfg.Matrix.ServerName
_, err = api.SendEvents( if err = api.SendEvents(
httpReq.Context(), rsAPI, httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
event.Headered(verRes.RoomVersion), event.Headered(verRes.RoomVersion),
}, },
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
nil, nil,
) ); err != nil {
if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -382,7 +382,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro
} }
// pass the event to the roomserver // pass the event to the roomserver
_, err := api.SendEvents( return api.SendEvents(
t.context, t.rsAPI, t.context, t.rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion), e.Headered(stateResp.RoomVersion),
@ -390,7 +390,6 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro
api.DoNotSendToOtherServers, api.DoNotSendToOtherServers,
nil, nil,
) )
return err
} }
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error { func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {

View file

@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites(
} }
// Send all the events // Send all the events
if _, err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil { if err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
@ -172,7 +172,7 @@ func ExchangeThirdPartyInvite(
} }
// Send the event to the roomserver // Send the event to the roomserver
if _, err = api.SendEvents( if err = api.SendEvents(
httpReq.Context(), rsAPI, httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
signedEvent.Event.Headered(verRes.RoomVersion), signedEvent.Event.Headered(verRes.RoomVersion),

View file

@ -83,5 +83,4 @@ type InputRoomEventsRequest struct {
// InputRoomEventsResponse is a response to InputRoomEvents // InputRoomEventsResponse is a response to InputRoomEvents
type InputRoomEventsResponse struct { type InputRoomEventsResponse struct {
EventID string `json:"event_id"`
} }

View file

@ -26,7 +26,7 @@ import (
func SendEvents( func SendEvents(
ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent, ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) (string, error) { ) error {
ires := make([]InputRoomEvent, len(events)) ires := make([]InputRoomEvent, len(events))
for i, event := range events { for i, event := range events {
ires[i] = InputRoomEvent{ ires[i] = InputRoomEvent{
@ -77,19 +77,16 @@ func SendEventWithState(
StateEventIDs: stateEventIDs, StateEventIDs: stateEventIDs,
}) })
_, err = SendInputRoomEvents(ctx, rsAPI, ires) return SendInputRoomEvents(ctx, rsAPI, ires)
return err
} }
// SendInputRoomEvents to the roomserver. // SendInputRoomEvents to the roomserver.
func SendInputRoomEvents( func SendInputRoomEvents(
ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent, ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent,
) (eventID string, err error) { ) error {
request := InputRoomEventsRequest{InputRoomEvents: ires} request := InputRoomEventsRequest{InputRoomEvents: ires}
var response InputRoomEventsResponse var response InputRoomEventsResponse
err = rsAPI.InputRoomEvents(ctx, &request, &response) return rsAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID
return
} }
// SendInvite event to the roomserver. // SendInvite event to the roomserver.

View file

@ -19,12 +19,14 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"sync" "sync"
"time"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
) )
type Inputer struct { type Inputer struct {
@ -33,7 +35,36 @@ type Inputer struct {
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
OutputRoomEventTopic string OutputRoomEventTopic string
mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent workers sync.Map // room ID -> *inputWorker
}
type inputTask struct {
ctx context.Context
event *api.InputRoomEvent
wg *sync.WaitGroup
err error // written back by worker, only safe to read when all tasks are done
}
type inputWorker struct {
r *Inputer
running atomic.Bool
input chan *inputTask
}
func (w *inputWorker) start() {
if !w.running.CAS(false, true) {
return
}
defer w.running.Store(false)
for {
select {
case task := <-w.input:
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
task.wg.Done()
case <-time.After(time.Second * 5):
return
}
}
} }
// WriteOutputEvents implements OutputRoomEventWriter // WriteOutputEvents implements OutputRoomEventWriter
@ -73,19 +104,54 @@ func (r *Inputer) InputRoomEvents(
ctx context.Context, ctx context.Context,
request *api.InputRoomEventsRequest, request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse, response *api.InputRoomEventsResponse,
) (err error) { ) error {
// Create a wait group. Each task that we dispatch will call Done on
// this wait group so that we know when all of our events have been
// processed.
wg := &sync.WaitGroup{}
wg.Add(len(request.InputRoomEvents))
tasks := make([]*inputTask, len(request.InputRoomEvents))
for i, e := range request.InputRoomEvents { for i, e := range request.InputRoomEvents {
// Work out if we are running per-room workers or if we're just doing
// it on a global basis (e.g. SQLite).
roomID := "global" roomID := "global"
if r.DB.SupportsConcurrentRoomInputs() { if r.DB.SupportsConcurrentRoomInputs() {
roomID = e.Event.RoomID() roomID = e.Event.RoomID()
} }
mutex, _ := r.mutexes.LoadOrStore(roomID, &sync.Mutex{})
mutex.(*sync.Mutex).Lock() // Look up the worker, or create it if it doesn't exist. This channel
if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil { // is buffered to reduce the chance that we'll be blocked by another
mutex.(*sync.Mutex).Unlock() // room - the channel will be quite small as it's just pointer types.
return err w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
r: r,
input: make(chan *inputTask, 10),
})
worker := w.(*inputWorker)
// Create a task. This contains the input event and a reference to
// the wait group, so that the worker can notify us when this specific
// task has been finished.
tasks[i] = &inputTask{
ctx: ctx,
event: &request.InputRoomEvents[i],
wg: wg,
}
// Send the task to the worker.
go worker.start()
worker.input <- tasks[i]
}
// Wait for all of the workers to return results about our tasks.
wg.Wait()
// If any of the tasks returned an error, we should probably report
// that back to the caller.
for _, task := range tasks {
if task.err != nil {
return task.err
} }
mutex.(*sync.Mutex).Unlock()
} }
return nil return nil
} }

View file

@ -38,7 +38,7 @@ import (
// nolint:gocyclo // nolint:gocyclo
func (r *Inputer) processRoomEvent( func (r *Inputer) processRoomEvent(
ctx context.Context, ctx context.Context,
input api.InputRoomEvent, input *api.InputRoomEvent,
) (eventID string, err error) { ) (eventID string, err error) {
// Parse and validate the event JSON // Parse and validate the event JSON
headered := input.Event headered := input.Event
@ -143,7 +143,7 @@ func (r *Inputer) processRoomEvent(
func (r *Inputer) calculateAndSetState( func (r *Inputer) calculateAndSetState(
ctx context.Context, ctx context.Context,
input api.InputRoomEvent, input *api.InputRoomEvent,
roomInfo types.RoomInfo, roomInfo types.RoomInfo,
stateAtEvent *types.StateAtEvent, stateAtEvent *types.StateAtEvent,
event gomatrixserverlib.Event, event gomatrixserverlib.Event,

View file

@ -114,8 +114,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js
rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{}) rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{})
hevents := mustLoadEvents(t, ver, events) hevents := mustLoadEvents(t, ver, events)
_, err = api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil) if err = api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil); err != nil {
if err != nil {
t.Errorf("failed to SendEvents: %s", err) t.Errorf("failed to SendEvents: %s", err)
} }
return rsAPI, dp, hevents return rsAPI, dp, hevents