diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 6d7c8fcfe..efe9bdcdc 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -20,7 +20,7 @@ type RoomserverInternalAPI struct { ServerName gomatrixserverlib.ServerName KeyRing gomatrixserverlib.JSONVerifier FedClient *gomatrixserverlib.FederationClient - OutputRoomEventTopic string // Kafka topic for new output room events - mutex sync.Mutex // Protects calls to processRoomEvent + OutputRoomEventTopic string // Kafka topic for new output room events + mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent fsAPI fsAPI.FederationSenderInternalAPI } diff --git a/roomserver/internal/input.go b/roomserver/internal/input.go index 2af3e62d8..cbb61dfc1 100644 --- a/roomserver/internal/input.go +++ b/roomserver/internal/input.go @@ -18,6 +18,7 @@ package internal import ( "context" "encoding/json" + "sync" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/roomserver/api" @@ -71,13 +72,14 @@ func (r *RoomserverInternalAPI) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) (err error) { - // We lock as processRoomEvent can only be called once at a time - r.mutex.Lock() - defer r.mutex.Unlock() - for i := range request.InputRoomEvents { + for i, e := range request.InputRoomEvents { + mutex, _ := r.mutexes.LoadOrStore(e.Event.RoomID(), &sync.Mutex{}) + mutex.(*sync.Mutex).Lock() if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil { + mutex.(*sync.Mutex).Unlock() return err } + mutex.(*sync.Mutex).Unlock() } return nil }