Hook up the key DB and make a test pass

This commit is contained in:
Kegan Dougal 2020-07-15 11:22:30 +01:00
parent 07cbfce2d0
commit 6ab49ac863
17 changed files with 119 additions and 25 deletions

View file

@ -26,6 +26,7 @@ import (
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
@ -48,6 +49,7 @@ func AddPublicRoutes(
transactionsCache *transactions.Cache,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
userAPI userapi.UserInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
) {
syncProducer := &producers.SyncAPIProducer{
@ -58,6 +60,6 @@ func AddPublicRoutes(
routing.Setup(
router, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, deviceDB, userAPI, federation,
syncProducer, transactionsCache, fsAPI, stateAPI, extRoomsProvider,
syncProducer, transactionsCache, fsAPI, stateAPI, keyAPI, extRoomsProvider,
)
}

View file

@ -15,8 +15,13 @@
package routing
import (
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
@ -32,9 +37,55 @@ func QueryKeys(
}
}
func UploadKeys(req *http.Request) util.JSONResponse {
type uploadKeysRequest struct {
DeviceKeys json.RawMessage `json:"device_keys"`
OneTimeKeys map[string]json.RawMessage `json:"one_time_keys"`
}
func UploadKeys(req *http.Request, keyAPI api.KeyInternalAPI, device *userapi.Device) util.JSONResponse {
var r uploadKeysRequest
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
return *resErr
}
uploadReq := &api.PerformUploadKeysRequest{}
if r.DeviceKeys != nil {
uploadReq.DeviceKeys = []api.DeviceKeys{
{
DeviceID: device.ID,
UserID: device.UserID,
KeyJSON: r.DeviceKeys,
},
}
}
if r.OneTimeKeys != nil {
uploadReq.OneTimeKeys = []api.OneTimeKeys{
{
DeviceID: device.ID,
UserID: device.UserID,
KeyJSON: r.OneTimeKeys,
},
}
}
var uploadRes api.PerformUploadKeysResponse
keyAPI.PerformUploadKeys(req.Context(), uploadReq, &uploadRes)
if uploadRes.Error != nil {
util.GetLogger(req.Context()).WithError(uploadRes.Error).Error("Failed to PerformUploadKeys")
return jsonerror.InternalServerError()
}
if len(uploadRes.KeyErrors) > 0 {
util.GetLogger(req.Context()).WithField("key_errors", uploadRes.KeyErrors).Error("Failed to upload one or more keys")
return util.JSONResponse{
Code: 400,
JSON: uploadRes.KeyErrors,
}
}
return util.JSONResponse{
Code: 200,
JSON: struct{}{},
JSON: struct {
OTKCounts interface{} `json:"one_time_key_counts"`
}{uploadRes.OneTimeKeyCounts[0].KeyCount},
}
}

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/transactions"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
@ -62,6 +63,7 @@ func Setup(
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
) {
userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg)
@ -705,7 +707,12 @@ func Setup(
// Supplying a device ID is deprecated.
r0mux.Handle("/keys/upload/{deviceID}",
httputil.MakeAuthAPI("keys_upload", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return UploadKeys(req)
return UploadKeys(req, keyAPI, device)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/keys/upload",
httputil.MakeAuthAPI("keys_upload", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return UploadKeys(req, keyAPI, device)
}),
).Methods(http.MethodPost, http.MethodOptions)
}

View file

@ -36,10 +36,11 @@ func main() {
eduInputAPI := base.EDUServerClient()
userAPI := base.UserAPIClient()
stateAPI := base.CurrentStateAPIClient()
keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes(
base.PublicAPIMux, base.Cfg, base.KafkaProducer, deviceDB, accountDB, federation,
rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI, nil,
rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI, keyAPI, nil,
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI))

View file

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/serverkeyapi"
"github.com/matrix-org/dendrite/userapi"
@ -129,6 +130,7 @@ func main() {
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName))
if err = cfg.Derive(); err != nil {
panic(err)
}
@ -184,6 +186,7 @@ func main() {
ServerKeyAPI: serverKeyAPI,
StateAPI: stateAPI,
UserAPI: userAPI,
KeyAPI: keyserver.NewInternalAPI(base.Base.Cfg),
ExtPublicRoomsProvider: provider,
}
monolith.AddAllPublicRoutes(base.Base.PublicAPIMux)

View file

@ -38,6 +38,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
@ -87,6 +88,7 @@ func main() {
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName))
if err = cfg.Derive(); err != nil {
panic(err)
}
@ -140,6 +142,7 @@ func main() {
RoomserverAPI: rsAPI,
UserAPI: userAPI,
StateAPI: stateAPI,
KeyAPI: keyserver.NewInternalAPI(base.Cfg),
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
ygg, fsAPI, federation,

View file

@ -24,7 +24,7 @@ func main() {
base := setup.NewBaseDendrite(cfg, "KeyServer", true)
defer base.Close() // nolint: errcheck
intAPI := keyserver.NewInternalAPI()
intAPI := keyserver.NewInternalAPI(base.Cfg)
keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)

View file

@ -119,7 +119,7 @@ func main() {
rsImpl.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
keyAPI := keyserver.NewInternalAPI()
keyAPI := keyserver.NewInternalAPI(base.Cfg)
monolith := setup.Monolith{
Config: base.Cfg,

View file

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/userapi"
go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p"
@ -172,6 +173,7 @@ func main() {
cfg.Database.ServerKey = "file:/idb/dendritejs_serverkey.db"
cfg.Database.SyncAPI = "file:/idb/dendritejs_syncapi.db"
cfg.Database.CurrentState = "file:/idb/dendritejs_currentstate.db"
cfg.Database.E2EKey = "file:/idb/dendritejs_e2ekey.db"
cfg.Kafka.Topics.OutputTypingEvent = "output_typing_event"
cfg.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event"
cfg.Kafka.Topics.OutputClientData = "output_client_data"
@ -231,6 +233,7 @@ func main() {
RoomserverAPI: rsAPI,
StateAPI: stateAPI,
UserAPI: userAPI,
KeyAPI: keyserver.NewInternalAPI(base.Cfg),
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: p2pPublicRoomProvider,
}

View file

@ -121,6 +121,7 @@ database:
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
current_state: "postgres://dendrite:itsasecret@localhost/dendrite_currentstate?sslmode=disable"
e2e_key: "postgres://dendrite:itsasecret@localhost/dendrite_e2ekey?sslmode=disable"
max_open_conns: 100
max_idle_conns: 2
conn_max_lifetime: -1

View file

@ -174,6 +174,9 @@ type Dendrite struct {
// The ServerKey database caches the public keys of remote servers.
// It may be accessed by the FederationAPI, the ClientAPI, and the MediaAPI.
ServerKey DataSource `yaml:"server_key"`
// The E2EKey database stores one-time public keys for devices in addition to
// signed device keys. Used for E2E.
E2EKey DataSource `yaml:"e2e_key"`
// The SyncAPI stores information used by the SyncAPI server.
// It is only accessed by the SyncAPI server.
SyncAPI DataSource `yaml:"sync_api"`
@ -602,6 +605,7 @@ func (config *Dendrite) checkDatabase(configErrs *configErrors) {
checkNotEmpty(configErrs, "database.sync_api", string(config.Database.SyncAPI))
checkNotEmpty(configErrs, "database.room_server", string(config.Database.RoomServer))
checkNotEmpty(configErrs, "database.current_state", string(config.Database.CurrentState))
checkNotEmpty(configErrs, "database.e2e_key", string(config.Database.E2EKey))
}
// checkListen verifies the parameters listen.* are valid.
@ -615,6 +619,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) {
checkNotEmpty(configErrs, "listen.server_key_api", string(config.Listen.EDUServer))
checkNotEmpty(configErrs, "listen.user_api", string(config.Listen.UserAPI))
checkNotEmpty(configErrs, "listen.current_state_server", string(config.Listen.CurrentState))
checkNotEmpty(configErrs, "listen.key_server", string(config.Listen.KeyServer))
}
// checkLogging verifies the parameters logging.* are valid.

View file

@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
publicMux, m.Config, m.KafkaProducer, m.DeviceDB, m.AccountDB,
m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.ExtPublicRoomsProvider,
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
)
federationapi.AddPublicRoutes(
publicMux, m.Config, m.UserAPI, m.FedClient,

View file

@ -28,7 +28,11 @@ type KeyInternalAPI interface {
// KeyError is returned if there was a problem performing/querying the server
type KeyError struct {
Error string
Err string
}
func (k *KeyError) Error() string {
return k.Err
}
// DeviceKeys represents a set of device keys for a single device
@ -81,6 +85,7 @@ type PerformUploadKeysRequest struct {
// PerformUploadKeysResponse is the response to PerformUploadKeys
type PerformUploadKeysResponse struct {
// A fatal error when processing e.g database failures
Error *KeyError
// A map of user_id -> device_id -> Error for tracking failures.
KeyErrors map[string]map[string]*KeyError

View file

@ -25,7 +25,7 @@ import (
)
type KeyInternalAPI struct {
db storage.Database
DB storage.Database
}
func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
@ -52,7 +52,7 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
}
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{
Error: fmt.Sprintf(
Err: fmt.Sprintf(
"user_id or device_id mismatch: users: %s - %s, devices: %s - %s",
gotUserID, key.UserID, gotDeviceID, key.DeviceID,
),
@ -66,16 +66,16 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
DeviceID: keysToStore[i].DeviceID,
}
}
if err := a.db.DeviceKeysJSON(ctx, existingKeys); err != nil {
if err := a.DB.DeviceKeysJSON(ctx, existingKeys); err != nil {
res.Error = &api.KeyError{
Error: fmt.Sprintf("failed to query existing device keys: %s", err.Error()),
Err: fmt.Sprintf("failed to query existing device keys: %s", err.Error()),
}
return
}
// store the device keys and emit changes
if err := a.db.StoreDeviceKeys(ctx, keysToStore); err != nil {
if err := a.DB.StoreDeviceKeys(ctx, keysToStore); err != nil {
res.Error = &api.KeyError{
Error: fmt.Sprintf("failed to store device keys: %s", err.Error()),
Err: fmt.Sprintf("failed to store device keys: %s", err.Error()),
}
return
}
@ -91,10 +91,10 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
keyIDsWithAlgorithms[i] = keyIDWithAlgo
i++
}
existingKeys, err := a.db.ExistingOneTimeKeys(ctx, key.UserID, key.DeviceID, keyIDsWithAlgorithms)
existingKeys, err := a.DB.ExistingOneTimeKeys(ctx, key.UserID, key.DeviceID, keyIDsWithAlgorithms)
if err != nil {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{
Error: "failed to query existing one-time keys: " + err.Error(),
Err: "failed to query existing one-time keys: " + err.Error(),
})
continue
}
@ -102,16 +102,16 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
// if keys exist and the JSON doesn't match, error out as the key already exists
if !bytes.Equal(existingKeys[keyIDWithAlgo], key.KeyJSON[keyIDWithAlgo]) {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{
Error: fmt.Sprintf("%s device %s: algorithm / key ID %s one-time key already exists", key.UserID, key.DeviceID, keyIDWithAlgo),
Err: fmt.Sprintf("%s device %s: algorithm / key ID %s one-time key already exists", key.UserID, key.DeviceID, keyIDWithAlgo),
})
continue
}
}
// store one-time keys
counts, err := a.db.StoreOneTimeKeys(ctx, key)
counts, err := a.DB.StoreOneTimeKeys(ctx, key)
if err != nil {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{
Error: fmt.Sprintf("%s device %s : failed to store one-time keys: %s", key.UserID, key.DeviceID, err.Error()),
Err: fmt.Sprintf("%s device %s : failed to store one-time keys: %s", key.UserID, key.DeviceID, err.Error()),
})
continue
}

View file

@ -63,7 +63,7 @@ func (h *httpKeyInternalAPI) PerformClaimKeys(
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Error: err.Error(),
Err: err.Error(),
}
}
}
@ -80,7 +80,7 @@ func (h *httpKeyInternalAPI) PerformUploadKeys(
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Error: err.Error(),
Err: err.Error(),
}
}
}
@ -97,7 +97,7 @@ func (h *httpKeyInternalAPI) QueryKeys(
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Error: err.Error(),
Err: err.Error(),
}
}
}

View file

@ -16,9 +16,12 @@ package keyserver
import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/sirupsen/logrus"
)
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
@ -29,6 +32,15 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI() api.KeyInternalAPI {
return &internal.KeyInternalAPI{}
func NewInternalAPI(cfg *config.Dendrite) api.KeyInternalAPI {
db, err := storage.NewDatabase(
string(cfg.Database.E2EKey),
cfg.DbProperties(),
)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to key server database")
}
return &internal.KeyInternalAPI{
DB: db,
}
}

View file

@ -119,6 +119,7 @@ Newly banned rooms appear in the leave section of incremental sync
Newly banned rooms appear in the leave section of incremental sync
local user can join room with version 1
User can invite local user to room with version 1
Can upload device keys
Should reject keys claiming to belong to a different user
Can add account data
Can add account data to room