Try to avoid database locks

This commit is contained in:
Neil Alexander 2020-05-29 11:38:19 +01:00
parent 005c4c0168
commit 5487edf1d6
5 changed files with 108 additions and 37 deletions

View file

@ -115,9 +115,9 @@ func (t *EDUCache) AddTypingUser(
func (t *EDUCache) AddSendToDeviceMessage() int64 { func (t *EDUCache) AddSendToDeviceMessage() int64 {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
r := t.latestSyncPosition
t.latestSyncPosition++ t.latestSyncPosition++
return t.latestSyncPosition - 1 return r
} }
// addUser with mutex lock & replace the previous timer. // addUser with mutex lock & replace the previous timer.

View file

@ -19,6 +19,8 @@ import (
"fmt" "fmt"
"runtime" "runtime"
"time" "time"
"go.uber.org/atomic"
) )
// A Transaction is something that can be committed or rolledback. // A Transaction is something that can be committed or rolledback.
@ -107,3 +109,44 @@ type DbProperties interface {
MaxOpenConns() int MaxOpenConns() int
ConnMaxLifetime() time.Duration ConnMaxLifetime() time.Duration
} }
type TransactionWriter struct {
running atomic.Bool
todo chan transactionWriterTask
}
type transactionWriterTask struct {
db *sql.DB
f func(txn *sql.Tx)
wait chan struct{}
}
func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx)) {
if w.todo == nil {
w.todo = make(chan transactionWriterTask)
}
if !w.running.Load() {
go w.run()
}
task := transactionWriterTask{
db: db,
f: f,
wait: make(chan struct{}),
}
w.todo <- task
<-task.wait
}
func (w *TransactionWriter) run() {
if !w.running.CAS(false, true) {
return
}
defer w.running.Store(false)
for task := range w.todo {
_ = WithTransaction(task.db, func(txn *sql.Tx) error {
task.f(txn)
return nil
})
close(task.wait)
}
}

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -33,6 +34,7 @@ import (
type OutputSendToDeviceEventConsumer struct { type OutputSendToDeviceEventConsumer struct {
sendToDeviceConsumer *internal.ContinualConsumer sendToDeviceConsumer *internal.ContinualConsumer
db storage.Database db storage.Database
serverName gomatrixserverlib.ServerName // our server name
notifier *sync.Notifier notifier *sync.Notifier
} }
@ -54,6 +56,7 @@ func NewOutputSendToDeviceEventConsumer(
s := &OutputSendToDeviceEventConsumer{ s := &OutputSendToDeviceEventConsumer{
sendToDeviceConsumer: &consumer, sendToDeviceConsumer: &consumer,
db: store, db: store,
serverName: cfg.Matrix.ServerName,
notifier: n, notifier: n,
} }
@ -75,6 +78,14 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err return err
} }
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
return err
}
if domain != s.serverName {
return nil
}
util.GetLogger(context.TODO()).WithFields(log.Fields{ util.GetLogger(context.TODO()).WithFields(log.Fields{
"sender": output.Sender, "sender": output.Sender,
"user_id": output.UserID, "user_id": output.UserID,

View file

@ -28,6 +28,7 @@ type Database struct {
CurrentRoomState tables.CurrentRoomState CurrentRoomState tables.CurrentRoomState
BackwardExtremities tables.BackwardsExtremities BackwardExtremities tables.BackwardsExtremities
SendToDevice tables.SendToDevice SendToDevice tables.SendToDevice
SendToDeviceWriter internal.TransactionWriter
EDUCache *cache.EDUCache EDUCache *cache.EDUCache
} }
@ -1045,9 +1046,13 @@ func (d *Database) StoreNewSendForDeviceMessage(
if err != nil { if err != nil {
return 0, err return 0, err
} }
err = d.AddSendToDeviceEvent( // Delegate the database write task to the SendToDeviceWriter. It'll guarantee
ctx, nil, userID, deviceID, string(j), // that we don't lock the table for writes in more than one place.
) d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) {
err = d.AddSendToDeviceEvent(
ctx, txn, userID, deviceID, string(j),
)
})
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -1059,42 +1064,48 @@ func (d *Database) SendToDeviceUpdatesForSync(
userID, deviceID string, userID, deviceID string,
token types.StreamingToken, token types.StreamingToken,
) (events []types.SendToDeviceEvent, err error) { ) (events []types.SendToDeviceEvent, err error) {
err = internal.WithTransaction(d.DB, func(txn *sql.Tx) error { // First of all, get our send-to-device updates for this user.
// First of all, get our send-to-device updates for this user. events, err = d.SendToDevice.SelectSendToDeviceMessages(ctx, nil, userID, deviceID)
events, err = d.SendToDevice.SelectSendToDeviceMessages(ctx, txn, userID, deviceID) if err != nil {
if err != nil { return nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err)
return fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err) }
}
// Start by cleaning up any send-to-device messages that have older sent-by-tokens. // Start by cleaning up any send-to-device messages that have older sent-by-tokens.
// This means that they were sent in a previous /sync and the client has happily // This means that they were sent in a previous /sync and the client has happily
// progressed onto newer sync tokens. // progressed onto newer sync tokens.
toUpdate := []types.SendToDeviceNID{} toUpdate := []types.SendToDeviceNID{}
toDelete := []types.SendToDeviceNID{} toDelete := []types.SendToDeviceNID{}
for pos, event := range events { for pos, event := range events {
if event.SentByToken == nil { if event.SentByToken == nil {
// Mark the event for update and keep it in our list of return events. // Mark the event for update and keep it in our list of return events.
toUpdate = append(toUpdate, event.ID) toUpdate = append(toUpdate, event.ID)
event.SentByToken = &token event.SentByToken = &token
} else if token.IsAfter(*event.SentByToken) { } else if token.IsAfter(*event.SentByToken) {
// Mark the event for deletion and remove it from our list of return events. // Mark the event for deletion and remove it from our list of return events.
toDelete = append(toDelete, event.ID) toDelete = append(toDelete, event.ID)
events = append(events[:pos], events[pos+1:]...) events = append(events[:pos], events[pos+1:]...)
}
}
// If we need to write to the database then we'll ask the SendToDeviceWriter to
// do that for us. It'll guarantee that we don't lock the table for writes in
// more than one place.
if len(toUpdate) > 0 || len(toDelete) > 0 {
d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) {
// Delete any send-to-device messages marked for deletion.
if e := d.SendToDevice.DeleteSendToDeviceMessages(ctx, txn, toDelete); e != nil {
err = fmt.Errorf("d.SendToDevice.DeleteSendToDeviceMessages: %w", e)
return
} }
}
// Delete any send-to-device messages marked for deletion. // Now update any outstanding send-to-device messages with the new sync token.
if err := d.SendToDevice.DeleteSendToDeviceMessages(ctx, txn, toDelete); err != nil { if e := d.SendToDevice.UpdateSentSendToDeviceMessages(ctx, txn, token.String(), toUpdate); e != nil {
return fmt.Errorf("d.SendToDevice.DeleteSendToDeviceMessages: %w", err) err = fmt.Errorf("d.SendToDevice.UpdateSentSendToDeviceMessages: %w", err)
} return
}
})
}
// Now update any outstanding send-to-device messages with the new sync token.
if err := d.SendToDevice.UpdateSentSendToDeviceMessages(ctx, txn, token.String(), toUpdate); err != nil {
return fmt.Errorf("d.SendToDevice.UpdateSentSendToDeviceMessages: %w", err)
}
return nil
})
return return
} }

View file

@ -289,3 +289,9 @@ Existing members see new members' join events
Inbound federation can receive events Inbound federation can receive events
Inbound federation can receive redacted events Inbound federation can receive redacted events
Can logout current device Can logout current device
Can send a message directly to a device using PUT /sendToDevice
Can recv a device message using /sync
Can send a to-device message to two users which both receive it using /sync
Can recv device messages until they are acknowledged
Device messages wake up /sync
Device messages over federation wake up /sync