diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index e0cd7eb5d..3e676419f 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -16,6 +16,7 @@ package routing import ( "net/http" + "sync" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -35,6 +36,10 @@ type sendEventResponse struct { EventID string `json:"event_id"` } +var ( + mutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents +) + // SendEvent implements: // /rooms/{roomID}/send/{eventType} // /rooms/{roomID}/send/{eventType}/{txnID} @@ -76,6 +81,12 @@ func SendEvent( } } + // create a mutex for the specific user in the specific room + // this avoids a situation where events that are received in quick succession are sent to the roomserver in a jumbled order + userID := device.UserID + mutex, _ := mutexes.LoadOrStore(roomID+userID, &sync.Mutex{}) + mutex.(*sync.Mutex).Lock() + // pass the new event to the roomserver and receive the correct event ID // event ID in case of duplicate transaction is discarded eventID, err := api.SendEvents( @@ -88,6 +99,7 @@ func SendEvent( ) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") + mutex.(*sync.Mutex).Unlock() return jsonerror.InternalServerError() } util.GetLogger(req.Context()).WithFields(logrus.Fields{ @@ -96,6 +108,8 @@ func SendEvent( "room_version": verRes.RoomVersion, }).Info("Sent event to roomserver") + mutex.(*sync.Mutex).Unlock() + res := util.JSONResponse{ Code: http.StatusOK, JSON: sendEventResponse{eventID},