Roomserver per-room input lock

This commit is contained in:
Neil Alexander 2020-08-17 12:52:05 +01:00
parent e7d450adb8
commit 828ebb4874
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 12 additions and 6 deletions

View file

@ -21,6 +21,6 @@ type RoomserverInternalAPI struct {
KeyRing gomatrixserverlib.JSONVerifier KeyRing gomatrixserverlib.JSONVerifier
FedClient *gomatrixserverlib.FederationClient FedClient *gomatrixserverlib.FederationClient
OutputRoomEventTopic string // Kafka topic for new output room events OutputRoomEventTopic string // Kafka topic for new output room events
mutex sync.Mutex // Protects calls to processRoomEvent mutexes sync.Map // room ID -> sync.Mutex (protects calls to processRoomEvent)
fsAPI fsAPI.FederationSenderInternalAPI fsAPI fsAPI.FederationSenderInternalAPI
} }

View file

@ -18,6 +18,7 @@ package internal
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"sync"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -71,13 +72,18 @@ func (r *RoomserverInternalAPI) InputRoomEvents(
request *api.InputRoomEventsRequest, request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse, response *api.InputRoomEventsResponse,
) (err error) { ) (err error) {
// We lock as processRoomEvent can only be called once at a time for i, e := range request.InputRoomEvents {
r.mutex.Lock() // Take the lock for a given room ID. The roomserver should only
defer r.mutex.Unlock() // process one event for a given room at a time to ensure that
for i := range request.InputRoomEvents { // forward extremities, previous events etc are stored properly
// and are not corrupted by race conditions.
mutex, _ := r.mutexes.LoadOrStore(e.Event.RoomID(), &sync.Mutex{})
mutex.(*sync.Mutex).Lock()
if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil { if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil {
mutex.(*sync.Mutex).Unlock()
return err return err
} }
mutex.(*sync.Mutex).Unlock()
} }
return nil return nil
} }