From e45ba35e979f2f8af1e75f5f16158973b790436b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 29 Sep 2022 20:05:05 +0100 Subject: [PATCH 1/3] Enable knocking on room versions 8 and 9 (update to matrix-org/gomatrixserverlib@91d455c) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index bd79680ed..0985b986c 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220929155203-377b320ad5b1 + github.com/matrix-org/gomatrixserverlib v0.0.0-20220929190355-91d455cd3621 github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.15 diff --git a/go.sum b/go.sum index 742a52dcb..c3db3e09e 100644 --- a/go.sum +++ b/go.sum @@ -384,8 +384,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220929155203-377b320ad5b1 h1:kS4jgi/DshFGJArjF14b1u42fVWC+8LMVdSVLdQYq3E= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220929155203-377b320ad5b1/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220929190355-91d455cd3621 h1:a8IaoSPDxevkgXnOUrtIW9AqVNvXBJAG0gtnX687S7g= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220929190355-91d455cd3621/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4= github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c h1:iCHLYwwlPsf4TYFrvhKdhQoAM2lXzcmDZYqwBNWcnVk= github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= From 9005e5b4a87c0bdd4f19b1dbf46f3093ac07e727 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Fri, 30 Sep 2022 10:32:31 +0200 Subject: [PATCH 2/3] Add `/_dendrite/admin/refreshDevices/{userID}` (#2746) Allows to immediately query `/devices/{userID}` over federation to (hopefully) resolve E2EE issues. --- clientapi/routing/admin.go | 55 ++++++++++++++++++++++++------- clientapi/routing/routing.go | 8 ++++- docs/administration/4_adminapi.md | 5 +++ keyserver/api/api.go | 1 + keyserver/internal/internal.go | 13 ++++++-- 5 files changed, 66 insertions(+), 16 deletions(-) diff --git a/clientapi/routing/admin.go b/clientapi/routing/admin.go index 5089d7c36..89c269f1a 100644 --- a/clientapi/routing/admin.go +++ b/clientapi/routing/admin.go @@ -2,20 +2,23 @@ package routing import ( "encoding/json" + "fmt" "net/http" "time" "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/internal/httputil" - roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/keyserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + userapi "github.com/matrix-org/dendrite/userapi/api" ) func AdminEvacuateRoom(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse { @@ -144,12 +147,6 @@ func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *userap } func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, natsClient *nats.Conn) util.JSONResponse { - if device.AccountType != userapi.AccountTypeAdmin { - return util.JSONResponse{ - Code: http.StatusForbidden, - JSON: jsonerror.Forbidden("This API can only be used by admin users."), - } - } _, err := natsClient.RequestMsg(nats.NewMsg(cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex)), time.Second*10) if err != nil { logrus.WithError(err).Error("failed to publish nats message") @@ -160,3 +157,37 @@ func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *userapi.Devi JSON: struct{}{}, } } + +func AdminMarkAsStale(req *http.Request, cfg *config.ClientAPI, keyAPI api.ClientKeyAPI) util.JSONResponse { + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + userID := vars["userID"] + + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if domain == cfg.Matrix.ServerName { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidParam("Can not mark local device list as stale"), + } + } + + err = keyAPI.PerformMarkAsStaleIfNeeded(req.Context(), &api.PerformMarkAsStaleRequest{ + UserID: userID, + Domain: domain, + }, &struct{}{}) + if err != nil { + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.Unknown(fmt.Sprintf("Failed to mark device list as stale: %s", err)), + } + } + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 9c1f8f720..7d1c434c4 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -163,11 +163,17 @@ func Setup( ).Methods(http.MethodPost, http.MethodOptions) dendriteAdminRouter.Handle("/admin/fulltext/reindex", - httputil.MakeAuthAPI("admin_fultext_reindex", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + httputil.MakeAdminAPI("admin_fultext_reindex", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { return AdminReindex(req, cfg, device, natsClient) }), ).Methods(http.MethodGet, http.MethodOptions) + dendriteAdminRouter.Handle("/admin/refreshDevices/{userID}", + httputil.MakeAdminAPI("admin_refresh_devices", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + return AdminMarkAsStale(req, cfg, keyAPI) + }), + ).Methods(http.MethodPost, http.MethodOptions) + // server notifications if cfg.Matrix.ServerNotices.Enabled { logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice") diff --git a/docs/administration/4_adminapi.md b/docs/administration/4_adminapi.md index 1712bb1bf..56e19a8b4 100644 --- a/docs/administration/4_adminapi.md +++ b/docs/administration/4_adminapi.md @@ -62,6 +62,11 @@ the full user ID is `@alice:domain.com` then the local part is `alice`. This endpoint instructs Dendrite to reindex all searchable events (`m.room.message`, `m.room.topic` and `m.room.name`). An empty JSON body will be returned immediately. Indexing is done in the background, the server logs every 1000 events (or below) when they are being indexed. Once reindexing is done, you'll see something along the lines `Indexed 69586 events in 53.68223182s` in your debug logs. +## POST `/_dendrite/admin/refreshDevices/{userID}` + +This endpoint instructs Dendrite to immediately query `/devices/{userID}` on a federated server. An empty JSON body will be returned on success, updating all locally stored user devices/keys. This can be used to possibly resolve E2EE issues, where the remote user can't decrypt messages. + + ## POST `/_synapse/admin/v1/send_server_notice` Request body format: diff --git a/keyserver/api/api.go b/keyserver/api/api.go index c9ec59a75..14fced3e8 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -45,6 +45,7 @@ type ClientKeyAPI interface { PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse) error // PerformClaimKeys claims one-time keys for use in pre-key messages PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) error + PerformMarkAsStaleIfNeeded(ctx context.Context, req *PerformMarkAsStaleRequest, res *struct{}) error } // API functions required by the userapi diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index a8d1128c4..6309066d7 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -228,14 +228,21 @@ func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.Query // PerformMarkAsStaleIfNeeded marks the users device list as stale, if the given deviceID is not present // in our database. func (a *KeyInternalAPI) PerformMarkAsStaleIfNeeded(ctx context.Context, req *api.PerformMarkAsStaleRequest, res *struct{}) error { - knownDevices, err := a.DB.DeviceKeysForUser(ctx, req.UserID, []string{req.DeviceID}, true) + knownDevices, err := a.DB.DeviceKeysForUser(ctx, req.UserID, []string{}, true) if err != nil { return err } if len(knownDevices) == 0 { - return a.Updater.ManualUpdate(ctx, req.Domain, req.UserID) + return fmt.Errorf("unknown user %s", req.UserID) } - return nil + + for i := range knownDevices { + if knownDevices[i].DeviceID == req.DeviceID { + return nil // we already know about this device + } + } + + return a.Updater.ManualUpdate(ctx, req.Domain, req.UserID) } // nolint:gocyclo From 8a82f100460dc5ca7bd39ae2345c251d6622c494 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 30 Sep 2022 09:41:16 +0100 Subject: [PATCH 3/3] Allow more time for device list updates (#2749) This updates the device list updater so that it has a context per-request, rather than a global 30 seconds for the entire server. This could mean that talking to a slow remote server or requesting a lot of user IDs was pretty much guaranteed to fail. It also uses the process context to allow correct cancellation when Dendrite wants to shut down cleanly. --- keyserver/internal/device_list_update.go | 204 ++++++++++-------- keyserver/internal/device_list_update_test.go | 7 +- keyserver/keyserver.go | 2 +- 3 files changed, 117 insertions(+), 96 deletions(-) diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 525f8a99d..fcfcd092d 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -17,6 +17,7 @@ package internal import ( "context" "encoding/json" + "errors" "fmt" "hash/fnv" "net" @@ -31,6 +32,7 @@ import ( fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/setup/process" ) var ( @@ -45,6 +47,9 @@ var ( ) ) +const defaultWaitTime = time.Minute +const requestTimeout = time.Second * 30 + func init() { prometheus.MustRegister( deviceListUpdateCount, @@ -80,6 +85,7 @@ func init() { // In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is // set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried. type DeviceListUpdater struct { + process *process.ProcessContext // A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1 // request to the remote server and race. // TODO: Put in an LRU cache to bound growth @@ -131,10 +137,12 @@ type KeyChangeProducer interface { // NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. func NewDeviceListUpdater( - db DeviceListUpdaterDatabase, api DeviceListUpdaterAPI, producer KeyChangeProducer, + process *process.ProcessContext, db DeviceListUpdaterDatabase, + api DeviceListUpdaterAPI, producer KeyChangeProducer, fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int, ) *DeviceListUpdater { return &DeviceListUpdater{ + process: process, userIDToMutex: make(map[string]*sync.Mutex), mu: &sync.Mutex{}, db: db, @@ -234,7 +242,7 @@ func (u *DeviceListUpdater) update(ctx context.Context, event gomatrixserverlib. "prev_ids": event.PrevID, "display_name": event.DeviceDisplayName, "deleted": event.Deleted, - }).Info("DeviceListUpdater.Update") + }).Trace("DeviceListUpdater.Update") // if we haven't missed anything update the database and notify users if exists || event.Deleted { @@ -378,111 +386,123 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { } func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) { - deviceListUpdateCount.WithLabelValues(string(serverName)).Inc() - requestTimeout := time.Second * 30 // max amount of time we want to spend on each request - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - defer cancel() + ctx := u.process.Context() logger := util.GetLogger(ctx).WithField("server_name", serverName) - waitTime := 2 * time.Second - // fetch stale device lists + deviceListUpdateCount.WithLabelValues(string(serverName)).Inc() + + waitTime := defaultWaitTime // How long should we wait to try again? + successCount := 0 // How many user requests failed? + userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName}) if err != nil { logger.WithError(err).Error("Failed to load stale device lists") return waitTime, true } - failCount := 0 -userLoop: + defer func() { + for _, userID := range userIDs { + // always clear the channel to unblock Update calls regardless of success/failure + u.clearChannel(userID) + } + }() + for _, userID := range userIDs { - if ctx.Err() != nil { - // we've timed out, give up and go to the back of the queue to let another server be processed. - failCount += 1 - waitTime = time.Minute * 10 + userWait, err := u.processServerUser(ctx, serverName, userID) + if err != nil { + if userWait > waitTime { + waitTime = userWait + } break } - res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) - if err != nil { - failCount += 1 - select { - case <-ctx.Done(): - // we've timed out, give up and go to the back of the queue to let another server be processed. - waitTime = time.Minute * 10 - break userLoop - default: - } - switch e := err.(type) { - case *fedsenderapi.FederationClientError: - if e.RetryAfter > 0 { - waitTime = e.RetryAfter - } else if e.Blacklisted { - waitTime = time.Hour * 8 - break userLoop - } else if e.Code >= 300 { - // We didn't get a real FederationClientError (e.g. in polylith mode, where gomatrix.HTTPError - // are "converted" to FederationClientError), but we probably shouldn't hit them every $waitTime seconds. - waitTime = time.Hour - break userLoop - } - case net.Error: - // Use the default waitTime, if it's a timeout. - // It probably doesn't make sense to try further users. - if !e.Timeout() { - waitTime = time.Minute * 10 - logger.WithError(e).Error("GetUserDevices returned net.Error") - break userLoop - } - case gomatrix.HTTPError: - // The remote server returned an error, give it some time to recover. - // This is to avoid spamming remote servers, which may not be Matrix servers anymore. - if e.Code >= 300 { - waitTime = time.Hour - logger.WithError(e).Error("GetUserDevices returned gomatrix.HTTPError") - break userLoop - } - default: - // Something else failed - waitTime = time.Minute * 10 - logger.WithError(err).WithField("user_id", userID).Debugf("GetUserDevices returned unknown error type: %T", err) - break userLoop - } - continue - } - if res.MasterKey != nil || res.SelfSigningKey != nil { - uploadReq := &api.PerformUploadDeviceKeysRequest{ - UserID: userID, - } - uploadRes := &api.PerformUploadDeviceKeysResponse{} - if res.MasterKey != nil { - if err = sanityCheckKey(*res.MasterKey, userID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err == nil { - uploadReq.MasterKey = *res.MasterKey - } - } - if res.SelfSigningKey != nil { - if err = sanityCheckKey(*res.SelfSigningKey, userID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err == nil { - uploadReq.SelfSigningKey = *res.SelfSigningKey - } - } - _ = u.api.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes) - } - err = u.updateDeviceList(&res) - if err != nil { - logger.WithError(err).WithField("user_id", userID).Error("Fetched device list but failed to store/emit it") - failCount += 1 - } + successCount++ } - if failCount > 0 { + + allUsersSucceeded := successCount == len(userIDs) + if !allUsersSucceeded { logger.WithFields(logrus.Fields{ - "total": len(userIDs), - "failed": failCount, - "skipped": len(userIDs) - failCount, - "waittime": waitTime, + "total": len(userIDs), + "succeeded": successCount, + "failed": len(userIDs) - successCount, + "wait_time": waitTime, }).Warn("Failed to query device keys for some users") } - for _, userID := range userIDs { - // always clear the channel to unblock Update calls regardless of success/failure - u.clearChannel(userID) + return waitTime, !allUsersSucceeded +} + +func (u *DeviceListUpdater) processServerUser(ctx context.Context, serverName gomatrixserverlib.ServerName, userID string) (time.Duration, error) { + ctx, cancel := context.WithTimeout(ctx, requestTimeout) + defer cancel() + logger := util.GetLogger(ctx).WithFields(logrus.Fields{ + "server_name": serverName, + "user_id": userID, + }) + + res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return time.Minute * 10, err + } + switch e := err.(type) { + case *json.UnmarshalTypeError, *json.SyntaxError: + logger.WithError(err).Debugf("Device list update for %q contained invalid JSON", userID) + return defaultWaitTime, nil + case *fedsenderapi.FederationClientError: + if e.RetryAfter > 0 { + return e.RetryAfter, err + } else if e.Blacklisted { + return time.Hour * 8, err + } else if e.Code >= 300 { + // We didn't get a real FederationClientError (e.g. in polylith mode, where gomatrix.HTTPError + // are "converted" to FederationClientError), but we probably shouldn't hit them every $waitTime seconds. + return time.Hour, err + } + case net.Error: + // Use the default waitTime, if it's a timeout. + // It probably doesn't make sense to try further users. + if !e.Timeout() { + logger.WithError(e).Debug("GetUserDevices returned net.Error") + return time.Minute * 10, err + } + case gomatrix.HTTPError: + // The remote server returned an error, give it some time to recover. + // This is to avoid spamming remote servers, which may not be Matrix servers anymore. + if e.Code >= 300 { + logger.WithError(e).Debug("GetUserDevices returned gomatrix.HTTPError") + return time.Hour, err + } + default: + // Something else failed + logger.WithError(err).Debugf("GetUserDevices returned unknown error type: %T", err) + return time.Minute * 10, err + } } - return waitTime, failCount > 0 + if res.UserID != userID { + logger.WithError(err).Debugf("User ID %q in device list update response doesn't match expected %q", res.UserID, userID) + return defaultWaitTime, nil + } + if res.MasterKey != nil || res.SelfSigningKey != nil { + uploadReq := &api.PerformUploadDeviceKeysRequest{ + UserID: userID, + } + uploadRes := &api.PerformUploadDeviceKeysResponse{} + if res.MasterKey != nil { + if err = sanityCheckKey(*res.MasterKey, userID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err == nil { + uploadReq.MasterKey = *res.MasterKey + } + } + if res.SelfSigningKey != nil { + if err = sanityCheckKey(*res.SelfSigningKey, userID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err == nil { + uploadReq.SelfSigningKey = *res.SelfSigningKey + } + } + _ = u.api.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes) + } + err = u.updateDeviceList(&res) + if err != nil { + logger.WithError(err).Error("Fetched device list but failed to store/emit it") + return defaultWaitTime, err + } + return defaultWaitTime, nil } func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error { diff --git a/keyserver/internal/device_list_update_test.go b/keyserver/internal/device_list_update_test.go index 0520a9e66..28a13a0a0 100644 --- a/keyserver/internal/device_list_update_test.go +++ b/keyserver/internal/device_list_update_test.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/setup/process" ) var ( @@ -146,7 +147,7 @@ func TestUpdateHavePrevID(t *testing.T) { } ap := &mockDeviceListUpdaterAPI{} producer := &mockKeyChangeProducer{} - updater := NewDeviceListUpdater(db, ap, producer, nil, 1) + updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1) event := gomatrixserverlib.DeviceListUpdateEvent{ DeviceDisplayName: "Foo Bar", Deleted: false, @@ -218,7 +219,7 @@ func TestUpdateNoPrevID(t *testing.T) { `)), }, nil }) - updater := NewDeviceListUpdater(db, ap, producer, fedClient, 2) + updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2) if err := updater.Start(); err != nil { t.Fatalf("failed to start updater: %s", err) } @@ -287,7 +288,7 @@ func TestDebounce(t *testing.T) { close(incomingFedReq) return <-fedCh, nil }) - updater := NewDeviceListUpdater(db, ap, producer, fedClient, 1) + updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1) if err := updater.Start(); err != nil { t.Fatalf("failed to start updater: %s", err) } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 5124b777e..9ae4f9ca3 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -58,7 +58,7 @@ func NewInternalAPI( FedClient: fedClient, Producer: keyChangeProducer, } - updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable + updater := internal.NewDeviceListUpdater(base.ProcessContext, db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable ap.Updater = updater go func() { if err := updater.Start(); err != nil {