Some tweaks for the device list updater

This commit is contained in:
Till Faelligen 2023-10-31 10:06:49 +01:00
parent 32f7c4b166
commit f2dcdf6213
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
5 changed files with 30 additions and 6 deletions

View file

@ -54,6 +54,7 @@ var streams = []*nats.StreamConfig{
Name: InputDeviceListUpdate, Name: InputDeviceListUpdate,
Retention: nats.InterestPolicy, Retention: nats.InterestPolicy,
Storage: nats.FileStorage, Storage: nats.FileStorage,
MaxAge: time.Hour * 24,
}, },
{ {
Name: InputSigningKeyUpdate, Name: InputSigningKeyUpdate,

View file

@ -17,6 +17,7 @@ package consumers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"time"
"github.com/matrix-org/dendrite/userapi/internal" "github.com/matrix-org/dendrite/userapi/internal"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -82,7 +83,10 @@ func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.M
return true return true
} }
err := t.updater.Update(ctx, m) timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
err := t.updater.Update(timeoutCtx, m)
if err != nil { if err != nil {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"user_id": m.UserID, "user_id": m.UserID,

View file

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"net" "net"
"strconv"
"sync" "sync"
"time" "time"
@ -142,13 +143,27 @@ type KeyChangeProducer interface {
ProduceKeyChanges(keys []api.DeviceMessage) error ProduceKeyChanges(keys []api.DeviceMessage) error
} }
var deviceListUpdaterBackpressure = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "keyserver",
Name: "worker_backpressure",
Help: "How many device list updater requests are queued",
},
[]string{"worker_id"},
)
// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. // NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale.
func NewDeviceListUpdater( func NewDeviceListUpdater(
process *process.ProcessContext, db DeviceListUpdaterDatabase, process *process.ProcessContext, db DeviceListUpdaterDatabase,
api DeviceListUpdaterAPI, producer KeyChangeProducer, api DeviceListUpdaterAPI, producer KeyChangeProducer,
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int, fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName, rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName,
enableMetrics bool,
) *DeviceListUpdater { ) *DeviceListUpdater {
if enableMetrics {
prometheus.MustRegister(deviceListUpdaterBackpressure)
}
return &DeviceListUpdater{ return &DeviceListUpdater{
process: process, process: process,
userIDToMutex: make(map[string]*sync.Mutex), userIDToMutex: make(map[string]*sync.Mutex),
@ -343,6 +358,8 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
index := int(int64(hash.Sum32()) % int64(len(u.workerChans))) index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
ch := u.assignChannel(userID) ch := u.assignChannel(userID)
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec()
u.workerChans[index] <- remoteServer u.workerChans[index] <- remoteServer
select { select {
case <-ch: case <-ch:

View file

@ -27,6 +27,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/fclient" "github.com/matrix-org/gomatrixserverlib/fclient"
@ -161,7 +162,7 @@ func TestUpdateHavePrevID(t *testing.T) {
} }
ap := &mockDeviceListUpdaterAPI{} ap := &mockDeviceListUpdaterAPI{}
producer := &mockKeyChangeProducer{} producer := &mockKeyChangeProducer{}
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost") updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics)
event := gomatrixserverlib.DeviceListUpdateEvent{ event := gomatrixserverlib.DeviceListUpdateEvent{
DeviceDisplayName: "Foo Bar", DeviceDisplayName: "Foo Bar",
Deleted: false, Deleted: false,
@ -233,7 +234,7 @@ func TestUpdateNoPrevID(t *testing.T) {
`)), `)),
}, nil }, nil
}) })
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test") updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics)
if err := updater.Start(); err != nil { if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err) t.Fatalf("failed to start updater: %s", err)
} }
@ -303,7 +304,7 @@ func TestDebounce(t *testing.T) {
close(incomingFedReq) close(incomingFedReq)
return <-fedCh, nil return <-fedCh, nil
}) })
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost") updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics)
if err := updater.Start(); err != nil { if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err) t.Fatalf("failed to start updater: %s", err)
} }
@ -406,7 +407,7 @@ func TestDeviceListUpdater_CleanUp(t *testing.T) {
updater := NewDeviceListUpdater(processCtx, db, nil, updater := NewDeviceListUpdater(processCtx, db, nil,
nil, nil, nil, nil,
0, rsAPI, "test") 0, rsAPI, "test", caching.DisableMetrics)
if err := updater.CleanUp(); err != nil { if err := updater.CleanUp(); err != nil {
t.Error(err) t.Error(err)
} }

View file

@ -18,6 +18,7 @@ import (
"time" "time"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -99,7 +100,7 @@ func NewInternalAPI(
FedClient: fedClient, FedClient: fedClient,
} }
updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, dendriteCfg.Global.ServerName) // 8 workers TODO: configurable updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, dendriteCfg.Global.ServerName, caching.EnableMetrics) // 8 workers TODO: configurable
userAPI.Updater = updater userAPI.Updater = updater
// Remove users which we don't share a room with anymore // Remove users which we don't share a room with anymore
if err := updater.CleanUp(); err != nil { if err := updater.CleanUp(); err != nil {