mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 22:03:10 -06:00
Client API: mutex on (user_id, room_id)
Changed variable name used for the mutexes map Changed the place where the mutex is locked Changed unlock to a defered call instead of manually calling it
This commit is contained in:
parent
a59a190564
commit
e0f1dd77e2
|
|
@ -37,7 +37,7 @@ type sendEventResponse struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
mutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents
|
userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents
|
||||||
)
|
)
|
||||||
|
|
||||||
// SendEvent implements:
|
// SendEvent implements:
|
||||||
|
|
@ -68,6 +68,13 @@ func SendEvent(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create a mutex for the specific user in the specific room
|
||||||
|
// this avoids a situation where events that are received in quick succession are sent to the roomserver in a jumbled order
|
||||||
|
userID := device.UserID
|
||||||
|
mutex, _ := userRoomSendMutexes.LoadOrStore(roomID+userID, &sync.Mutex{})
|
||||||
|
mutex.(*sync.Mutex).Lock()
|
||||||
|
defer mutex.(*sync.Mutex).Unlock()
|
||||||
|
|
||||||
e, resErr := generateSendEvent(req, device, roomID, eventType, stateKey, cfg, rsAPI)
|
e, resErr := generateSendEvent(req, device, roomID, eventType, stateKey, cfg, rsAPI)
|
||||||
if resErr != nil {
|
if resErr != nil {
|
||||||
return *resErr
|
return *resErr
|
||||||
|
|
@ -81,12 +88,6 @@ func SendEvent(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a mutex for the specific user in the specific room
|
|
||||||
// this avoids a situation where events that are received in quick succession are sent to the roomserver in a jumbled order
|
|
||||||
userID := device.UserID
|
|
||||||
mutex, _ := mutexes.LoadOrStore(roomID+userID, &sync.Mutex{})
|
|
||||||
mutex.(*sync.Mutex).Lock()
|
|
||||||
|
|
||||||
// pass the new event to the roomserver and receive the correct event ID
|
// pass the new event to the roomserver and receive the correct event ID
|
||||||
// event ID in case of duplicate transaction is discarded
|
// event ID in case of duplicate transaction is discarded
|
||||||
eventID, err := api.SendEvents(
|
eventID, err := api.SendEvents(
|
||||||
|
|
@ -99,7 +100,6 @@ func SendEvent(
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
|
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
|
||||||
mutex.(*sync.Mutex).Unlock()
|
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
||||||
|
|
@ -108,8 +108,6 @@ func SendEvent(
|
||||||
"room_version": verRes.RoomVersion,
|
"room_version": verRes.RoomVersion,
|
||||||
}).Info("Sent event to roomserver")
|
}).Info("Sent event to roomserver")
|
||||||
|
|
||||||
mutex.(*sync.Mutex).Unlock()
|
|
||||||
|
|
||||||
res := util.JSONResponse{
|
res := util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: sendEventResponse{eventID},
|
JSON: sendEventResponse{eventID},
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue