From f2dcdf62135d9c8a65181e5bea608cd09538d7fd Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Tue, 31 Oct 2023 10:06:49 +0100 Subject: [PATCH] Some tweaks for the device list updater --- setup/jetstream/streams.go | 1 + userapi/consumers/devicelistupdate.go | 6 +++++- userapi/internal/device_list_update.go | 17 +++++++++++++++++ userapi/internal/device_list_update_test.go | 9 +++++---- userapi/userapi.go | 3 ++- 5 files changed, 30 insertions(+), 6 deletions(-) diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 741407926..bcfbadada 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -54,6 +54,7 @@ var streams = []*nats.StreamConfig{ Name: InputDeviceListUpdate, Retention: nats.InterestPolicy, Storage: nats.FileStorage, + MaxAge: time.Hour * 24, }, { Name: InputSigningKeyUpdate, diff --git a/userapi/consumers/devicelistupdate.go b/userapi/consumers/devicelistupdate.go index 3389bb808..b3ccb573b 100644 --- a/userapi/consumers/devicelistupdate.go +++ b/userapi/consumers/devicelistupdate.go @@ -17,6 +17,7 @@ package consumers import ( "context" "encoding/json" + "time" "github.com/matrix-org/dendrite/userapi/internal" "github.com/matrix-org/gomatrixserverlib" @@ -82,7 +83,10 @@ func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.M return true } - err := t.updater.Update(ctx, m) + timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + + err := t.updater.Update(timeoutCtx, m) if err != nil { logrus.WithFields(logrus.Fields{ "user_id": m.UserID, diff --git a/userapi/internal/device_list_update.go b/userapi/internal/device_list_update.go index 2f33589fe..6d537a4d7 100644 --- a/userapi/internal/device_list_update.go +++ b/userapi/internal/device_list_update.go @@ -21,6 +21,7 @@ import ( "fmt" "hash/fnv" "net" + "strconv" "sync" "time" @@ -142,13 +143,27 @@ type KeyChangeProducer interface { ProduceKeyChanges(keys []api.DeviceMessage) error } +var deviceListUpdaterBackpressure = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dendrite", + Subsystem: "keyserver", + Name: "worker_backpressure", + Help: "How many device list updater requests are queued", + }, + []string{"worker_id"}, +) + // NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. func NewDeviceListUpdater( process *process.ProcessContext, db DeviceListUpdaterDatabase, api DeviceListUpdaterAPI, producer KeyChangeProducer, fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int, rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName, + enableMetrics bool, ) *DeviceListUpdater { + if enableMetrics { + prometheus.MustRegister(deviceListUpdaterBackpressure) + } return &DeviceListUpdater{ process: process, userIDToMutex: make(map[string]*sync.Mutex), @@ -343,6 +358,8 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) { index := int(int64(hash.Sum32()) % int64(len(u.workerChans))) ch := u.assignChannel(userID) + deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc() + defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec() u.workerChans[index] <- remoteServer select { case <-ch: diff --git a/userapi/internal/device_list_update_test.go b/userapi/internal/device_list_update_test.go index 38fd8b583..14a49bc54 100644 --- a/userapi/internal/device_list_update_test.go +++ b/userapi/internal/device_list_update_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/fclient" @@ -161,7 +162,7 @@ func TestUpdateHavePrevID(t *testing.T) { } ap := &mockDeviceListUpdaterAPI{} producer := &mockKeyChangeProducer{} - updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost") + updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics) event := gomatrixserverlib.DeviceListUpdateEvent{ DeviceDisplayName: "Foo Bar", Deleted: false, @@ -233,7 +234,7 @@ func TestUpdateNoPrevID(t *testing.T) { `)), }, nil }) - updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test") + updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics) if err := updater.Start(); err != nil { t.Fatalf("failed to start updater: %s", err) } @@ -303,7 +304,7 @@ func TestDebounce(t *testing.T) { close(incomingFedReq) return <-fedCh, nil }) - updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost") + updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics) if err := updater.Start(); err != nil { t.Fatalf("failed to start updater: %s", err) } @@ -406,7 +407,7 @@ func TestDeviceListUpdater_CleanUp(t *testing.T) { updater := NewDeviceListUpdater(processCtx, db, nil, nil, nil, - 0, rsAPI, "test") + 0, rsAPI, "test", caching.DisableMetrics) if err := updater.CleanUp(); err != nil { t.Error(err) } diff --git a/userapi/userapi.go b/userapi/userapi.go index 6b6dac884..5e672faf8 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -18,6 +18,7 @@ import ( "time" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" @@ -99,7 +100,7 @@ func NewInternalAPI( FedClient: fedClient, } - updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, dendriteCfg.Global.ServerName) // 8 workers TODO: configurable + updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, dendriteCfg.Global.ServerName, caching.EnableMetrics) // 8 workers TODO: configurable userAPI.Updater = updater // Remove users which we don't share a room with anymore if err := updater.CleanUp(); err != nil {