Client API: mutex on (user_id, room_id)

This commit is contained in:
Anand Vasudevan 2020-08-19 19:22:12 +05:30
parent 775b04d776
commit 867df3b29e

View file

@ -16,6 +16,7 @@ package routing
import (
"net/http"
"sync"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -35,6 +36,10 @@ type sendEventResponse struct {
EventID string `json:"event_id"`
}
var (
mutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents
)
// SendEvent implements:
// /rooms/{roomID}/send/{eventType}
// /rooms/{roomID}/send/{eventType}/{txnID}
@ -76,6 +81,12 @@ func SendEvent(
}
}
// create a mutex for the specific user in the specific room
// this avoids a situation where events that are received in quick succession are sent to the roomserver in a jumbled order
userID := device.UserID
mutex, _ := mutexes.LoadOrStore(roomID+userID, &sync.Mutex{})
mutex.(*sync.Mutex).Lock()
// pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded
eventID, err := api.SendEvents(
@ -88,6 +99,7 @@ func SendEvent(
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
mutex.(*sync.Mutex).Unlock()
return jsonerror.InternalServerError()
}
util.GetLogger(req.Context()).WithFields(logrus.Fields{
@ -96,6 +108,8 @@ func SendEvent(
"room_version": verRes.RoomVersion,
}).Info("Sent event to roomserver")
mutex.(*sync.Mutex).Unlock()
res := util.JSONResponse{
Code: http.StatusOK,
JSON: sendEventResponse{eventID},