diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 6d7c8fcfe..0af5b2997 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -20,7 +20,7 @@ type RoomserverInternalAPI struct { ServerName gomatrixserverlib.ServerName KeyRing gomatrixserverlib.JSONVerifier FedClient *gomatrixserverlib.FederationClient - OutputRoomEventTopic string // Kafka topic for new output room events - mutex sync.Mutex // Protects calls to processRoomEvent + OutputRoomEventTopic string // Kafka topic for new output room events + mutexes sync.Map // room ID -> sync.Mutex (protects calls to processRoomEvent) fsAPI fsAPI.FederationSenderInternalAPI } diff --git a/roomserver/internal/input.go b/roomserver/internal/input.go index 2af3e62d8..3b4466035 100644 --- a/roomserver/internal/input.go +++ b/roomserver/internal/input.go @@ -18,6 +18,7 @@ package internal import ( "context" "encoding/json" + "sync" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/roomserver/api" @@ -71,13 +72,18 @@ func (r *RoomserverInternalAPI) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) (err error) { - // We lock as processRoomEvent can only be called once at a time - r.mutex.Lock() - defer r.mutex.Unlock() - for i := range request.InputRoomEvents { + for i, e := range request.InputRoomEvents { + // Take the lock for a given room ID. The roomserver should only + // process one event for a given room at a time to ensure that + // forward extremities, previous events etc are stored properly + // and are not corrupted by race conditions. + mutex, _ := r.mutexes.LoadOrStore(e.Event.RoomID(), &sync.Mutex{}) + mutex.(*sync.Mutex).Lock() if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil { + mutex.(*sync.Mutex).Unlock() return err } + mutex.(*sync.Mutex).Unlock() } return nil }