Per-room input mutex

This commit is contained in:
Neil Alexander 2020-08-20 14:11:51 +01:00
parent 5ad47d3b3d
commit bbb86553c9
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 8 additions and 6 deletions

View file

@ -20,7 +20,7 @@ type RoomserverInternalAPI struct {
ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier
FedClient *gomatrixserverlib.FederationClient
OutputRoomEventTopic string // Kafka topic for new output room events
mutex sync.Mutex // Protects calls to processRoomEvent
OutputRoomEventTopic string // Kafka topic for new output room events
mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent
fsAPI fsAPI.FederationSenderInternalAPI
}

View file

@ -18,6 +18,7 @@ package internal
import (
"context"
"encoding/json"
"sync"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/roomserver/api"
@ -71,13 +72,14 @@ func (r *RoomserverInternalAPI) InputRoomEvents(
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
) (err error) {
// We lock as processRoomEvent can only be called once at a time
r.mutex.Lock()
defer r.mutex.Unlock()
for i := range request.InputRoomEvents {
for i, e := range request.InputRoomEvents {
mutex, _ := r.mutexes.LoadOrStore(e.Event.RoomID(), &sync.Mutex{})
mutex.(*sync.Mutex).Lock()
if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil {
mutex.(*sync.Mutex).Unlock()
return err
}
mutex.(*sync.Mutex).Unlock()
}
return nil
}