From 642f9cb964b20f52133e11c52e40733f7bc07320 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 5 Aug 2020 13:41:16 +0100 Subject: [PATCH 1/5] Process inbound device list updates from federation (#1240) * Add InputDeviceListUpdate * Unbreak unit tests * Process inbound device list updates from federation - Persist the keys in the keyserver and produce key changes - Does not currently fetch keys from the remote server if the prev IDs are missing * Linting --- federationapi/routing/routing.go | 2 +- federationapi/routing/send.go | 21 ++++++ keyserver/api/api.go | 11 ++++ keyserver/internal/internal.go | 66 ++++++++++++++++++- keyserver/inthttp/client.go | 27 ++++++-- keyserver/inthttp/server.go | 11 ++++ keyserver/keyserver.go | 12 ++-- keyserver/storage/interface.go | 11 +++- .../storage/postgres/device_keys_table.go | 21 ++++++ keyserver/storage/shared/storage.go | 20 +++++- .../storage/sqlite3/device_keys_table.go | 23 +++++++ keyserver/storage/storage_test.go | 12 ++-- keyserver/storage/tables/interface.go | 1 + syncapi/internal/keychange_test.go | 3 + 14 files changed, 220 insertions(+), 21 deletions(-) diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 9808d6238..30a65271d 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -82,7 +82,7 @@ func Setup( func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { return Send( httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), - cfg, rsAPI, eduAPI, keys, federation, + cfg, rsAPI, eduAPI, keyAPI, keys, federation, ) }, )).Methods(http.MethodPut, http.MethodOptions) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 680eaccd3..903a2f22c 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/config" + keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -37,6 +38,7 @@ func Send( cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, eduAPI eduserverAPI.EDUServerInputAPI, + keyAPI keyapi.KeyInternalAPI, keys gomatrixserverlib.JSONVerifier, federation *gomatrixserverlib.FederationClient, ) util.JSONResponse { @@ -48,6 +50,7 @@ func Send( federation: federation, haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), newEvents: make(map[string]bool), + keyAPI: keyAPI, } var txnEvents struct { @@ -100,6 +103,7 @@ type txnReq struct { context context.Context rsAPI api.RoomserverInternalAPI eduAPI eduserverAPI.EDUServerInputAPI + keyAPI keyapi.KeyInternalAPI keys gomatrixserverlib.JSONVerifier federation txnFederationClient // local cache of events for auth checks, etc - this may include events @@ -308,12 +312,29 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { } } } + case gomatrixserverlib.MDeviceListUpdate: + t.processDeviceListUpdate(e) default: util.GetLogger(t.context).WithField("type", e.Type).Warn("unhandled edu") } } } +func (t *txnReq) processDeviceListUpdate(e gomatrixserverlib.EDU) { + var payload gomatrixserverlib.DeviceListUpdateEvent + if err := json.Unmarshal(e.Content, &payload); err != nil { + util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal device list update event") + return + } + var inputRes keyapi.InputDeviceListUpdateResponse + t.keyAPI.InputDeviceListUpdate(context.Background(), &keyapi.InputDeviceListUpdateRequest{ + Event: payload, + }, &inputRes) + if inputRes.Error != nil { + util.GetLogger(t.context).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate") + } +} + func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) error { prevEventIDs := e.PrevEventIDs() diff --git a/keyserver/api/api.go b/keyserver/api/api.go index c864b328d..c3481a38d 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -21,11 +21,14 @@ import ( "time" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" ) type KeyInternalAPI interface { // SetUserAPI assigns a user API to query when extracting device names. SetUserAPI(i userapi.UserInternalAPI) + // InputDeviceListUpdate from a federated server EDU + InputDeviceListUpdate(ctx context.Context, req *InputDeviceListUpdateRequest, res *InputDeviceListUpdateResponse) PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse) // PerformClaimKeys claims one-time keys for use in pre-key messages PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) @@ -200,3 +203,11 @@ type QueryDeviceMessagesResponse struct { Devices []DeviceMessage Error *KeyError } + +type InputDeviceListUpdateRequest struct { + Event gomatrixserverlib.DeviceListUpdateEvent +} + +type InputDeviceListUpdateResponse struct { + Error *KeyError +} diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 474f30ff3..d6e24566f 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -38,12 +38,76 @@ type KeyInternalAPI struct { FedClient *gomatrixserverlib.FederationClient UserAPI userapi.UserInternalAPI Producer *producers.KeyChange + // A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1 + // request to the remote server and race. + // TODO: Put in an LRU cache to bound growth + UserIDToMutex map[string]*sync.Mutex + Mutex *sync.Mutex // protects UserIDToMutex } func (a *KeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) { a.UserAPI = i } +func (a *KeyInternalAPI) mutex(userID string) *sync.Mutex { + a.Mutex.Lock() + defer a.Mutex.Unlock() + if a.UserIDToMutex[userID] == nil { + a.UserIDToMutex[userID] = &sync.Mutex{} + } + return a.UserIDToMutex[userID] +} + +func (a *KeyInternalAPI) InputDeviceListUpdate( + ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, +) { + mu := a.mutex(req.Event.UserID) + mu.Lock() + defer mu.Unlock() + // check if we have the prev IDs + exists, err := a.DB.PrevIDsExists(ctx, req.Event.UserID, req.Event.PrevID) + if err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("failed to check if prev ids exist: %s", err), + } + return + } + + // if we haven't missed anything update the database and notify users + if exists { + keys := []api.DeviceMessage{ + { + DeviceKeys: api.DeviceKeys{ + DeviceID: req.Event.DeviceID, + DisplayName: req.Event.DeviceDisplayName, + KeyJSON: req.Event.Keys, + UserID: req.Event.UserID, + }, + StreamID: req.Event.StreamID, + }, + } + err = a.DB.StoreRemoteDeviceKeys(ctx, keys) + if err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("failed to store remote device keys: %s", err), + } + return + } + // ALWAYS emit key changes when we've been poked over federation just in case + // this poke is important for something. + err = a.Producer.ProduceKeyChanges(keys) + if err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("failed to emit remote device key changes: %s", err), + } + } + return + } + + // if we're missing an ID go and fetch it from the remote HS + +} + func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { if req.Partition < 0 { req.Partition = a.Producer.DefaultPartition() @@ -351,7 +415,7 @@ func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.Per return } // store the device keys and emit changes - err := a.DB.StoreDeviceKeys(ctx, keysToStore) + err := a.DB.StoreLocalDeviceKeys(ctx, keysToStore) if err != nil { res.Error = &api.KeyError{ Err: fmt.Sprintf("failed to store device keys: %s", err.Error()), diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go index 93b190511..982000222 100644 --- a/keyserver/inthttp/client.go +++ b/keyserver/inthttp/client.go @@ -27,12 +27,13 @@ import ( // HTTP paths for the internal HTTP APIs const ( - PerformUploadKeysPath = "/keyserver/performUploadKeys" - PerformClaimKeysPath = "/keyserver/performClaimKeys" - QueryKeysPath = "/keyserver/queryKeys" - QueryKeyChangesPath = "/keyserver/queryKeyChanges" - QueryOneTimeKeysPath = "/keyserver/queryOneTimeKeys" - QueryDeviceMessagesPath = "/keyserver/queryDeviceMessages" + InputDeviceListUpdatePath = "/keyserver/inputDeviceListUpdate" + PerformUploadKeysPath = "/keyserver/performUploadKeys" + PerformClaimKeysPath = "/keyserver/performClaimKeys" + QueryKeysPath = "/keyserver/queryKeys" + QueryKeyChangesPath = "/keyserver/queryKeyChanges" + QueryOneTimeKeysPath = "/keyserver/queryOneTimeKeys" + QueryDeviceMessagesPath = "/keyserver/queryDeviceMessages" ) // NewKeyServerClient creates a KeyInternalAPI implemented by talking to a HTTP POST API. @@ -58,6 +59,20 @@ type httpKeyInternalAPI struct { func (h *httpKeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) { // no-op: doesn't need it } +func (h *httpKeyInternalAPI) InputDeviceListUpdate( + ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, +) { + span, ctx := opentracing.StartSpanFromContext(ctx, "InputDeviceListUpdate") + defer span.Finish() + + apiURL := h.apiURL + InputDeviceListUpdatePath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) + if err != nil { + res.Error = &api.KeyError{ + Err: err.Error(), + } + } +} func (h *httpKeyInternalAPI) PerformClaimKeys( ctx context.Context, diff --git a/keyserver/inthttp/server.go b/keyserver/inthttp/server.go index f0cd30388..7dfaed2e7 100644 --- a/keyserver/inthttp/server.go +++ b/keyserver/inthttp/server.go @@ -25,6 +25,17 @@ import ( ) func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) { + internalAPIMux.Handle(InputDeviceListUpdatePath, + httputil.MakeInternalAPI("inputDeviceListUpdate", func(req *http.Request) util.JSONResponse { + request := api.InputDeviceListUpdateRequest{} + response := api.InputDeviceListUpdateResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + s.InputDeviceListUpdate(req.Context(), &request, &response) + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle(PerformClaimKeysPath, httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse { request := api.PerformClaimKeysRequest{} diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 36bedf34b..79d9cec96 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -15,6 +15,8 @@ package keyserver import ( + "sync" + "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/internal/config" @@ -51,9 +53,11 @@ func NewInternalAPI( DB: db, } return &internal.KeyInternalAPI{ - DB: db, - ThisServer: cfg.Matrix.ServerName, - FedClient: fedClient, - Producer: keyChangeProducer, + DB: db, + ThisServer: cfg.Matrix.ServerName, + FedClient: fedClient, + Producer: keyChangeProducer, + Mutex: &sync.Mutex{}, + UserIDToMutex: make(map[string]*sync.Mutex), } } diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go index 11284d86b..f67bbf715 100644 --- a/keyserver/storage/interface.go +++ b/keyserver/storage/interface.go @@ -35,11 +35,18 @@ type Database interface { // DeviceKeysJSON populates the KeyJSON for the given keys. If any proided `keys` have a `KeyJSON` or `StreamID` already then it will be replaced. DeviceKeysJSON(ctx context.Context, keys []api.DeviceMessage) error - // StoreDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. An empty KeyJSON removes the key + // StoreLocalDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. An empty KeyJSON removes the key // for this (user, device). // The `StreamID` for each message is set on successful insertion. In the event the key already exists, the existing StreamID is set. // Returns an error if there was a problem storing the keys. - StoreDeviceKeys(ctx context.Context, keys []api.DeviceMessage) error + StoreLocalDeviceKeys(ctx context.Context, keys []api.DeviceMessage) error + + // StoreRemoteDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. An empty KeyJSON removes the key + // for this (user, device). Does not modify the stream ID for keys. + StoreRemoteDeviceKeys(ctx context.Context, keys []api.DeviceMessage) error + + // PrevIDsExists returns true if all prev IDs exist for this user. + PrevIDsExists(ctx context.Context, userID string, prevIDs []int) (bool, error) // DeviceKeysForUser returns the device keys for the device IDs given. If the length of deviceIDs is 0, all devices are selected. // If there are some missing keys, they are omitted from the returned slice. There is no ordering on the returned slice. diff --git a/keyserver/storage/postgres/device_keys_table.go b/keyserver/storage/postgres/device_keys_table.go index e1b4e9475..d321860d4 100644 --- a/keyserver/storage/postgres/device_keys_table.go +++ b/keyserver/storage/postgres/device_keys_table.go @@ -19,6 +19,7 @@ import ( "database/sql" "time" + "github.com/lib/pq" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage/tables" @@ -56,12 +57,16 @@ const selectBatchDeviceKeysSQL = "" + const selectMaxStreamForUserSQL = "" + "SELECT MAX(stream_id) FROM keyserver_device_keys WHERE user_id=$1" +const countStreamIDsForUserSQL = "" + + "SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id = ANY($2)" + type deviceKeysStatements struct { db *sql.DB upsertDeviceKeysStmt *sql.Stmt selectDeviceKeysStmt *sql.Stmt selectBatchDeviceKeysStmt *sql.Stmt selectMaxStreamForUserStmt *sql.Stmt + countStreamIDsForUserStmt *sql.Stmt } func NewPostgresDeviceKeysTable(db *sql.DB) (tables.DeviceKeys, error) { @@ -84,6 +89,9 @@ func NewPostgresDeviceKeysTable(db *sql.DB) (tables.DeviceKeys, error) { if s.selectMaxStreamForUserStmt, err = db.Prepare(selectMaxStreamForUserSQL); err != nil { return nil, err } + if s.countStreamIDsForUserStmt, err = db.Prepare(countStreamIDsForUserSQL); err != nil { + return nil, err + } return s, nil } @@ -115,6 +123,19 @@ func (s *deviceKeysStatements) SelectMaxStreamIDForUser(ctx context.Context, txn return } +func (s *deviceKeysStatements) CountStreamIDsForUser(ctx context.Context, userID string, streamIDs []int64) (int, error) { + // nullable if there are no results + var count sql.NullInt32 + err := s.countStreamIDsForUserStmt.QueryRowContext(ctx, userID, pq.Int64Array(streamIDs)).Scan(&count) + if err != nil { + return 0, err + } + if count.Valid { + return int(count.Int32), nil + } + return 0, nil +} + func (s *deviceKeysStatements) InsertDeviceKeys(ctx context.Context, txn *sql.Tx, keys []api.DeviceMessage) error { for _, key := range keys { now := time.Now().Unix() diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go index e78ee9433..787297740 100644 --- a/keyserver/storage/shared/storage.go +++ b/keyserver/storage/shared/storage.go @@ -47,7 +47,25 @@ func (d *Database) DeviceKeysJSON(ctx context.Context, keys []api.DeviceMessage) return d.DeviceKeysTable.SelectDeviceKeysJSON(ctx, keys) } -func (d *Database) StoreDeviceKeys(ctx context.Context, keys []api.DeviceMessage) error { +func (d *Database) PrevIDsExists(ctx context.Context, userID string, prevIDs []int) (bool, error) { + sids := make([]int64, len(prevIDs)) + for i := range prevIDs { + sids[i] = int64(prevIDs[i]) + } + count, err := d.DeviceKeysTable.CountStreamIDsForUser(ctx, userID, sids) + if err != nil { + return false, err + } + return count == len(prevIDs), nil +} + +func (d *Database) StoreRemoteDeviceKeys(ctx context.Context, keys []api.DeviceMessage) error { + return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + return d.DeviceKeysTable.InsertDeviceKeys(ctx, txn, keys) + }) +} + +func (d *Database) StoreLocalDeviceKeys(ctx context.Context, keys []api.DeviceMessage) error { // work out the latest stream IDs for each user userIDToStreamID := make(map[string]int) for _, k := range keys { diff --git a/keyserver/storage/sqlite3/device_keys_table.go b/keyserver/storage/sqlite3/device_keys_table.go index 900d1238f..15d9c775f 100644 --- a/keyserver/storage/sqlite3/device_keys_table.go +++ b/keyserver/storage/sqlite3/device_keys_table.go @@ -17,6 +17,7 @@ package sqlite3 import ( "context" "database/sql" + "strings" "time" "github.com/matrix-org/dendrite/internal" @@ -53,6 +54,9 @@ const selectBatchDeviceKeysSQL = "" + const selectMaxStreamForUserSQL = "" + "SELECT MAX(stream_id) FROM keyserver_device_keys WHERE user_id=$1" +const countStreamIDsForUserSQL = "" + + "SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id IN ($2)" + type deviceKeysStatements struct { db *sql.DB writer *sqlutil.TransactionWriter @@ -143,6 +147,25 @@ func (s *deviceKeysStatements) SelectMaxStreamIDForUser(ctx context.Context, txn return } +func (s *deviceKeysStatements) CountStreamIDsForUser(ctx context.Context, userID string, streamIDs []int64) (int, error) { + iStreamIDs := make([]interface{}, len(streamIDs)+1) + iStreamIDs[0] = userID + for i := range streamIDs { + iStreamIDs[i+1] = streamIDs[i] + } + query := strings.Replace(countStreamIDsForUserSQL, "($2)", sqlutil.QueryVariadicOffset(len(streamIDs), 1), 1) + // nullable if there are no results + var count sql.NullInt32 + err := s.db.QueryRowContext(ctx, query, iStreamIDs...).Scan(&count) + if err != nil { + return 0, err + } + if count.Valid { + return int(count.Int32), nil + } + return 0, nil +} + func (s *deviceKeysStatements) InsertDeviceKeys(ctx context.Context, txn *sql.Tx, keys []api.DeviceMessage) error { return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { for _, key := range keys { diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go index 949d9dd6c..ec1b299ff 100644 --- a/keyserver/storage/storage_test.go +++ b/keyserver/storage/storage_test.go @@ -126,15 +126,15 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) { // StreamID: 2 as this is a 2nd device key }, } - MustNotError(t, db.StoreDeviceKeys(ctx, msgs)) + MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs)) if msgs[0].StreamID != 1 { - t.Fatalf("Expected StoreDeviceKeys to set StreamID=1 but got %d", msgs[0].StreamID) + t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 but got %d", msgs[0].StreamID) } if msgs[1].StreamID != 1 { - t.Fatalf("Expected StoreDeviceKeys to set StreamID=1 (different user) but got %d", msgs[1].StreamID) + t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 (different user) but got %d", msgs[1].StreamID) } if msgs[2].StreamID != 2 { - t.Fatalf("Expected StoreDeviceKeys to set StreamID=2 (another device) but got %d", msgs[2].StreamID) + t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=2 (another device) but got %d", msgs[2].StreamID) } // updating a device sets the next stream ID for that user @@ -148,9 +148,9 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) { // StreamID: 3 }, } - MustNotError(t, db.StoreDeviceKeys(ctx, msgs)) + MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs)) if msgs[0].StreamID != 3 { - t.Fatalf("Expected StoreDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID) + t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID) } // Querying for device keys returns the latest stream IDs diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go index 65da3310c..ac932d56d 100644 --- a/keyserver/storage/tables/interface.go +++ b/keyserver/storage/tables/interface.go @@ -35,6 +35,7 @@ type DeviceKeys interface { SelectDeviceKeysJSON(ctx context.Context, keys []api.DeviceMessage) error InsertDeviceKeys(ctx context.Context, txn *sql.Tx, keys []api.DeviceMessage) error SelectMaxStreamIDForUser(ctx context.Context, txn *sql.Tx, userID string) (streamID int32, err error) + CountStreamIDsForUser(ctx context.Context, userID string, streamIDs []int64) (int, error) SelectBatchDeviceKeys(ctx context.Context, userID string, deviceIDs []string) ([]api.DeviceMessage, error) } diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index cc33c7382..6765fa657 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -44,6 +44,9 @@ func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneT } func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) { +} +func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.InputDeviceListUpdateRequest, res *keyapi.InputDeviceListUpdateResponse) { + } type mockCurrentStateAPI struct { From b7491aae03324b722042ab685013003ad483d401 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 6 Aug 2020 16:00:42 +0100 Subject: [PATCH 2/5] Yggdrasil demo updates (#1241) * PerformServersAlive in PerformBroadcastEDU * Don't double-pointer * More reliable QUIC session handling * Direct peer lookup, other tweaks * Tweaks * Try to wake up queues on incoming QUIC session * Set session callbak on gobind build * Fix incoming session storage * Stateless reset, other tweaks * Reset sessions when coordinates change * Disable HTTP connection reuse, tweak timeouts --- build/gobind/monolith.go | 29 ++- cmd/dendrite-demo-yggdrasil/main.go | 22 +- cmd/dendrite-demo-yggdrasil/yggconn/client.go | 12 +- cmd/dendrite-demo-yggdrasil/yggconn/node.go | 54 +++-- .../yggconn/session.go | 192 ++++++++++++++---- federationsender/internal/perform.go | 13 ++ go.mod | 5 +- go.sum | 33 +-- 8 files changed, 271 insertions(+), 89 deletions(-) diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 84103f385..c0c0ccb43 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" - "go.uber.org/atomic" ) type DendriteMonolith struct { @@ -34,8 +33,6 @@ type DendriteMonolith struct { StorageDirectory string listener net.Listener httpServer *http.Server - httpListening atomic.Bool - yggListening atomic.Bool } func (m *DendriteMonolith) BaseURL() string { @@ -46,6 +43,10 @@ func (m *DendriteMonolith) PeerCount() int { return m.YggdrasilNode.PeerCount() } +func (m *DendriteMonolith) SessionCount() int { + return m.YggdrasilNode.SessionCount() +} + func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { m.YggdrasilNode.SetMulticastEnabled(enabled) } @@ -86,7 +87,7 @@ func (m *DendriteMonolith) Start() { cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) cfg.Matrix.PrivateKey = ygg.SigningPrivateKey() cfg.Matrix.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Matrix.FederationMaxRetries = 6 + cfg.Matrix.FederationMaxRetries = 8 cfg.Kafka.UseNaffka = true cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput" cfg.Kafka.Topics.OutputClientData = "clientapiOutput" @@ -98,7 +99,7 @@ func (m *DendriteMonolith) Start() { cfg.Database.SyncAPI = config.DataSource(fmt.Sprintf("file:%s/dendrite-syncapi.db", m.StorageDirectory)) cfg.Database.RoomServer = config.DataSource(fmt.Sprintf("file:%s/dendrite-roomserver.db", m.StorageDirectory)) cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s/dendrite-serverkey.db", m.StorageDirectory)) - cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s/dendrite-e2ekey.db", m.StorageDirectory)) + cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s/dendrite-keyserver.db", m.StorageDirectory)) cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s/dendrite-federationsender.db", m.StorageDirectory)) cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s/dendrite-appservice.db", m.StorageDirectory)) cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s/dendrite-currentstate.db", m.StorageDirectory)) @@ -136,6 +137,18 @@ func (m *DendriteMonolith) Start() { base, federation, rsAPI, stateAPI, keyRing, ) + ygg.SetSessionFunc(func(address string) { + req := &api.PerformServersAliveRequest{ + Servers: []gomatrixserverlib.ServerName{ + gomatrixserverlib.ServerName(address), + }, + } + res := &api.PerformServersAliveResponse{} + if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil { + logrus.WithError(err).Error("Failed to send wake-up message to newly connected node") + } + }) + // The underlying roomserver implementation needs to be able to call the fedsender. // This is different to rsAPI which can be the http client which doesn't need this dependency rsAPI.SetFederationSenderAPI(fsAPI) @@ -175,9 +188,9 @@ func (m *DendriteMonolith) Start() { m.httpServer = &http.Server{ Addr: ":0", TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 60 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 30 * time.Second, BaseContext: func(_ net.Listener) context.Context { return context.Background() }, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 8f6b0eaf4..81bf994b2 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -72,7 +72,7 @@ func main() { cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) cfg.Matrix.PrivateKey = ygg.SigningPrivateKey() cfg.Matrix.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Matrix.FederationMaxRetries = 6 + cfg.Matrix.FederationMaxRetries = 8 cfg.Kafka.UseNaffka = true cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput" cfg.Kafka.Topics.OutputClientData = "clientapiOutput" @@ -83,7 +83,7 @@ func main() { cfg.Database.SyncAPI = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName)) cfg.Database.RoomServer = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName)) cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName)) - cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName)) + cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName)) cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName)) @@ -122,6 +122,18 @@ func main() { base, federation, rsAPI, stateAPI, keyRing, ) + ygg.SetSessionFunc(func(address string) { + req := &api.PerformServersAliveRequest{ + Servers: []gomatrixserverlib.ServerName{ + gomatrixserverlib.ServerName(address), + }, + } + res := &api.PerformServersAliveResponse{} + if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil { + logrus.WithError(err).Error("Failed to send wake-up message to newly connected node") + } + }) + rsComponent.SetFederationSenderAPI(fsAPI) embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo") @@ -162,9 +174,9 @@ func main() { httpServer := &http.Server{ Addr: ":0", TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, - ReadTimeout: 15 * time.Second, - WriteTimeout: 45 * time.Second, - IdleTimeout: 60 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 30 * time.Second, BaseContext: func(_ net.Listener) context.Context { return context.Background() }, diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/client.go b/cmd/dendrite-demo-yggdrasil/yggconn/client.go index c5b3eb722..56afe264d 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/client.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/client.go @@ -24,9 +24,11 @@ func (n *Node) CreateClient( tr.RegisterProtocol( "matrix", &yggroundtripper{ inner: &http.Transport{ - TLSHandshakeTimeout: 20 * time.Second, + MaxIdleConns: -1, + MaxIdleConnsPerHost: -1, + TLSHandshakeTimeout: 10 * time.Second, ResponseHeaderTimeout: 10 * time.Second, - IdleConnTimeout: 60 * time.Second, + IdleConnTimeout: 30 * time.Second, DialContext: n.DialerContext, }, }, @@ -41,9 +43,11 @@ func (n *Node) CreateFederationClient( tr.RegisterProtocol( "matrix", &yggroundtripper{ inner: &http.Transport{ - TLSHandshakeTimeout: 20 * time.Second, + MaxIdleConns: -1, + MaxIdleConnsPerHost: -1, + TLSHandshakeTimeout: 10 * time.Second, ResponseHeaderTimeout: 10 * time.Second, - IdleConnTimeout: 60 * time.Second, + IdleConnTimeout: 30 * time.Second, DialContext: n.DialerContext, TLSClientConfig: n.tlsConfig, }, diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index 2ed15b3a1..9b123aa64 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -32,6 +32,7 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert" "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" @@ -41,17 +42,20 @@ import ( ) type Node struct { - core *yggdrasil.Core - config *yggdrasilconfig.NodeConfig - state *yggdrasilconfig.NodeState - multicast *yggdrasilmulticast.Multicast - log *gologme.Logger - listener quic.Listener - tlsConfig *tls.Config - quicConfig *quic.Config - sessions sync.Map // string -> quic.Session - incoming chan QUICStream - NewSession func(remote gomatrixserverlib.ServerName) + core *yggdrasil.Core + config *yggdrasilconfig.NodeConfig + state *yggdrasilconfig.NodeState + multicast *yggdrasilmulticast.Multicast + log *gologme.Logger + listener quic.Listener + tlsConfig *tls.Config + quicConfig *quic.Config + sessions sync.Map // string -> *session + sessionCount atomic.Uint32 + sessionFunc func(address string) + coords sync.Map // string -> yggdrasil.Coords + incoming chan QUICStream + NewSession func(remote gomatrixserverlib.ServerName) } func (n *Node) Dialer(_, address string) (net.Conn, error) { @@ -90,6 +94,19 @@ func Setup(instanceName, storageDirectory string) (*Node, error) { } } + n.core.SetCoordChangeCallback(func(old, new yggdrasil.Coords) { + fmt.Println("COORDINATE CHANGE!") + fmt.Println("Old:", old) + fmt.Println("New:", new) + n.sessions.Range(func(k, v interface{}) bool { + if s, ok := v.(*session); ok { + fmt.Println("Killing session", k) + s.kill() + } + return true + }) + }) + n.config.Peers = []string{} n.config.AdminListen = "none" n.config.MulticastInterfaces = []string{} @@ -124,8 +141,9 @@ func Setup(instanceName, storageDirectory string) (*Node, error) { MaxIncomingUniStreams: 0, KeepAlive: true, MaxIdleTimeout: time.Minute * 30, - HandshakeTimeout: time.Second * 30, + HandshakeTimeout: time.Second * 15, } + copy(n.quicConfig.StatelessResetKey, n.EncryptionPublicKey()) n.log.Println("Public curve25519:", n.core.EncryptionPublicKey()) n.log.Println("Public ed25519:", n.core.SigningPublicKey()) @@ -173,17 +191,25 @@ func (n *Node) SigningPrivateKey() ed25519.PrivateKey { return ed25519.PrivateKey(privBytes) } +func (n *Node) SetSessionFunc(f func(address string)) { + n.sessionFunc = f +} + func (n *Node) PeerCount() int { return len(n.core.GetPeers()) - 1 } +func (n *Node) SessionCount() int { + return int(n.sessionCount.Load()) +} + func (n *Node) KnownNodes() []gomatrixserverlib.ServerName { nodemap := map[string]struct{}{ - "b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{}, + "b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": {}, } /* for _, peer := range n.core.GetSwitchPeers() { - nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{} + nodemap[hex.EncodeToString(peer.PublicKey[:])] = struct{}{} } */ n.sessions.Range(func(_, v interface{}) bool { diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go index 0d231f6de..0cf524d9b 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go @@ -31,8 +31,32 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" ) +type session struct { + node *Node + session quic.Session + address string + context context.Context + cancel context.CancelFunc +} + +func (n *Node) newSession(sess quic.Session, address string) *session { + ctx, cancel := context.WithCancel(context.TODO()) + return &session{ + node: n, + session: sess, + address: address, + context: ctx, + cancel: cancel, + } +} + +func (s *session) kill() { + s.cancel() +} + func (n *Node) listenFromYgg() { var err error n.listener, err = quic.Listen( @@ -55,22 +79,31 @@ func (n *Node) listenFromYgg() { _ = session.CloseWithError(0, "expected a peer certificate") continue } - address := session.ConnectionState().PeerCertificates[0].Subject.CommonName + address := session.ConnectionState().PeerCertificates[0].DNSNames[0] n.log.Infoln("Accepted connection from", address) - go n.listenFromQUIC(session, address) + go n.newSession(session, address).listenFromQUIC() + go n.sessionFunc(address) } } -func (n *Node) listenFromQUIC(session quic.Session, address string) { - n.sessions.Store(address, session) - defer n.sessions.Delete(address) +func (s *session) listenFromQUIC() { + if existing, ok := s.node.sessions.Load(s.address); ok { + if existingSession, ok := existing.(*session); ok { + fmt.Println("Killing existing session to replace", s.address) + existingSession.kill() + } + } + s.node.sessionCount.Inc() + s.node.sessions.Store(s.address, s) + defer s.node.sessions.Delete(s.address) + defer s.node.sessionCount.Dec() for { - st, err := session.AcceptStream(context.TODO()) + st, err := s.session.AcceptStream(s.context) if err != nil { - n.log.Println("session.AcceptStream:", err) + s.node.log.Println("session.AcceptStream:", err) return } - n.incoming <- QUICStream{st, session} + s.node.incoming <- QUICStream{st, s.session} } } @@ -95,53 +128,124 @@ func (n *Node) Dial(network, address string) (net.Conn, error) { } // Implements http.Transport.DialContext +// nolint:gocyclo func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) { s, ok1 := n.sessions.Load(address) - session, ok2 := s.(quic.Session) - if !ok1 || !ok2 || (ok1 && ok2 && session.ConnectionState().HandshakeComplete) { - dest, err := hex.DecodeString(address) - if err != nil { - return nil, err - } - if len(dest) != crypto.BoxPubKeyLen { - return nil, errors.New("invalid key length supplied") - } - var pubKey crypto.BoxPubKey - copy(pubKey[:], dest) - nodeID := crypto.GetNodeID(&pubKey) - nodeMask := &crypto.NodeID{} - for i := range nodeMask { - nodeMask[i] = 0xFF + session, ok2 := s.(*session) + if !ok1 || !ok2 { + // First of all, check if we think we know the coords of this + // node. If we do then we'll try to dial to it directly. This + // will either succeed or fail. + if v, ok := n.coords.Load(address); ok { + coords, ok := v.(yggdrasil.Coords) + if !ok { + n.coords.Delete(address) + return nil, errors.New("should have found yggdrasil.Coords but didn't") + } + n.log.Infof("Coords %s for %q cached, trying to dial", coords.String(), address) + var err error + // We think we know the coords. Try to dial the node. + if session, err = n.tryDial(address, coords); err != nil { + // We thought we knew the coords but it didn't result + // in a successful dial. Nuke them from the cache. + n.coords.Delete(address) + n.log.Infof("Cached coords %s for %q failed", coords.String(), address) + } } - fmt.Println("Resolving coords") - coords, err := n.core.Resolve(nodeID, nodeMask) - if err != nil { - return nil, fmt.Errorf("n.core.Resolve: %w", err) - } - fmt.Println("Found coords:", coords) - fmt.Println("Dialling") + // We either don't know the coords for the node, or we failed + // to dial it before, in which case try to resolve the coords. + if _, ok := n.coords.Load(address); !ok { + var coords yggdrasil.Coords + var err error - session, err = quic.Dial( - n.core, // yggdrasil.PacketConn - coords, // dial address - address, // dial SNI - n.tlsConfig, // TLS config - n.quicConfig, // QUIC config - ) - if err != nil { - n.log.Println("n.dialer.DialContext:", err) - return nil, err + // First look and see if the node is something that we already + // know about from our direct switch peers. + for _, peer := range n.core.GetSwitchPeers() { + if peer.PublicKey.String() == address { + coords = peer.Coords + n.log.Infof("%q is a direct peer, coords are %s", address, coords.String()) + n.coords.Store(address, coords) + break + } + } + + // If it isn' a node that we know directly then try to search + // the network. + if coords == nil { + n.log.Infof("Searching for coords for %q", address) + dest, derr := hex.DecodeString(address) + if derr != nil { + return nil, derr + } + if len(dest) != crypto.BoxPubKeyLen { + return nil, errors.New("invalid key length supplied") + } + var pubKey crypto.BoxPubKey + copy(pubKey[:], dest) + nodeID := crypto.GetNodeID(&pubKey) + nodeMask := &crypto.NodeID{} + for i := range nodeMask { + nodeMask[i] = 0xFF + } + + fmt.Println("Resolving coords") + coords, err = n.core.Resolve(nodeID, nodeMask) + if err != nil { + return nil, fmt.Errorf("n.core.Resolve: %w", err) + } + fmt.Println("Found coords:", coords) + n.coords.Store(address, coords) + } + + // We now know the coords in theory. Let's try dialling the + // node again. + if session, err = n.tryDial(address, coords); err != nil { + return nil, fmt.Errorf("n.tryDial: %w", err) + } } - fmt.Println("Dial OK") - go n.listenFromQUIC(session, address) } - st, err := session.OpenStream() + + if session == nil { + return nil, fmt.Errorf("should have found session but didn't") + } + + st, err := session.session.OpenStream() if err != nil { n.log.Println("session.OpenStream:", err) + _ = session.session.CloseWithError(0, "expected to be able to open session") return nil, err } - return QUICStream{st, session}, nil + return QUICStream{st, session.session}, nil +} + +func (n *Node) tryDial(address string, coords yggdrasil.Coords) (*session, error) { + quicSession, err := quic.Dial( + n.core, // yggdrasil.PacketConn + coords, // dial address + address, // dial SNI + n.tlsConfig, // TLS config + n.quicConfig, // QUIC config + ) + if err != nil { + return nil, err + } + if len(quicSession.ConnectionState().PeerCertificates) != 1 { + _ = quicSession.CloseWithError(0, "expected a peer certificate") + return nil, errors.New("didn't receive a peer certificate") + } + if len(quicSession.ConnectionState().PeerCertificates[0].DNSNames) != 1 { + _ = quicSession.CloseWithError(0, "expected a DNS name") + return nil, errors.New("didn't receive a DNS name") + } + if gotAddress := quicSession.ConnectionState().PeerCertificates[0].DNSNames[0]; address != gotAddress { + _ = quicSession.CloseWithError(0, "you aren't the host I was hoping for") + return nil, fmt.Errorf("expected %q but dialled %q", address, gotAddress) + } + session := n.newSession(quicSession, address) + go session.listenFromQUIC() + go n.sessionFunc(address) + return session, nil } func (n *Node) generateTLSConfig() *tls.Config { diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index d9a4b963e..1b8e360c6 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -319,6 +319,11 @@ func (r *FederationSenderInternalAPI) PerformBroadcastEDU( if err != nil { return fmt.Errorf("r.db.GetAllJoinedHosts: %w", err) } + if len(destinations) == 0 { + return nil + } + + logrus.WithContext(ctx).Infof("Sending wake-up EDU to %d destination(s)", len(destinations)) edu := &gomatrixserverlib.EDU{ Type: "org.matrix.dendrite.wakeup", @@ -328,5 +333,13 @@ func (r *FederationSenderInternalAPI) PerformBroadcastEDU( return fmt.Errorf("r.queues.SendEDU: %w", err) } + wakeReq := &api.PerformServersAliveRequest{ + Servers: destinations, + } + wakeRes := &api.PerformServersAliveResponse{} + if err := r.PerformServersAlive(ctx, wakeReq, wakeRes); err != nil { + return fmt.Errorf("r.PerformServersAlive: %w", err) + } + return nil } diff --git a/go.mod b/go.mod index 986085b4c..e367b87ac 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/libp2p/go-libp2p-pubsub v0.2.5 github.com/libp2p/go-libp2p-record v0.1.2 github.com/libp2p/go-yamux v1.3.7 // indirect - github.com/lucas-clemente/quic-go v0.17.2 + github.com/lucas-clemente/quic-go v0.17.3 github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 @@ -36,9 +36,10 @@ require ( github.com/uber-go/atomic v1.3.0 // indirect github.com/uber/jaeger-client-go v2.15.0+incompatible github.com/uber/jaeger-lib v1.5.0 - github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3 + github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806125501-cd4685a3b4de go.uber.org/atomic v1.4.0 golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 + golang.org/x/mobile v0.0.0-20200801112145-973feb4309de // indirect gopkg.in/h2non/bimg.v1 v1.0.18 gopkg.in/yaml.v2 v2.2.8 ) diff --git a/go.sum b/go.sum index 53ab9cc10..9aa039091 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,7 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOv github.com/Arceliar/phony v0.0.0-20191006174943-d0c68492aca0 h1:p3puK8Sl2xK+2FnnIvY/C0N1aqJo2kbEsdAzU+Tnv48= github.com/Arceliar/phony v0.0.0-20191006174943-d0c68492aca0/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/Shopify/sarama v1.26.1 h1:3jnfWKD7gVwbB1KSy/lE0szA9duPuSFLViK0o/d3DgA= github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= @@ -399,8 +400,8 @@ github.com/libp2p/go-yamux v1.3.0 h1:FsYzT16Wq2XqUGJsBbOxoz9g+dFklvNi7jN6YFPfl7U github.com/libp2p/go-yamux v1.3.0/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow= github.com/libp2p/go-yamux v1.3.7 h1:v40A1eSPJDIZwz2AvrV3cxpTZEGDP11QJbukmEhYyQI= github.com/libp2p/go-yamux v1.3.7/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE= -github.com/lucas-clemente/quic-go v0.17.2 h1:4iQInIuNQkPNZmsy9rCnwuOzpH0qGnDo4jn0QfI/qE4= -github.com/lucas-clemente/quic-go v0.17.2/go.mod h1:I0+fcNTdb9eS1ZcjQZbDVPGchJ86chcIxPALn9lEJqE= +github.com/lucas-clemente/quic-go v0.17.3 h1:jMX/MmDNCljfisgMmPGUcBJ+zUh9w3d3ia4YJjYS3TM= +github.com/lucas-clemente/quic-go v0.17.3/go.mod h1:I0+fcNTdb9eS1ZcjQZbDVPGchJ86chcIxPALn9lEJqE= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/lxn/walk v0.0.0-20191128110447-55ccb3a9f5c1/go.mod h1:E23UucZGqpuUANJooIbHWCufXvOcT6E7Stq81gU+CSQ= github.com/lxn/win v0.0.0-20191128105842-2da648fda5b4/go.mod h1:ouWl4wViUNh8tPSIwxTVMuS014WakR1hqvBc2I0bMoA= @@ -421,16 +422,6 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 h1:Yb+Wlf github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b h1:ul/Jc5q5+QBHNvhd9idfglOwyGf/Tc3ittINEbKJPsQ= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d h1:WZXyd8YI+PQIDYjN8HxtqNRJ1DCckt9wPTi2P8cdnKM= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165250-352235625587 h1:n2IZkm5LI4lACulOa5WU6QwWUhHUtBZez7YIFr1fCOs= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165250-352235625587/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165739-3bd1ef0f0852 h1:OBvHjLWaT2KS9kGarX2ES0yKBL/wMxAeQB39tRrAAls= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165739-3bd1ef0f0852/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200804110046-7abbc2918807 h1:ufr+e2FBDuxcO5t/7PMfoiQoma4uyYzS/sLuJSR6tng= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200804110046-7abbc2918807/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d h1:zYk/bQ5bmHDsRqHBl57aBxo5bizsknWU3sunZf9WnWI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= @@ -664,6 +655,10 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1: github.com/yggdrasil-network/yggdrasil-extras v0.0.0-20200525205615-6c8a4a2e8855/go.mod h1:xQdsh08Io6nV4WRnOVTe6gI8/2iTvfLDQ0CYa5aMt+I= github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3 h1:teLoIJgPHysREs8P6GlcS/PgaU9W9+GQndikFCQ1lY0= github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= +github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806124633-bd1bdd6be073 h1:Fg4Bszd2qp6eyz/yDMYfB8g2PC1FfNQphGRgZAyD0VU= +github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806124633-bd1bdd6be073/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= +github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806125501-cd4685a3b4de h1:p91aw0Mvol825U+5bvV9BBPl+HQxIczj7wxIOxZs70M= +github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806125501-cd4685a3b4de/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= @@ -689,6 +684,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90Pveol golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -701,10 +697,19 @@ golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 h1:Q7tZBpemrlsc2I7IyODzhtallWRSm4Q0d09pL6XbQtU= golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4= +golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= +golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= +golang.org/x/mobile v0.0.0-20200801112145-973feb4309de h1:OVJ6QQUBAesB8CZijKDSsXX7xYVtUhrkY0gwMfbi4p4= +golang.org/x/mobile v0.0.0-20200801112145-973feb4309de/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4= +golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -791,7 +796,11 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69 h1:yBHHx+XZqXJBm6Exke3N7V9gnlsyXxoCPEb1yVenjfk= +golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= From 32a4565b556b55b6dfe6e8f66dc2fe75a2b8d84b Mon Sep 17 00:00:00 2001 From: Kegsay Date: Thu, 6 Aug 2020 17:48:10 +0100 Subject: [PATCH 3/5] Add device list updater which manages updating remote device lists (#1242) * Add device list updater which manages updating remote device lists - Doesn't persist stale lists to the database yet - Doesn't have tests yet * Mark device lists as fresh when we persist --- go.mod | 2 +- go.sum | 2 + keyserver/internal/device_list_update.go | 298 +++++++++++++++++++++++ keyserver/internal/internal.go | 58 +---- keyserver/keyserver.go | 18 +- keyserver/storage/interface.go | 8 + keyserver/storage/shared/storage.go | 12 + 7 files changed, 334 insertions(+), 64 deletions(-) create mode 100644 keyserver/internal/device_list_update.go diff --git a/go.mod b/go.mod index e367b87ac..d2116fcd3 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d + github.com/matrix-org/gomatrixserverlib v0.0.0-20200806145220-3120f1087f6d github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index 9aa039091..81f6f0a42 100644 --- a/go.sum +++ b/go.sum @@ -424,6 +424,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d h1:zYk/bQ5bmHDsRqHBl57aBxo5bizsknWU3sunZf9WnWI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200806145220-3120f1087f6d h1:BsSge1plJoqMyL5xnUFuTkzMXC42GbJW1kMbMizTmL4= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200806145220-3120f1087f6d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go new file mode 100644 index 000000000..19d8463d8 --- /dev/null +++ b/keyserver/internal/device_list_update.go @@ -0,0 +1,298 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "encoding/json" + "fmt" + "hash/fnv" + "sync" + "time" + + "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/producers" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" +) + +// DeviceListUpdater handles device list updates from remote servers. +// +// In the case where we have the prev_id for an update, the updater just stores the update (after acquiring a per-user lock). +// In the case where we do not have the prev_id for an update, the updater marks the user_id as stale and notifies +// a worker to get the latest device list for this user. Note: stream IDs are scoped per user so missing a prev_id +// for a (user, device) does not mean that DEVICE is outdated as the previous ID could be for a different device: +// we have to invalidate all devices for that user. Once the list has been fetched, the per-user lock is acquired and the +// updater stores the latest list along with the latest stream ID. +// +// On startup, the updater spins up N workers which are responsible for querying device keys from remote servers. +// Workers are scoped by homeserver domain, with one worker responsible for many domains, determined by hashing +// mod N the server name. Work is sent via a channel which just serves to "poke" the worker as the data is retrieved +// from the database (which allows us to batch requests to the same server). This has a number of desirable properties: +// - We guarantee only 1 in-flight /keys/query request per server at any time as there is exactly 1 worker responsible +// for that domain. +// - We don't have unbounded growth in proportion to the number of servers (this is more important in a P2P world where +// we have many many servers) +// - We can adjust concurrency (at the cost of memory usage) by tuning N, to accommodate mobile devices vs servers. +// The downsides are that: +// - Query requests can get queued behind other servers if they hash to the same worker, even if there are other free +// workers elsewhere. Whilst suboptimal, provided we cap how long a single request can last (e.g using context timeouts) +// we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests +// (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse +// than being stuck behind foo.bar +// In the event that the query fails, the worker spins up a short-lived goroutine whose sole purpose is to inject the server +// name back into the channel after a certain amount of time. If in the interim the device lists have been updated, then +// the database query will return no stale lists. Reinjection into the channel continues until success or the server terminates, +// when it will be reloaded on startup. +type DeviceListUpdater struct { + // A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1 + // request to the remote server and race. + // TODO: Put in an LRU cache to bound growth + userIDToMutex map[string]*sync.Mutex + mu *sync.Mutex // protects UserIDToMutex + + db DeviceListUpdaterDatabase + producer *producers.KeyChange + fedClient *gomatrixserverlib.FederationClient + workerChans []chan gomatrixserverlib.ServerName +} + +// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater. +// Useful for testing. +type DeviceListUpdaterDatabase interface { + // StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists. + // If no domains are given, all user IDs with stale device lists are returned. + StaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error) + + // MarkDeviceListStale sets the stale bit for this user to isStale. + MarkDeviceListStale(ctx context.Context, userID string, isStale bool) error + + // StoreRemoteDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. An empty KeyJSON removes the key + // for this (user, device). Does not modify the stream ID for keys. + StoreRemoteDeviceKeys(ctx context.Context, keys []api.DeviceMessage) error + + // PrevIDsExists returns true if all prev IDs exist for this user. + PrevIDsExists(ctx context.Context, userID string, prevIDs []int) (bool, error) +} + +// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. +func NewDeviceListUpdater( + db DeviceListUpdaterDatabase, producer *producers.KeyChange, fedClient *gomatrixserverlib.FederationClient, + numWorkers int, +) *DeviceListUpdater { + return &DeviceListUpdater{ + userIDToMutex: make(map[string]*sync.Mutex), + mu: &sync.Mutex{}, + db: db, + producer: producer, + fedClient: fedClient, + workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers), + } +} + +// Start the device list updater, which will try to refresh any stale device lists. +func (u *DeviceListUpdater) Start() error { + for i := 0; i < len(u.workerChans); i++ { + // Allocate a small buffer per channel. + // If the buffer limit is reached, backpressure will cause the processing of EDUs + // to stop (in this transaction) until key requests can be made. + ch := make(chan gomatrixserverlib.ServerName, 10) + u.workerChans[i] = ch + go u.worker(ch) + } + + staleLists, err := u.db.StaleDeviceLists(context.Background(), []gomatrixserverlib.ServerName{}) + if err != nil { + return err + } + for _, userID := range staleLists { + u.notifyWorkers(userID) + } + return nil +} + +func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex { + u.mu.Lock() + defer u.mu.Unlock() + if u.userIDToMutex[userID] == nil { + u.userIDToMutex[userID] = &sync.Mutex{} + } + return u.userIDToMutex[userID] +} + +func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error { + isDeviceListStale, err := u.update(ctx, event) + if err != nil { + return err + } + if isDeviceListStale { + // poke workers to handle stale device lists + u.notifyWorkers(event.UserID) + } + return nil +} + +func (u *DeviceListUpdater) update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) (bool, error) { + mu := u.mutex(event.UserID) + mu.Lock() + defer mu.Unlock() + // check if we have the prev IDs + exists, err := u.db.PrevIDsExists(ctx, event.UserID, event.PrevID) + if err != nil { + return false, fmt.Errorf("failed to check prev IDs exist for %s (%s): %w", event.UserID, event.DeviceID, err) + } + util.GetLogger(ctx).WithFields(logrus.Fields{ + "prev_ids_exist": exists, + "user_id": event.UserID, + "device_id": event.DeviceID, + "stream_id": event.StreamID, + "prev_ids": event.PrevID, + }).Info("DeviceListUpdater.Update") + + // if we haven't missed anything update the database and notify users + if exists { + keys := []api.DeviceMessage{ + { + DeviceKeys: api.DeviceKeys{ + DeviceID: event.DeviceID, + DisplayName: event.DeviceDisplayName, + KeyJSON: event.Keys, + UserID: event.UserID, + }, + StreamID: event.StreamID, + }, + } + err = u.db.StoreRemoteDeviceKeys(ctx, keys) + if err != nil { + return false, fmt.Errorf("failed to store remote device keys for %s (%s): %w", event.UserID, event.DeviceID, err) + } + // ALWAYS emit key changes when we've been poked over federation even if there's no change + // just in case this poke is important for something. + err = u.producer.ProduceKeyChanges(keys) + if err != nil { + return false, fmt.Errorf("failed to produce device key changes for %s (%s): %w", event.UserID, event.DeviceID, err) + } + return false, nil + } + + err = u.db.MarkDeviceListStale(ctx, event.UserID, true) + if err != nil { + return false, fmt.Errorf("failed to mark device list for %s as stale: %w", event.UserID, err) + } + + return true, nil +} + +func (u *DeviceListUpdater) notifyWorkers(userID string) { + _, remoteServer, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return + } + hash := fnv.New32a() + _, _ = hash.Write([]byte(remoteServer)) + index := int(hash.Sum32()) % len(u.workerChans) + u.workerChans[index] <- remoteServer +} + +func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { + // It's possible to get many of the same server name in the channel, so in order + // to prevent processing the same server over and over we keep track of when we + // last made a request to the server. If we get the server name during the cooloff + // period, we'll ignore the poke. + lastProcessed := make(map[gomatrixserverlib.ServerName]time.Time) + cooloffPeriod := time.Minute + shouldProcess := func(srv gomatrixserverlib.ServerName) bool { + // we should process requests when now is after the last process time + cooloff + return time.Now().After(lastProcessed[srv].Add(cooloffPeriod)) + } + + // on failure, spin up a short-lived goroutine to inject the server name again. + inject := func(srv gomatrixserverlib.ServerName, duration time.Duration) { + time.Sleep(duration) + ch <- srv + } + + for serverName := range ch { + if !shouldProcess(serverName) { + // do not inject into the channel as we know there will be a sleeping goroutine + // which will do it after the cooloff period expires + continue + } + lastProcessed[serverName] = time.Now() + shouldRetry := u.processServer(serverName) + if shouldRetry { + go inject(serverName, cooloffPeriod) // TODO: Backoff? + } + } +} + +func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) bool { + requestTimeout := time.Minute // max amount of time we want to spend on each request + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + logger := util.GetLogger(ctx).WithField("server_name", serverName) + // fetch stale device lists + userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName}) + if err != nil { + logger.WithError(err).Error("failed to load stale device lists") + return true + } + hasFailures := false + for _, userID := range userIDs { + if ctx.Err() != nil { + // we've timed out, give up and go to the back of the queue to let another server be processed. + hasFailures = true + break + } + res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) + if err != nil { + logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user") + hasFailures = true + continue + } + err = u.updateDeviceList(ctx, &res) + if err != nil { + logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store it") + hasFailures = true + } + } + return hasFailures +} + +func (u *DeviceListUpdater) updateDeviceList(ctx context.Context, res *gomatrixserverlib.RespUserDevices) error { + keys := make([]api.DeviceMessage, len(res.Devices)) + for i, device := range res.Devices { + keyJSON, err := json.Marshal(device.Keys) + if err != nil { + util.GetLogger(ctx).WithField("keys", device.Keys).Error("failed to marshal keys, skipping device") + continue + } + keys[i] = api.DeviceMessage{ + StreamID: res.StreamID, + DeviceKeys: api.DeviceKeys{ + DeviceID: device.DeviceID, + DisplayName: device.DisplayName, + UserID: res.UserID, + KeyJSON: keyJSON, + }, + } + } + err := u.db.StoreRemoteDeviceKeys(ctx, keys) + if err != nil { + return err + } + return u.db.MarkDeviceListStale(ctx, res.UserID, false) +} diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index d6e24566f..ff298c07c 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -38,74 +38,22 @@ type KeyInternalAPI struct { FedClient *gomatrixserverlib.FederationClient UserAPI userapi.UserInternalAPI Producer *producers.KeyChange - // A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1 - // request to the remote server and race. - // TODO: Put in an LRU cache to bound growth - UserIDToMutex map[string]*sync.Mutex - Mutex *sync.Mutex // protects UserIDToMutex + Updater *DeviceListUpdater } func (a *KeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) { a.UserAPI = i } -func (a *KeyInternalAPI) mutex(userID string) *sync.Mutex { - a.Mutex.Lock() - defer a.Mutex.Unlock() - if a.UserIDToMutex[userID] == nil { - a.UserIDToMutex[userID] = &sync.Mutex{} - } - return a.UserIDToMutex[userID] -} - func (a *KeyInternalAPI) InputDeviceListUpdate( ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, ) { - mu := a.mutex(req.Event.UserID) - mu.Lock() - defer mu.Unlock() - // check if we have the prev IDs - exists, err := a.DB.PrevIDsExists(ctx, req.Event.UserID, req.Event.PrevID) + err := a.Updater.Update(ctx, req.Event) if err != nil { res.Error = &api.KeyError{ - Err: fmt.Sprintf("failed to check if prev ids exist: %s", err), + Err: fmt.Sprintf("failed to update device list: %s", err), } - return } - - // if we haven't missed anything update the database and notify users - if exists { - keys := []api.DeviceMessage{ - { - DeviceKeys: api.DeviceKeys{ - DeviceID: req.Event.DeviceID, - DisplayName: req.Event.DeviceDisplayName, - KeyJSON: req.Event.Keys, - UserID: req.Event.UserID, - }, - StreamID: req.Event.StreamID, - }, - } - err = a.DB.StoreRemoteDeviceKeys(ctx, keys) - if err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("failed to store remote device keys: %s", err), - } - return - } - // ALWAYS emit key changes when we've been poked over federation just in case - // this poke is important for something. - err = a.Producer.ProduceKeyChanges(keys) - if err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("failed to emit remote device key changes: %s", err), - } - } - return - } - - // if we're missing an ID go and fetch it from the remote HS - } func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 79d9cec96..4a6fbe3c1 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -15,8 +15,6 @@ package keyserver import ( - "sync" - "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/internal/config" @@ -52,12 +50,16 @@ func NewInternalAPI( Producer: producer, DB: db, } + updater := internal.NewDeviceListUpdater(db, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable + err = updater.Start() + if err != nil { + logrus.WithError(err).Panicf("failed to start device list updater") + } return &internal.KeyInternalAPI{ - DB: db, - ThisServer: cfg.Matrix.ServerName, - FedClient: fedClient, - Producer: keyChangeProducer, - Mutex: &sync.Mutex{}, - UserIDToMutex: make(map[string]*sync.Mutex), + DB: db, + ThisServer: cfg.Matrix.ServerName, + FedClient: fedClient, + Producer: keyChangeProducer, + Updater: updater, } } diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go index f67bbf715..2a60aaccc 100644 --- a/keyserver/storage/interface.go +++ b/keyserver/storage/interface.go @@ -19,6 +19,7 @@ import ( "encoding/json" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/gomatrixserverlib" ) type Database interface { @@ -64,4 +65,11 @@ type Database interface { // A to offset of sarama.OffsetNewest means no upper limit. // Returns the offset of the latest key change. KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) + + // StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists. + // If no domains are given, all user IDs with stale device lists are returned. + StaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error) + + // MarkDeviceListStale sets the stale bit for this user to isStale. + MarkDeviceListStale(ctx context.Context, userID string, isStale bool) error } diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go index 787297740..68964be67 100644 --- a/keyserver/storage/shared/storage.go +++ b/keyserver/storage/shared/storage.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage/tables" + "github.com/matrix-org/gomatrixserverlib" ) type Database struct { @@ -124,3 +125,14 @@ func (d *Database) StoreKeyChange(ctx context.Context, partition int32, offset i func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) { return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset, toOffset) } + +// StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists. +// If no domains are given, all user IDs with stale device lists are returned. +func (d *Database) StaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error) { + return nil, nil // TODO +} + +// MarkDeviceListStale sets the stale bit for this user to isStale. +func (d *Database) MarkDeviceListStale(ctx context.Context, userID string, isStale bool) error { + return nil // TODO +} From ee22c6e4403bad9bc1de46d7400ac87b1ec0975c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Aug 2020 10:38:06 +0100 Subject: [PATCH 4/5] Update go.mod/go.sum for matrix-org/gomatrixserverlib#212 --- go.mod | 3 +-- go.sum | 25 ++----------------------- 2 files changed, 3 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index d2116fcd3..4c14f047d 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200806145220-3120f1087f6d + github.com/matrix-org/gomatrixserverlib v0.0.0-20200807093738-5834540331a4 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible @@ -39,7 +39,6 @@ require ( github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806125501-cd4685a3b4de go.uber.org/atomic v1.4.0 golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 - golang.org/x/mobile v0.0.0-20200801112145-973feb4309de // indirect gopkg.in/h2non/bimg.v1 v1.0.18 gopkg.in/yaml.v2 v2.2.8 ) diff --git a/go.sum b/go.sum index 81f6f0a42..39118478a 100644 --- a/go.sum +++ b/go.sum @@ -12,7 +12,6 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOv github.com/Arceliar/phony v0.0.0-20191006174943-d0c68492aca0 h1:p3puK8Sl2xK+2FnnIvY/C0N1aqJo2kbEsdAzU+Tnv48= github.com/Arceliar/phony v0.0.0-20191006174943-d0c68492aca0/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/Shopify/sarama v1.26.1 h1:3jnfWKD7gVwbB1KSy/lE0szA9duPuSFLViK0o/d3DgA= github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= @@ -422,10 +421,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 h1:Yb+Wlf github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d h1:zYk/bQ5bmHDsRqHBl57aBxo5bizsknWU3sunZf9WnWI= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200806145220-3120f1087f6d h1:BsSge1plJoqMyL5xnUFuTkzMXC42GbJW1kMbMizTmL4= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200806145220-3120f1087f6d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200807093738-5834540331a4 h1:QZgMWr/uM9Uc0udL/jtuzubkArNvGAPPIjPOuFWsrj0= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200807093738-5834540331a4/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= @@ -655,10 +652,6 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhe github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yggdrasil-network/yggdrasil-extras v0.0.0-20200525205615-6c8a4a2e8855/go.mod h1:xQdsh08Io6nV4WRnOVTe6gI8/2iTvfLDQ0CYa5aMt+I= -github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3 h1:teLoIJgPHysREs8P6GlcS/PgaU9W9+GQndikFCQ1lY0= -github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= -github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806124633-bd1bdd6be073 h1:Fg4Bszd2qp6eyz/yDMYfB8g2PC1FfNQphGRgZAyD0VU= -github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806124633-bd1bdd6be073/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806125501-cd4685a3b4de h1:p91aw0Mvol825U+5bvV9BBPl+HQxIczj7wxIOxZs70M= github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806125501-cd4685a3b4de/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= @@ -686,7 +679,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90Pveol golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -699,19 +691,10 @@ golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 h1:Q7tZBpemrlsc2I7IyODzhtallWRSm4Q0d09pL6XbQtU= golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4= -golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= -golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= -golang.org/x/mobile v0.0.0-20200801112145-973feb4309de h1:OVJ6QQUBAesB8CZijKDSsXX7xYVtUhrkY0gwMfbi4p4= -golang.org/x/mobile v0.0.0-20200801112145-973feb4309de/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4= -golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= -golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -798,11 +781,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69 h1:yBHHx+XZqXJBm6Exke3N7V9gnlsyXxoCPEb1yVenjfk= -golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= From 6ce7af8a3e6b1750f729727e583e5c1702221e56 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Aug 2020 13:28:09 +0100 Subject: [PATCH 5/5] Update go.mod/go.sum for matrix-org/gomatrixserverlib#213 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 4c14f047d..ece7d01ab 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200807093738-5834540331a4 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200807122736-eb1a0b991914 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index 39118478a..82e7b399e 100644 --- a/go.sum +++ b/go.sum @@ -421,8 +421,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 h1:Yb+Wlf github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200807093738-5834540331a4 h1:QZgMWr/uM9Uc0udL/jtuzubkArNvGAPPIjPOuFWsrj0= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200807093738-5834540331a4/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200807122736-eb1a0b991914 h1:VSGCvSUB1/Y32F/JSjmTaIW9jr1BmBHEd0ok4AaT/lo= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200807122736-eb1a0b991914/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=