mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-18 12:23:09 -06:00
Fetched implementation
This commit is contained in:
commit
ad38cd9442
|
|
@ -26,4 +26,5 @@ type Device struct {
|
||||||
// associated with access tokens.
|
// associated with access tokens.
|
||||||
SessionID int64
|
SessionID int64
|
||||||
// TODO: display name, last used timestamp, keys, etc
|
// TODO: display name, last used timestamp, keys, etc
|
||||||
|
DisplayName string
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -69,7 +69,9 @@ const selectDeviceByIDSQL = "" +
|
||||||
"SELECT display_name FROM device_devices WHERE localpart = $1 and device_id = $2"
|
"SELECT display_name FROM device_devices WHERE localpart = $1 and device_id = $2"
|
||||||
|
|
||||||
const selectDevicesByLocalpartSQL = "" +
|
const selectDevicesByLocalpartSQL = "" +
|
||||||
"SELECT device_id, display_name FROM device_devices WHERE localpart = $1"
|
// todo : display name still has a problem when value is null
|
||||||
|
//"SELECT device_id, display_name FROM device_devices WHERE localpart = $1"
|
||||||
|
"SELECT device_id FROM device_devices WHERE localpart = $1"
|
||||||
|
|
||||||
const updateDeviceNameSQL = "" +
|
const updateDeviceNameSQL = "" +
|
||||||
"UPDATE device_devices SET display_name = $1 WHERE localpart = $2 AND device_id = $3"
|
"UPDATE device_devices SET display_name = $1 WHERE localpart = $2 AND device_id = $3"
|
||||||
|
|
@ -210,6 +212,8 @@ func (s *devicesStatements) selectDevicesByLocalpart(
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var dev authtypes.Device
|
var dev authtypes.Device
|
||||||
err = rows.Scan(&dev.ID)
|
err = rows.Scan(&dev.ID)
|
||||||
|
// todo: display name still has a problem when value is null
|
||||||
|
//err = rows.Scan(&dev.ID, &dev.DisplayName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return devices, err
|
return devices, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,14 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
<<<<<<< HEAD:cmd/dendrite-monolith-server/main.go
|
||||||
"github.com/matrix-org/dendrite/appservice"
|
"github.com/matrix-org/dendrite/appservice"
|
||||||
|
=======
|
||||||
|
"github.com/matrix-org/dendrite/common/keydb"
|
||||||
|
"github.com/matrix-org/dendrite/common/transactions"
|
||||||
|
"github.com/matrix-org/dendrite/typingserver"
|
||||||
|
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go
|
||||||
"github.com/matrix-org/dendrite/clientapi"
|
"github.com/matrix-org/dendrite/clientapi"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||||
|
|
@ -33,6 +40,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/typingserver"
|
"github.com/matrix-org/dendrite/typingserver"
|
||||||
"github.com/matrix-org/dendrite/typingserver/cache"
|
"github.com/matrix-org/dendrite/typingserver/cache"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/encryptoapi"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
@ -62,6 +70,8 @@ func main() {
|
||||||
)
|
)
|
||||||
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query)
|
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query)
|
||||||
|
|
||||||
|
encryptDB := encryptoapi.SetupEcryptoapi(base, deviceDB)
|
||||||
|
|
||||||
clientapi.SetupClientAPIComponent(
|
clientapi.SetupClientAPIComponent(
|
||||||
base, deviceDB, accountDB,
|
base, deviceDB, accountDB,
|
||||||
federation, &keyRing, alias, input, query,
|
federation, &keyRing, alias, input, query,
|
||||||
|
|
@ -70,7 +80,12 @@ func main() {
|
||||||
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery)
|
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery)
|
||||||
mediaapi.SetupMediaAPIComponent(base, deviceDB)
|
mediaapi.SetupMediaAPIComponent(base, deviceDB)
|
||||||
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB)
|
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB)
|
||||||
|
<<<<<<< HEAD:cmd/dendrite-monolith-server/main.go
|
||||||
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query)
|
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query)
|
||||||
|
=======
|
||||||
|
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, encryptDB)
|
||||||
|
//appservice.SetupAppServiceAPIComponent(base, accountDB, deviceDB, federation, alias, query, transactions.New())
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go
|
||||||
|
|
||||||
httpHandler := common.WrapHandlerInCORS(base.APIMux)
|
httpHandler := common.WrapHandlerInCORS(base.APIMux)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ func main() {
|
||||||
|
|
||||||
_, _, query := base.CreateHTTPRoomserverAPIs()
|
_, _, query := base.CreateHTTPRoomserverAPIs()
|
||||||
|
|
||||||
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query)
|
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, nil)
|
||||||
|
|
||||||
base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI))
|
base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -172,6 +172,8 @@ type Dendrite struct {
|
||||||
PublicRoomsAPI DataSource `yaml:"public_rooms_api"`
|
PublicRoomsAPI DataSource `yaml:"public_rooms_api"`
|
||||||
// The Naffka database is used internally by the naffka library, if used.
|
// The Naffka database is used internally by the naffka library, if used.
|
||||||
Naffka DataSource `yaml:"naffka,omitempty"`
|
Naffka DataSource `yaml:"naffka,omitempty"`
|
||||||
|
// Encryption api database
|
||||||
|
EncryptAPI DataSource `yaml:"encrypt_api"`
|
||||||
} `yaml:"database"`
|
} `yaml:"database"`
|
||||||
|
|
||||||
// TURN Server Config
|
// TURN Server Config
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,7 @@ kafka:
|
||||||
output_client_data: clientapiOutput
|
output_client_data: clientapiOutput
|
||||||
output_typing_event: typingServerOutput
|
output_typing_event: typingServerOutput
|
||||||
user_updates: userUpdates
|
user_updates: userUpdates
|
||||||
|
keyUpdate: keyUpdate
|
||||||
|
|
||||||
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
|
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
|
||||||
database:
|
database:
|
||||||
|
|
@ -99,6 +100,7 @@ database:
|
||||||
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
|
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
|
||||||
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
|
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
|
||||||
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
|
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
|
||||||
|
encrypt_api: "postgres://dendrite:itsasecret@localhost/dendrite_encryptapi?sslmode=disable"
|
||||||
# If using naffka you need to specify a naffka database
|
# If using naffka you need to specify a naffka database
|
||||||
# naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable"
|
# naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,10 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
for db in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi appservice naffka; do
|
for db in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi appservice naffka; do
|
||||||
createdb -U dendrite -O dendrite dendrite_$db
|
createdb -U dendrite -O dendrite dendrite_$db
|
||||||
|
=======
|
||||||
|
for db in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi naffka encryptapi; do
|
||||||
|
createdb -O dendrite dendrite_$db
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a
|
||||||
done
|
done
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,46 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package encryptoapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
|
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||||
|
"github.com/matrix-org/dendrite/encryptoapi/routing"
|
||||||
|
"github.com/matrix-org/dendrite/encryptoapi/storage"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// in order to gain key management capability
|
||||||
|
// , CMD should involve this invoke into main function
|
||||||
|
// , a setup need an assemble of i.e configs as base and
|
||||||
|
// accountDB and deviceDB
|
||||||
|
|
||||||
|
// SetupEcryptoapi set up to servers
|
||||||
|
func SetupEcryptoapi(
|
||||||
|
base *basecomponent.BaseDendrite,
|
||||||
|
deviceDB *devices.Database,
|
||||||
|
) *storage.Database {
|
||||||
|
encryptionDB, err := storage.NewDatabase(string(base.Cfg.Database.EncryptAPI))
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Panicf("failed to connect to encryption db")
|
||||||
|
}
|
||||||
|
routing.Setup(
|
||||||
|
base.APIMux,
|
||||||
|
encryptionDB,
|
||||||
|
deviceDB,
|
||||||
|
)
|
||||||
|
routing.InitNotifier(base)
|
||||||
|
return encryptionDB
|
||||||
|
}
|
||||||
537
src/github.com/matrix-org/dendrite/encryptoapi/routing/keys.go
Normal file
537
src/github.com/matrix-org/dendrite/encryptoapi/routing/keys.go
Normal file
|
|
@ -0,0 +1,537 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package routing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"github.com/Shopify/sarama"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
|
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||||
|
"github.com/matrix-org/dendrite/encryptoapi/storage"
|
||||||
|
"github.com/matrix-org/dendrite/encryptoapi/types"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// TYPESUM sum type
|
||||||
|
TYPESUM = iota
|
||||||
|
// BODYDEVICEKEY device key body
|
||||||
|
BODYDEVICEKEY
|
||||||
|
// BODYONETIMEKEY one time key
|
||||||
|
BODYONETIMEKEY
|
||||||
|
// ONETIMEKEYSTRING key string
|
||||||
|
ONETIMEKEYSTRING
|
||||||
|
// ONETIMEKEYOBJECT key object
|
||||||
|
ONETIMEKEYOBJECT
|
||||||
|
)
|
||||||
|
|
||||||
|
// ONETIMEKEYSTR stands for storage string property
|
||||||
|
const ONETIMEKEYSTR = "one_time_key"
|
||||||
|
|
||||||
|
// DEVICEKEYSTR stands for storage string property
|
||||||
|
const DEVICEKEYSTR = "device_key"
|
||||||
|
|
||||||
|
// KeyNotifier kafka notifier
|
||||||
|
type KeyNotifier struct {
|
||||||
|
base *basecomponent.BaseDendrite
|
||||||
|
ch sarama.AsyncProducer
|
||||||
|
}
|
||||||
|
|
||||||
|
var keyProducer = &KeyNotifier{}
|
||||||
|
|
||||||
|
// UploadPKeys this function is for user upload his device key, and one-time-key to a limit at 50 set as default
|
||||||
|
func UploadPKeys(
|
||||||
|
req *http.Request,
|
||||||
|
encryptionDB *storage.Database,
|
||||||
|
userID, deviceID string,
|
||||||
|
) util.JSONResponse {
|
||||||
|
var keybody types.UploadEncrypt
|
||||||
|
if reqErr := httputil.UnmarshalJSONRequest(req, &keybody); reqErr != nil {
|
||||||
|
return *reqErr
|
||||||
|
}
|
||||||
|
keySpecific := turnSpecific(keybody)
|
||||||
|
// persist keys into encryptionDB
|
||||||
|
err := persistKeys(
|
||||||
|
req.Context(),
|
||||||
|
encryptionDB,
|
||||||
|
&keySpecific,
|
||||||
|
userID, deviceID)
|
||||||
|
// numMap is algorithm-num map
|
||||||
|
numMap := (QueryOneTimeKeys(
|
||||||
|
req.Context(),
|
||||||
|
TYPESUM,
|
||||||
|
userID,
|
||||||
|
deviceID,
|
||||||
|
encryptionDB)).(map[string]int)
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadGateway,
|
||||||
|
JSON: types.UploadResponse{
|
||||||
|
Count: numMap,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
JSON: types.UploadResponse{
|
||||||
|
Count: numMap,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryPKeys this function is for user query other's device key
|
||||||
|
func QueryPKeys(
|
||||||
|
req *http.Request,
|
||||||
|
encryptionDB *storage.Database,
|
||||||
|
deviceID string,
|
||||||
|
deviceDB *devices.Database,
|
||||||
|
) util.JSONResponse {
|
||||||
|
var err error
|
||||||
|
var queryRq types.QueryRequest
|
||||||
|
queryRp := types.QueryResponse{}
|
||||||
|
queryRp.Failure = make(map[string]interface{})
|
||||||
|
queryRp.DeviceKeys = make(map[string]map[string]types.DeviceKeysQuery)
|
||||||
|
if reqErr := httputil.UnmarshalJSONRequest(req, &queryRq); reqErr != nil {
|
||||||
|
return *reqErr
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
federation consideration: when user id is in federation, a
|
||||||
|
query is needed to ask fed for keys.
|
||||||
|
domain --------+ fed (keys)
|
||||||
|
domain +--tout-- timer
|
||||||
|
*/
|
||||||
|
// todo: Add federation processing at specific userID.
|
||||||
|
if false /*federation judgement*/ {
|
||||||
|
tout := queryRq.Timeout
|
||||||
|
if tout == 0 {
|
||||||
|
tout = int64(10 * time.Second)
|
||||||
|
}
|
||||||
|
stimuCh := make(chan int)
|
||||||
|
go func() {
|
||||||
|
time.Sleep(time.Duration(tout) * 1000 * 1000)
|
||||||
|
close(stimuCh)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-stimuCh:
|
||||||
|
queryRp.Failure = make(map[string]interface{})
|
||||||
|
// todo: key in this map is restricted to username at the end, yet a mocked one.
|
||||||
|
queryRp.Failure["@alice:localhost"] = "ran out of offered time"
|
||||||
|
case <-make(chan interface{}):
|
||||||
|
// todo : here goes federation chan , still a mocked one
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// query one's device key from user corresponding to uid
|
||||||
|
for uid, arr := range queryRq.DeviceKeys {
|
||||||
|
queryRp.DeviceKeys[uid] = make(map[string]types.DeviceKeysQuery)
|
||||||
|
deviceKeysQueryMap := queryRp.DeviceKeys[uid]
|
||||||
|
// backward compatible to old interface
|
||||||
|
midArr := []string{}
|
||||||
|
// figure out device list from devices described as device which is actually deviceID
|
||||||
|
for device := range arr.(map[string]interface{}) {
|
||||||
|
midArr = append(midArr, device)
|
||||||
|
}
|
||||||
|
// all device keys
|
||||||
|
dkeys, _ := encryptionDB.QueryInRange(req.Context(), uid, midArr)
|
||||||
|
// build response for them
|
||||||
|
|
||||||
|
for _, key := range dkeys {
|
||||||
|
|
||||||
|
deviceKeysQueryMap = presetDeviceKeysQueryMap(deviceKeysQueryMap, uid, key)
|
||||||
|
// load for accomplishment
|
||||||
|
single := deviceKeysQueryMap[key.DeviceID]
|
||||||
|
resKey := fmt.Sprintf("%s:%s", key.KeyAlgorithm, key.DeviceID)
|
||||||
|
resBody := key.Key
|
||||||
|
single.Keys[resKey] = resBody
|
||||||
|
single.DeviceID = key.DeviceID
|
||||||
|
single.UserID = key.UserID
|
||||||
|
single.Signature[uid][fmt.Sprintf("%s:%s", "ed25519", key.DeviceID)] = key.Signature
|
||||||
|
single.Algorithm, err = takeAL(req.Context(), *encryptionDB, key.UserID, key.DeviceID)
|
||||||
|
localpart, _, _ := gomatrixserverlib.SplitID('@', uid)
|
||||||
|
device, _ := deviceDB.GetDeviceByID(req.Context(), localpart, deviceID)
|
||||||
|
single.Unsigned.Info = device.DisplayName
|
||||||
|
deviceKeysQueryMap[key.DeviceID] = single
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusInternalServerError,
|
||||||
|
JSON: queryRp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
JSON: queryRp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClaimOneTimeKeys claim for one time key that may be used in session exchange in olm encryption
|
||||||
|
func ClaimOneTimeKeys(
|
||||||
|
req *http.Request,
|
||||||
|
encryptionDB *storage.Database,
|
||||||
|
) util.JSONResponse {
|
||||||
|
var claimRq types.ClaimRequest
|
||||||
|
claimRp := types.ClaimResponse{}
|
||||||
|
claimRp.Failures = make(map[string]interface{})
|
||||||
|
claimRp.ClaimBody = make(map[string]map[string]map[string]interface{})
|
||||||
|
if reqErr := httputil.UnmarshalJSONRequest(req, &claimRq); reqErr != nil {
|
||||||
|
return *reqErr
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
federation consideration: when user id is in federation, a query is needed to ask fed for keys
|
||||||
|
domain --------+ fed (keys)
|
||||||
|
domain +--tout-- timer
|
||||||
|
*/
|
||||||
|
// todo: Add federation processing at specific userID.
|
||||||
|
if false /*federation judgement*/ {
|
||||||
|
tout := claimRq.Timeout
|
||||||
|
stimuCh := make(chan int)
|
||||||
|
go func() {
|
||||||
|
time.Sleep(time.Duration(tout) * 1000 * 1000)
|
||||||
|
close(stimuCh)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-stimuCh:
|
||||||
|
claimRp.Failures = make(map[string]interface{})
|
||||||
|
// todo: key in this map is restricted to username at the end, yet a mocked one.
|
||||||
|
claimRp.Failures["@alice:localhost"] = "ran out of offered time"
|
||||||
|
case <-make(chan interface{}):
|
||||||
|
// todo : here goes federation chan , still a mocked one
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
content := claimRq.ClaimDetail
|
||||||
|
for uid, detail := range content {
|
||||||
|
for deviceID, al := range detail {
|
||||||
|
var alTyp int
|
||||||
|
if strings.Contains(al, "signed") {
|
||||||
|
alTyp = ONETIMEKEYOBJECT
|
||||||
|
} else {
|
||||||
|
alTyp = ONETIMEKEYSTRING
|
||||||
|
}
|
||||||
|
key, err := pickOne(req.Context(), *encryptionDB, uid, deviceID, al)
|
||||||
|
if err != nil {
|
||||||
|
claimRp.Failures[uid] = fmt.Sprintf("%s:%s", "fail to get keys for device ", deviceID)
|
||||||
|
}
|
||||||
|
claimRp.ClaimBody[uid] = make(map[string]map[string]interface{})
|
||||||
|
keyPreMap := claimRp.ClaimBody[uid]
|
||||||
|
keymap := keyPreMap[deviceID]
|
||||||
|
if keymap == nil {
|
||||||
|
keymap = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
switch alTyp {
|
||||||
|
case ONETIMEKEYSTRING:
|
||||||
|
keymap[fmt.Sprintf("%s:%s", al, key.KeyID)] = key.Key
|
||||||
|
case ONETIMEKEYOBJECT:
|
||||||
|
sig := make(map[string]map[string]string)
|
||||||
|
sig[uid] = make(map[string]string)
|
||||||
|
sig[uid][fmt.Sprintf("%s:%s", "ed25519", deviceID)] = key.Signature
|
||||||
|
keymap[fmt.Sprintf("%s:%s", al, key.KeyID)] = types.KeyObject{Key: key.Key, Signature: sig}
|
||||||
|
}
|
||||||
|
claimRp.ClaimBody[uid][deviceID] = keymap
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
JSON: claimRp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo: check through interface for duplicate and what type of request should it be
|
||||||
|
// whether device or one time or both of them
|
||||||
|
func checkUpload(req *types.UploadEncryptSpecific, typ int) bool {
|
||||||
|
if typ == BODYDEVICEKEY {
|
||||||
|
devicekey := req.DeviceKeys
|
||||||
|
if devicekey.UserID == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if typ == BODYONETIMEKEY {
|
||||||
|
if req.OneTimeKey.KeyString == nil || req.OneTimeKey.KeyObject == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryOneTimeKeys todo: complete this field through claim type
|
||||||
|
func QueryOneTimeKeys(
|
||||||
|
ctx context.Context,
|
||||||
|
typ int,
|
||||||
|
userID, deviceID string,
|
||||||
|
encryptionDB *storage.Database,
|
||||||
|
) interface{} {
|
||||||
|
if typ == TYPESUM {
|
||||||
|
res, _ := encryptionDB.SelectOneTimeKeyCount(ctx, deviceID, userID)
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClearUnused when web client sign out, a clean should be processed, cause all keys would never been used from then on.
|
||||||
|
// todo: complete this function and invoke through sign out extension or some scenarios else those matter
|
||||||
|
func ClearUnused() {}
|
||||||
|
|
||||||
|
// persist both device keys and one time keys
|
||||||
|
func persistKeys(
|
||||||
|
ctx context.Context,
|
||||||
|
database *storage.Database,
|
||||||
|
body *types.UploadEncryptSpecific,
|
||||||
|
userID,
|
||||||
|
deviceID string,
|
||||||
|
) (err error) {
|
||||||
|
// in order to persist keys , a check filtering duplicate should be processed
|
||||||
|
// true stands for counterparts are in request
|
||||||
|
// situation 1: only device keys
|
||||||
|
// situation 2: both device keys and one time keys
|
||||||
|
// situation 3: only one time keys
|
||||||
|
if checkUpload(body, BODYDEVICEKEY) {
|
||||||
|
deviceKeys := body.DeviceKeys
|
||||||
|
al := deviceKeys.Algorithm
|
||||||
|
err = persistAl(ctx, *database, userID, deviceID, al)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if checkUpload(body, BODYONETIMEKEY) {
|
||||||
|
if err = bothKeyProcess(ctx, body, userID, deviceID, database, deviceKeys); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err = dkeyProcess(ctx, userID, deviceID, database, deviceKeys); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// notifier to sync server
|
||||||
|
upnotify(userID)
|
||||||
|
} else {
|
||||||
|
if checkUpload(body, BODYONETIMEKEY) {
|
||||||
|
if err = otmKeyProcess(ctx, body, userID, deviceID, database); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return errors.New("failed to touch keys")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// make keys instantiated to specific struct from keybody interface{}
|
||||||
|
func turnSpecific(
|
||||||
|
cont types.UploadEncrypt,
|
||||||
|
) (spec types.UploadEncryptSpecific) {
|
||||||
|
// both device keys are coordinate
|
||||||
|
spec.DeviceKeys = cont.DeviceKeys
|
||||||
|
spec.OneTimeKey.KeyString = make(map[string]string)
|
||||||
|
spec.OneTimeKey.KeyObject = make(map[string]types.KeyObject)
|
||||||
|
mapStringInterface := cont.OneTimeKey
|
||||||
|
for key, val := range mapStringInterface {
|
||||||
|
value, ok := val.(string)
|
||||||
|
if ok {
|
||||||
|
spec.OneTimeKey.KeyString[key] = value
|
||||||
|
} else {
|
||||||
|
valueObject := types.KeyObject{}
|
||||||
|
target, _ := json.Marshal(val)
|
||||||
|
err := json.Unmarshal(target, &valueObject)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
spec.OneTimeKey.KeyObject[key] = valueObject
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func persistAl(
|
||||||
|
ctx context.Context,
|
||||||
|
encryptDB storage.Database,
|
||||||
|
uid, device string,
|
||||||
|
al []string,
|
||||||
|
) (err error) {
|
||||||
|
err = encryptDB.InsertAl(ctx, uid, device, al)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func takeAL(
|
||||||
|
ctx context.Context,
|
||||||
|
encryptDB storage.Database,
|
||||||
|
uid, device string,
|
||||||
|
) (al []string, err error) {
|
||||||
|
al, err = encryptDB.SelectAl(ctx, uid, device)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func pickOne(
|
||||||
|
ctx context.Context,
|
||||||
|
encryptDB storage.Database,
|
||||||
|
uid, device, al string,
|
||||||
|
) (key types.KeyHolder, err error) {
|
||||||
|
key, err = encryptDB.SelectOneTimeKeySingle(ctx, uid, device, al)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func upnotify(userID string) {
|
||||||
|
m := sarama.ProducerMessage{
|
||||||
|
Topic: "keyUpdate",
|
||||||
|
Key: sarama.StringEncoder("key"),
|
||||||
|
Value: sarama.StringEncoder(userID),
|
||||||
|
}
|
||||||
|
keyProducer.ch.Input() <- &m
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitNotifier initialize kafka notifier
|
||||||
|
func InitNotifier(base *basecomponent.BaseDendrite) {
|
||||||
|
keyProducer.base = base
|
||||||
|
pro, _ := sarama.NewAsyncProducer(base.Cfg.Kafka.Addresses, nil)
|
||||||
|
keyProducer.ch = pro
|
||||||
|
}
|
||||||
|
|
||||||
|
func presetDeviceKeysQueryMap(
|
||||||
|
deviceKeysQueryMap map[string]types.DeviceKeysQuery,
|
||||||
|
uid string,
|
||||||
|
key types.KeyHolder,
|
||||||
|
) map[string]types.DeviceKeysQuery {
|
||||||
|
// preset for complicated nested map struct
|
||||||
|
if _, ok := deviceKeysQueryMap[key.DeviceID]; !ok {
|
||||||
|
// make consistency
|
||||||
|
deviceKeysQueryMap[key.DeviceID] = types.DeviceKeysQuery{}
|
||||||
|
}
|
||||||
|
if deviceKeysQueryMap[key.DeviceID].Signature == nil {
|
||||||
|
mid := make(map[string]map[string]string)
|
||||||
|
midmap := deviceKeysQueryMap[key.DeviceID]
|
||||||
|
midmap.Signature = mid
|
||||||
|
deviceKeysQueryMap[key.DeviceID] = midmap
|
||||||
|
}
|
||||||
|
if deviceKeysQueryMap[key.DeviceID].Keys == nil {
|
||||||
|
mid := make(map[string]string)
|
||||||
|
midmap := deviceKeysQueryMap[key.DeviceID]
|
||||||
|
midmap.Keys = mid
|
||||||
|
deviceKeysQueryMap[key.DeviceID] = midmap
|
||||||
|
}
|
||||||
|
if _, ok := deviceKeysQueryMap[key.DeviceID].Signature[uid]; !ok {
|
||||||
|
// make consistency
|
||||||
|
deviceKeysQueryMap[key.DeviceID].Signature[uid] = make(map[string]string)
|
||||||
|
}
|
||||||
|
return deviceKeysQueryMap
|
||||||
|
}
|
||||||
|
|
||||||
|
func bothKeyProcess(
|
||||||
|
ctx context.Context,
|
||||||
|
body *types.UploadEncryptSpecific,
|
||||||
|
userID, deviceID string,
|
||||||
|
database *storage.Database,
|
||||||
|
deviceKeys types.DeviceKeys,
|
||||||
|
) (err error) {
|
||||||
|
// insert one time keys firstly
|
||||||
|
onetimeKeys := body.OneTimeKey
|
||||||
|
for alKeyID, val := range onetimeKeys.KeyString {
|
||||||
|
al := (strings.Split(alKeyID, ":"))[0]
|
||||||
|
keyID := (strings.Split(alKeyID, ":"))[1]
|
||||||
|
keyInfo := val
|
||||||
|
keyStringTyp := ONETIMEKEYSTR
|
||||||
|
sig := ""
|
||||||
|
err = database.InsertKey(ctx, deviceID, userID, keyID, keyStringTyp, keyInfo, al, sig)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for alKeyID, val := range onetimeKeys.KeyObject {
|
||||||
|
al := (strings.Split(alKeyID, ":"))[0]
|
||||||
|
keyID := (strings.Split(alKeyID, ":"))[1]
|
||||||
|
keyInfo := val.Key
|
||||||
|
keyObjectTyp := ONETIMEKEYSTR
|
||||||
|
sig := val.Signature[userID][fmt.Sprintf("%s:%s", "ed25519", deviceID)]
|
||||||
|
err = database.InsertKey(ctx, deviceID, userID, keyID, keyObjectTyp, keyInfo, al, sig)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// insert device keys
|
||||||
|
keys := deviceKeys.Keys
|
||||||
|
sigs := deviceKeys.Signature
|
||||||
|
for alDevice, key := range keys {
|
||||||
|
al := (strings.Split(alDevice, ":"))[0]
|
||||||
|
keyTyp := DEVICEKEYSTR
|
||||||
|
keyInfo := key
|
||||||
|
keyID := ""
|
||||||
|
sig := sigs[userID][fmt.Sprintf("%s:%s", "ed25519", deviceID)]
|
||||||
|
err = database.InsertKey(
|
||||||
|
ctx, deviceID, userID, keyID, keyTyp, keyInfo, al, sig)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func dkeyProcess(
|
||||||
|
ctx context.Context,
|
||||||
|
userID, deviceID string,
|
||||||
|
database *storage.Database,
|
||||||
|
deviceKeys types.DeviceKeys,
|
||||||
|
) (err error) {
|
||||||
|
keys := deviceKeys.Keys
|
||||||
|
sigs := deviceKeys.Signature
|
||||||
|
for alDevice, key := range keys {
|
||||||
|
al := (strings.Split(alDevice, ":"))[0]
|
||||||
|
keyTyp := DEVICEKEYSTR
|
||||||
|
keyInfo := key
|
||||||
|
keyID := ""
|
||||||
|
sig := sigs[userID][fmt.Sprintf("%s:%s", "ed25519", deviceID)]
|
||||||
|
err = database.InsertKey(ctx, deviceID, userID, keyID, keyTyp, keyInfo, al, sig)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func otmKeyProcess(
|
||||||
|
ctx context.Context,
|
||||||
|
body *types.UploadEncryptSpecific,
|
||||||
|
userID, deviceID string,
|
||||||
|
database *storage.Database,
|
||||||
|
) (err error) {
|
||||||
|
onetimeKeys := body.OneTimeKey
|
||||||
|
for alKeyID, val := range onetimeKeys.KeyString {
|
||||||
|
al := (strings.Split(alKeyID, ":"))[0]
|
||||||
|
keyID := (strings.Split(alKeyID, ":"))[1]
|
||||||
|
keyInfo := val
|
||||||
|
oneTimeKeyStringTyp := ONETIMEKEYSTR
|
||||||
|
sig := ""
|
||||||
|
err = database.InsertKey(ctx, deviceID, userID, keyID, oneTimeKeyStringTyp, keyInfo, al, sig)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for alKeyID, val := range onetimeKeys.KeyObject {
|
||||||
|
al := (strings.Split(alKeyID, ":"))[0]
|
||||||
|
keyID := (strings.Split(alKeyID, ":"))[1]
|
||||||
|
keyInfo := val.Key
|
||||||
|
oneTimeKeyObjectTyp := ONETIMEKEYSTR
|
||||||
|
sig := val.Signature[userID][fmt.Sprintf("%s:%s", "ed25519", deviceID)]
|
||||||
|
err = database.InsertKey(ctx, deviceID, userID, keyID, oneTimeKeyObjectTyp, keyInfo, al, sig)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,66 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package routing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/auth"
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/encryptoapi/storage"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
const pathPrefixUnstable = "/_matrix/client/unstable"
|
||||||
|
|
||||||
|
// Setup works for setting up encryption api server
|
||||||
|
func Setup(
|
||||||
|
apiMux *mux.Router,
|
||||||
|
encryptionDB *storage.Database,
|
||||||
|
deviceDB *devices.Database,
|
||||||
|
) {
|
||||||
|
authData := auth.Data{nil, deviceDB, nil}
|
||||||
|
unstablemux := apiMux.PathPrefix(pathPrefixUnstable).Subrouter()
|
||||||
|
|
||||||
|
unstablemux.Handle("/keys/upload/{deviceID}",
|
||||||
|
common.MakeAuthAPI("upload keys", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
|
return UploadPKeys(req, encryptionDB, device.UserID, device.ID)
|
||||||
|
}),
|
||||||
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
|
|
||||||
|
unstablemux.Handle("/keys/upload",
|
||||||
|
common.MakeAuthAPI("upload keys", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
|
return UploadPKeys(req, encryptionDB, device.UserID, device.ID)
|
||||||
|
}),
|
||||||
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
|
|
||||||
|
unstablemux.Handle("/keys/query",
|
||||||
|
common.MakeAuthAPI("query keys", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
|
//vars := mux.Vars(req)
|
||||||
|
return QueryPKeys(req, encryptionDB, device.ID, deviceDB)
|
||||||
|
}),
|
||||||
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
|
|
||||||
|
unstablemux.Handle("/keys/claim",
|
||||||
|
common.MakeAuthAPI("claim keys", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
|
return ClaimOneTimeKeys(req, encryptionDB)
|
||||||
|
}),
|
||||||
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,87 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/encryptoapi/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const algorithmSchema = `
|
||||||
|
-- The media_repository table holds metadata for each media file stored and accessible to the local server,
|
||||||
|
-- the actual file is stored separately.
|
||||||
|
CREATE TABLE IF NOT EXISTS encrypt_algorithm (
|
||||||
|
device_id TEXT NOT NULL,
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
algorithms TEXT NOT NULL
|
||||||
|
);
|
||||||
|
`
|
||||||
|
const insertalSQL = `
|
||||||
|
INSERT INTO encrypt_algorithm (device_id, user_id, algorithms)
|
||||||
|
VALUES ($1, $2, $3)
|
||||||
|
`
|
||||||
|
const selectalSQL = `
|
||||||
|
SELECT user_id, device_id, algorithms FROM encrypt_algorithm
|
||||||
|
WHERE user_id = $1 AND device_id = $2
|
||||||
|
`
|
||||||
|
|
||||||
|
type alStatements struct {
|
||||||
|
insertAlStmt *sql.Stmt
|
||||||
|
selectAlStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *alStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(algorithmSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertAlStmt, err = db.Prepare(insertalSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectAlStmt, err = db.Prepare(selectalSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// persist algorithms
|
||||||
|
func (s *alStatements) insertAl(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
userID, deviceID, algorithms string,
|
||||||
|
) error {
|
||||||
|
stmt := common.TxStmt(txn, s.insertAlStmt)
|
||||||
|
_, err := stmt.ExecContext(ctx, deviceID, userID, algorithms)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// select algorithms
|
||||||
|
func (s *alStatements) selectAl(
|
||||||
|
ctx context.Context,
|
||||||
|
txn *sql.Tx,
|
||||||
|
userID, deviceID string,
|
||||||
|
) (holder types.AlHolder, err error) {
|
||||||
|
|
||||||
|
stmt := common.TxStmt(txn, s.selectAlStmt)
|
||||||
|
row := stmt.QueryRowContext(ctx, userID, deviceID)
|
||||||
|
single := types.AlHolder{}
|
||||||
|
err = row.Scan(
|
||||||
|
&single.UserID,
|
||||||
|
&single.DeviceID,
|
||||||
|
&single.SupportedAlgorithm,
|
||||||
|
)
|
||||||
|
return single, err
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,251 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/encryptoapi/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const keysSchema = `
|
||||||
|
-- The media_repository table holds metadata for each media file stored and accessible to the local server,
|
||||||
|
-- the actual file is stored separately.
|
||||||
|
CREATE TABLE IF NOT EXISTS encrypt_keys (
|
||||||
|
device_id TEXT NOT NULL,
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
key_id TEXT ,
|
||||||
|
key_type TEXT NOT NULL,
|
||||||
|
key_info TEXT NOT NULL,
|
||||||
|
algorithm TEXT NOT NULL,
|
||||||
|
signature TEXT NOT NULL
|
||||||
|
);
|
||||||
|
`
|
||||||
|
const insertkeySQL = `
|
||||||
|
INSERT INTO encrypt_keys (device_id, user_id, key_id, key_type, key_info, algorithm, signature)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||||
|
`
|
||||||
|
const selectkeySQL = `
|
||||||
|
SELECT user_id, device_id, key_id, key_type, key_info, algorithm, signature FROM encrypt_keys
|
||||||
|
WHERE user_id = $1 AND device_id = $2
|
||||||
|
`
|
||||||
|
const deleteSinglekeySQL = `
|
||||||
|
SELECT user_id, device_id, key_id, key_type, key_info, algorithm, signature FROM encrypt_keys
|
||||||
|
WHERE user_id = $1 AND device_id = $2 AND algorithm = $3
|
||||||
|
`
|
||||||
|
const selectSinglekeySQL = `
|
||||||
|
DELETE FROM encrypt_keys
|
||||||
|
WHERE user_id = $1 AND device_id = $2 AND algorithm = $3 AND key_id = $4
|
||||||
|
`
|
||||||
|
const selectInkeysSQL = `
|
||||||
|
SELECT user_id, device_id, key_id, key_type, key_info, algorithm, signature FROM encrypt_keys
|
||||||
|
WHERE user_id = $1 AND key_type = 'device_key' AND device_id = ANY($2)
|
||||||
|
`
|
||||||
|
const selectAllkeysSQL = `
|
||||||
|
SELECT user_id, device_id, key_id, key_type, key_info, algorithm, signature FROM encrypt_keys
|
||||||
|
WHERE user_id = $1 AND key_type = $2
|
||||||
|
`
|
||||||
|
const selectCountOneTimeKey = `
|
||||||
|
SELECT algorithm, COUNT(algorithm) FROM encrypt_keys WHERE user_id = $1 AND device_id = $2 AND key_type = 'one_time_key'
|
||||||
|
GROUP BY algorithm
|
||||||
|
`
|
||||||
|
|
||||||
|
type keyStatements struct {
|
||||||
|
insertKeyStmt *sql.Stmt
|
||||||
|
selectKeyStmt *sql.Stmt
|
||||||
|
selectInKeysStmt *sql.Stmt
|
||||||
|
selectAllKeyStmt *sql.Stmt
|
||||||
|
selectSingleKeyStmt *sql.Stmt
|
||||||
|
deleteSingleKeyStmt *sql.Stmt
|
||||||
|
selectCountOneTimeKeyStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *keyStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(keysSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertKeyStmt, err = db.Prepare(insertkeySQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectKeyStmt, err = db.Prepare(selectkeySQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectInKeysStmt, err = db.Prepare(selectInkeysSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectAllKeyStmt, err = db.Prepare(selectAllkeysSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.deleteSingleKeyStmt, err = db.Prepare(selectSinglekeySQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectSingleKeyStmt, err = db.Prepare(deleteSinglekeySQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectCountOneTimeKeyStmt, err = db.Prepare(selectCountOneTimeKey); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// insert keys
|
||||||
|
func (s *keyStatements) insertKey(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
deviceID, userID, keyID, keyTyp, keyInfo, algorithm, signature string,
|
||||||
|
) error {
|
||||||
|
stmt := common.TxStmt(txn, s.insertKeyStmt)
|
||||||
|
_, err := stmt.ExecContext(ctx, deviceID, userID, keyID, keyTyp, keyInfo, algorithm, signature)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// select by user and device
|
||||||
|
func (s *keyStatements) selectKey(
|
||||||
|
ctx context.Context,
|
||||||
|
txn *sql.Tx,
|
||||||
|
deviceID, userID string,
|
||||||
|
) ([]types.KeyHolder, error) {
|
||||||
|
holders := []types.KeyHolder{}
|
||||||
|
stmt := common.TxStmt(txn, s.selectKeyStmt)
|
||||||
|
rows, err := stmt.QueryContext(ctx, userID, deviceID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
single := &types.KeyHolder{}
|
||||||
|
if err = rows.Scan(
|
||||||
|
&single.UserID,
|
||||||
|
&single.DeviceID,
|
||||||
|
&single.KeyID,
|
||||||
|
&single.KeyType,
|
||||||
|
&single.Key,
|
||||||
|
&single.KeyAlgorithm,
|
||||||
|
&single.Signature,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
holders = append(holders, *single)
|
||||||
|
}
|
||||||
|
err = rows.Close()
|
||||||
|
return holders, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// select single one for claim usage
|
||||||
|
func (s *keyStatements) selectSingleKey(
|
||||||
|
ctx context.Context,
|
||||||
|
userID, deviceID, algorithm string,
|
||||||
|
) (holder types.KeyHolder, err error) {
|
||||||
|
stmt := s.selectSingleKeyStmt
|
||||||
|
row := stmt.QueryRowContext(ctx, userID, deviceID, algorithm)
|
||||||
|
if err != nil {
|
||||||
|
return holder, err
|
||||||
|
}
|
||||||
|
if err = row.Scan(
|
||||||
|
&holder.UserID,
|
||||||
|
&holder.DeviceID,
|
||||||
|
&holder.KeyID,
|
||||||
|
&holder.KeyType,
|
||||||
|
&holder.Key,
|
||||||
|
&holder.KeyAlgorithm,
|
||||||
|
&holder.Signature,
|
||||||
|
); err != nil {
|
||||||
|
return holder, err
|
||||||
|
}
|
||||||
|
deleteStmt := s.deleteSingleKeyStmt
|
||||||
|
_, err = deleteStmt.ExecContext(ctx, userID, deviceID, algorithm, holder.KeyID)
|
||||||
|
return holder, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// select details by given an array of devices
|
||||||
|
func (s *keyStatements) selectInKeys(
|
||||||
|
ctx context.Context,
|
||||||
|
userID string,
|
||||||
|
arr []string,
|
||||||
|
) ([]types.KeyHolder, error) {
|
||||||
|
holders := []types.KeyHolder{}
|
||||||
|
stmt := s.selectAllKeyStmt
|
||||||
|
if len(arr) == 0 {
|
||||||
|
// mapping for all device keys
|
||||||
|
rowsP, err := stmt.QueryContext(ctx, userID, "device_key")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
holders, err = injectKeyHolder(rowsP, holders)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = rowsP.Close()
|
||||||
|
return holders, err
|
||||||
|
}
|
||||||
|
stmt = s.selectInKeysStmt
|
||||||
|
list := pq.Array(arr)
|
||||||
|
rowsP, err := stmt.QueryContext(ctx, userID, list)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
holders, err = injectKeyHolder(rowsP, holders)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = rowsP.Close()
|
||||||
|
return holders, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func injectKeyHolder(rows *sql.Rows, keyHolder []types.KeyHolder) (holders []types.KeyHolder, err error) {
|
||||||
|
for rows.Next() {
|
||||||
|
single := &types.KeyHolder{}
|
||||||
|
if err = rows.Scan(
|
||||||
|
&single.UserID,
|
||||||
|
&single.DeviceID,
|
||||||
|
&single.KeyID,
|
||||||
|
&single.KeyType,
|
||||||
|
&single.Key,
|
||||||
|
&single.KeyAlgorithm,
|
||||||
|
&single.Signature,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
keyHolder = append(keyHolder, *single)
|
||||||
|
}
|
||||||
|
holders = keyHolder
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// select by user and device
|
||||||
|
func (s *keyStatements) selectOneTimeKeyCount(
|
||||||
|
ctx context.Context,
|
||||||
|
userID, deviceID string,
|
||||||
|
) (map[string]int, error) {
|
||||||
|
holders := make(map[string]int)
|
||||||
|
rows, err := s.selectCountOneTimeKeyStmt.QueryContext(ctx, userID, deviceID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
var singleKey string
|
||||||
|
var singleVal int
|
||||||
|
if err = rows.Scan(
|
||||||
|
&singleKey,
|
||||||
|
&singleVal,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
holders[singleKey] = singleVal
|
||||||
|
}
|
||||||
|
err = rows.Close()
|
||||||
|
return holders, err
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,131 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/encryptoapi/types"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Database represents a presence database.
|
||||||
|
type Database struct {
|
||||||
|
db *sql.DB
|
||||||
|
keyStatements keyStatements
|
||||||
|
alStatements alStatements
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDatabase creates a new presence database
|
||||||
|
func NewDatabase(dataSourceName string) (*Database, error) {
|
||||||
|
var db *sql.DB
|
||||||
|
var err error
|
||||||
|
if db, err = sql.Open("postgres", dataSourceName); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
keyStatement := keyStatements{}
|
||||||
|
alStatement := alStatements{}
|
||||||
|
if err = keyStatement.prepare(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = alStatement.prepare(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Database{db: db, keyStatements: keyStatement, alStatements: alStatement}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// InsertKey insert device key
|
||||||
|
func (d *Database) InsertKey(
|
||||||
|
ctx context.Context,
|
||||||
|
deviceID, userID, keyID, keyTyp, keyInfo, al, sig string,
|
||||||
|
) (err error) {
|
||||||
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
return d.keyStatements.insertKey(ctx, txn, deviceID, userID, keyID, keyTyp, keyInfo, al, sig)
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// SelectOneTimeKeyCount for key upload response usage a map from key algorithm to sum to counterpart
|
||||||
|
func (d *Database) SelectOneTimeKeyCount(
|
||||||
|
ctx context.Context,
|
||||||
|
deviceID, userID string,
|
||||||
|
) (m map[string]int, err error) {
|
||||||
|
m = make(map[string]int)
|
||||||
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
elems, err := d.keyStatements.selectKey(ctx, txn, deviceID, userID)
|
||||||
|
for _, val := range elems {
|
||||||
|
if _, ok := m[val.KeyAlgorithm]; !ok {
|
||||||
|
m[val.KeyAlgorithm] = 0
|
||||||
|
}
|
||||||
|
if val.KeyType == "one_time_key" {
|
||||||
|
m[val.KeyAlgorithm]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryInRange query keys in a range of devices
|
||||||
|
func (d *Database) QueryInRange(
|
||||||
|
ctx context.Context,
|
||||||
|
userID string,
|
||||||
|
arr []string,
|
||||||
|
) (res []types.KeyHolder, err error) {
|
||||||
|
res, err = d.keyStatements.selectInKeys(ctx, userID, arr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// InsertAl persist algorithms
|
||||||
|
func (d *Database) InsertAl(
|
||||||
|
ctx context.Context, uid, device string, al []string,
|
||||||
|
) (err error) {
|
||||||
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) (err error) {
|
||||||
|
err = d.alStatements.insertAl(ctx, txn, uid, device, strings.Join(al, ","))
|
||||||
|
return
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// SelectAl select algorithms
|
||||||
|
func (d *Database) SelectAl(
|
||||||
|
ctx context.Context, uid, device string,
|
||||||
|
) (res []string, err error) {
|
||||||
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) (err error) {
|
||||||
|
holder, err := d.alStatements.selectAl(ctx, txn, uid, device)
|
||||||
|
res = strings.Split(holder.SupportedAlgorithm, ",")
|
||||||
|
return
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// SelectOneTimeKeySingle claim for one time key one for once
|
||||||
|
func (d *Database) SelectOneTimeKeySingle(
|
||||||
|
ctx context.Context,
|
||||||
|
userID, deviceID, algorithm string,
|
||||||
|
) (holder types.KeyHolder, err error) {
|
||||||
|
holder, err = d.keyStatements.selectSingleKey(ctx, userID, deviceID, algorithm)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncOneTimeCount for sync device_one_time_keys_count extension
|
||||||
|
func (d *Database) SyncOneTimeCount(
|
||||||
|
ctx context.Context,
|
||||||
|
userID, deviceID string,
|
||||||
|
) (holder map[string]int, err error) {
|
||||||
|
holder, err = d.keyStatements.selectOneTimeKeyCount(ctx, userID, deviceID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,27 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
// ClaimRequest structure
|
||||||
|
type ClaimRequest struct {
|
||||||
|
Timeout int64 `json:"timeout"`
|
||||||
|
ClaimDetail map[string]map[string]string `json:"one_time_keys"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClaimResponse structure
|
||||||
|
type ClaimResponse struct {
|
||||||
|
Failures map[string]interface{} `json:"failures"`
|
||||||
|
ClaimBody map[string]map[string]map[string]interface{} `json:"one_time_keys"`
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,43 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
// QueryRequest structure
|
||||||
|
type QueryRequest struct {
|
||||||
|
Timeout int64 `json:"timeout"`
|
||||||
|
DeviceKeys map[string]interface{} `json:"device_keys"`
|
||||||
|
Token string `json:"token"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryResponse structure
|
||||||
|
type QueryResponse struct {
|
||||||
|
Failure map[string]interface{} `json:"failures"`
|
||||||
|
DeviceKeys map[string]map[string]DeviceKeysQuery `json:"device_keys"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeviceKeysQuery structure
|
||||||
|
type DeviceKeysQuery struct {
|
||||||
|
UserID string `json:"user_id"`
|
||||||
|
DeviceID string `json:"device_id"`
|
||||||
|
Algorithm []string `json:"algorithms"`
|
||||||
|
Keys map[string]string `json:"keys"`
|
||||||
|
Signature map[string]map[string]string `json:"signatures"`
|
||||||
|
Unsigned UnsignedDeviceInfo `json:"unsigned"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnsignedDeviceInfo structure
|
||||||
|
type UnsignedDeviceInfo struct {
|
||||||
|
Info string `json:"device_display_name"`
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
// KeyHolder structure
|
||||||
|
type KeyHolder struct {
|
||||||
|
UserID,
|
||||||
|
DeviceID,
|
||||||
|
Signature,
|
||||||
|
KeyAlgorithm,
|
||||||
|
KeyID,
|
||||||
|
Key,
|
||||||
|
KeyType string
|
||||||
|
}
|
||||||
|
|
||||||
|
// AlHolder structure
|
||||||
|
type AlHolder struct {
|
||||||
|
UserID,
|
||||||
|
DeviceID,
|
||||||
|
SupportedAlgorithm string
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,61 @@
|
||||||
|
// Copyright 2018 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
// UploadEncrypt structure
|
||||||
|
type UploadEncrypt struct {
|
||||||
|
DeviceKeys DeviceKeys `json:"device_keys"`
|
||||||
|
OneTimeKey map[string]interface{} `json:"one_time_keys"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UploadEncryptSpecific structure
|
||||||
|
type UploadEncryptSpecific struct {
|
||||||
|
DeviceKeys DeviceKeys `json:"device_keys"`
|
||||||
|
OneTimeKey OneTimeKeySpecific `json:"one_time_keys"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UploadResponse structure
|
||||||
|
type UploadResponse struct {
|
||||||
|
Count map[string]int `json:"one_time_key_counts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeviceKeys structure
|
||||||
|
type DeviceKeys struct {
|
||||||
|
UserID string `json:"user_id"`
|
||||||
|
DeviceID string `json:"device_id"`
|
||||||
|
Algorithm []string `json:"algorithms"`
|
||||||
|
Keys map[string]string `json:"keys"`
|
||||||
|
Signature map[string]map[string]string `json:"signatures"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyObject structure
|
||||||
|
type KeyObject struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
Signature map[string]map[string]string `json:"signatures"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// OneTimeKey structure
|
||||||
|
type OneTimeKey struct {
|
||||||
|
//KeyString map[string]string
|
||||||
|
//KeyObject map[string]KeyObject
|
||||||
|
KeySth map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OneTimeKeySpecific structure
|
||||||
|
type OneTimeKeySpecific struct {
|
||||||
|
KeyString map[string]string
|
||||||
|
KeyObject map[string]KeyObject
|
||||||
|
//KeySth map[string]interface{}
|
||||||
|
}
|
||||||
89
src/github.com/matrix-org/dendrite/syncapi/routing/std.go
Normal file
89
src/github.com/matrix-org/dendrite/syncapi/routing/std.go
Normal file
|
|
@ -0,0 +1,89 @@
|
||||||
|
package routing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SendToDevice this is a function for calling process of send-to-device messages those bypassed DAG
|
||||||
|
func SendToDevice(
|
||||||
|
req *http.Request,
|
||||||
|
sender string,
|
||||||
|
syncDB *storage.SyncServerDatabase,
|
||||||
|
deviceDB *devices.Database,
|
||||||
|
eventType, txnID string,
|
||||||
|
notifier *sync.Notifier,
|
||||||
|
) util.JSONResponse {
|
||||||
|
ctx := req.Context()
|
||||||
|
stdRq := types.StdRequest{}
|
||||||
|
httputil.UnmarshalJSONRequest(req, &stdRq)
|
||||||
|
for uid, deviceMap := range stdRq.Sender {
|
||||||
|
|
||||||
|
// federation consideration todo:
|
||||||
|
// if uid is remote domain a fed process should go
|
||||||
|
if false {
|
||||||
|
// federation process
|
||||||
|
return util.JSONResponse{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// uid is local domain
|
||||||
|
for device, cont := range deviceMap {
|
||||||
|
jsonBuffer, err := json.Marshal(cont)
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusForbidden,
|
||||||
|
JSON: struct{}{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ev := types.StdHolder{
|
||||||
|
Sender: sender,
|
||||||
|
Event: jsonBuffer,
|
||||||
|
EventTyp: eventType,
|
||||||
|
}
|
||||||
|
var pos int64
|
||||||
|
|
||||||
|
// wildcard all devices
|
||||||
|
if device == "*" {
|
||||||
|
var deviceCollection []authtypes.Device
|
||||||
|
var localpart string
|
||||||
|
localpart, _, _ = gomatrixserverlib.SplitID('@', uid)
|
||||||
|
deviceCollection, err = deviceDB.GetDevicesByLocalpart(ctx, localpart)
|
||||||
|
for _, val := range deviceCollection {
|
||||||
|
pos, err = syncDB.InsertStdMessage(ctx, ev, txnID, uid, val.ID)
|
||||||
|
notifier.OnNewEvent(nil, uid, types.StreamPosition(pos))
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusForbidden,
|
||||||
|
JSON: struct{}{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
JSON: struct{}{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pos, err = syncDB.InsertStdMessage(ctx, ev, txnID, uid, device)
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusForbidden,
|
||||||
|
JSON: struct{}{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
notifier.OnNewEvent(nil, uid, types.StreamPosition(pos))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
JSON: struct{}{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,162 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// we treat send to device as abbrev as STD in the context below.
|
||||||
|
|
||||||
|
const sendToDeviceSchema = `
|
||||||
|
CREATE TABLE IF NOT EXISTS syncapi_send_to_device (
|
||||||
|
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
|
||||||
|
txn_id TEXT NOT NULL,
|
||||||
|
sender TEXT NOT NULL,
|
||||||
|
event_type TEXT NOT NULL,
|
||||||
|
target_device_id TEXT NOT NULL,
|
||||||
|
target_user_id TEXT NOT NULL,
|
||||||
|
event_json TEXT NOT NULL,
|
||||||
|
del_read INTEGER DEFAULT 0,
|
||||||
|
max_read BIGINT DEFAULT currval('syncapi_stream_id') ,
|
||||||
|
CONSTRAINT syncapi_send_to_device_unique UNIQUE (txn_id, target_device_id, target_user_id)
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
const insertSTDSQL = "" +
|
||||||
|
"INSERT INTO syncapi_send_to_device (" +
|
||||||
|
" sender, event_type, target_user_id, target_device_id, txn_id, event_json" +
|
||||||
|
") VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
|
||||||
|
|
||||||
|
const deleteSTDSQL = "" +
|
||||||
|
"DELETE FROM syncapi_send_to_device WHERE target_user_id = $1 AND target_device_id = $2 AND max_read < $3 AND del_read = 1"
|
||||||
|
|
||||||
|
const selectSTDEventsInRangeSQL = "" +
|
||||||
|
"SELECT id, sender, event_type, event_json FROM syncapi_send_to_device" +
|
||||||
|
" WHERE target_user_id = $1 AND target_device_id = $2 AND id <= $3" +
|
||||||
|
" ORDER BY id LIMIT 100 "
|
||||||
|
|
||||||
|
const updateSTDEventSQL = "" +
|
||||||
|
"UPDATE syncapi_send_to_device SET del_read = 1 , max_read = $1 WHERE id = ANY($2)"
|
||||||
|
|
||||||
|
const selectMaxSTDIDSQL = "" +
|
||||||
|
"SELECT MAX(id) FROM syncapi_send_to_device"
|
||||||
|
|
||||||
|
type stdEventsStatements struct {
|
||||||
|
insertStdEventStmt *sql.Stmt
|
||||||
|
selectStdEventsInRangeStmt *sql.Stmt
|
||||||
|
deleteStdEventStmt *sql.Stmt
|
||||||
|
selectStdIDStmt *sql.Stmt
|
||||||
|
updateStdStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stdEventsStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(sendToDeviceSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertStdEventStmt, err = db.Prepare(insertSTDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectStdEventsInRangeStmt, err = db.Prepare(selectSTDEventsInRangeSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.deleteStdEventStmt, err = db.Prepare(deleteSTDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectStdIDStmt, err = db.Prepare(selectMaxSTDIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.updateStdStmt, err = db.Prepare(updateSTDEventSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stdEventsStatements) insertStdEvent(
|
||||||
|
ctx context.Context, stdEvent types.StdHolder,
|
||||||
|
transactionID string, targetUID, targetDevice string,
|
||||||
|
) (streamPos int64, err error) {
|
||||||
|
err = s.insertStdEventStmt.QueryRowContext(
|
||||||
|
ctx,
|
||||||
|
stdEvent.Sender,
|
||||||
|
stdEvent.EventTyp,
|
||||||
|
targetUID,
|
||||||
|
targetDevice,
|
||||||
|
transactionID,
|
||||||
|
stdEvent.Event,
|
||||||
|
).Scan(&streamPos)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stdEventsStatements) deleteStdEvent(
|
||||||
|
ctx context.Context, userID, deviceID string,
|
||||||
|
idUpBound int64,
|
||||||
|
) error {
|
||||||
|
_, err := s.deleteStdEventStmt.ExecContext(ctx, userID, deviceID, idUpBound)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stdEventsStatements) selectStdEventsInRange(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
targetUserID, targetDeviceID string,
|
||||||
|
endPos int64,
|
||||||
|
) ([]types.StdHolder, error) {
|
||||||
|
stdHolder := []types.StdHolder{}
|
||||||
|
stmt := common.TxStmt(txn, s.selectStdEventsInRangeStmt)
|
||||||
|
rows, err := stmt.QueryContext(ctx, targetUserID, targetDeviceID, endPos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
holder := types.StdHolder{}
|
||||||
|
var (
|
||||||
|
id int64
|
||||||
|
sender string
|
||||||
|
eventType string
|
||||||
|
eventJSON []byte
|
||||||
|
)
|
||||||
|
if err = rows.Scan(&id, &sender, &eventType, &eventJSON); err != nil {
|
||||||
|
closeErr := rows.Close()
|
||||||
|
if closeErr != nil {
|
||||||
|
return nil, closeErr
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
holder.StreamID = id
|
||||||
|
holder.Sender = sender
|
||||||
|
holder.Event = eventJSON
|
||||||
|
holder.EventTyp = eventType
|
||||||
|
stdHolder = append(stdHolder, holder)
|
||||||
|
}
|
||||||
|
err = rows.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// update events with read mark
|
||||||
|
update := []int64{}
|
||||||
|
for _, val := range stdHolder {
|
||||||
|
update = append(update, val.StreamID)
|
||||||
|
}
|
||||||
|
updateStmt := common.TxStmt(txn, s.updateStdStmt)
|
||||||
|
_, err = updateStmt.ExecContext(ctx, endPos, pq.Array(update))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return stdHolder, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stdEventsStatements) selectMaxStdID(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
) (id int64, err error) {
|
||||||
|
var nullableID sql.NullInt64
|
||||||
|
stmt := common.TxStmt(txn, s.selectStdIDStmt)
|
||||||
|
err = stmt.QueryRowContext(ctx).Scan(&nullableID)
|
||||||
|
if nullableID.Valid {
|
||||||
|
id = nullableID.Int64
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,64 @@
|
||||||
|
// Copyright 2017 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package sync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
encryptoapi "github.com/matrix-org/dendrite/encryptoapi/storage"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type keyCounter struct {
|
||||||
|
sync.RWMutex
|
||||||
|
m map[string]map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
|
var counter = keyCounter{
|
||||||
|
m: make(map[string]map[string]int),
|
||||||
|
}
|
||||||
|
|
||||||
|
// CounterRead returns uid to countMap
|
||||||
|
func CounterRead(uid string) map[string]int {
|
||||||
|
counter.RLock()
|
||||||
|
defer counter.RUnlock()
|
||||||
|
return counter.m[uid]
|
||||||
|
}
|
||||||
|
|
||||||
|
// CounterWrite write count map to share for all response
|
||||||
|
func CounterWrite(uid string, m map[string]int) {
|
||||||
|
counter.Lock()
|
||||||
|
defer counter.Unlock()
|
||||||
|
counter.m[uid] = m
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyCountEXT key count extension process
|
||||||
|
func KeyCountEXT(
|
||||||
|
ctx context.Context,
|
||||||
|
encryptionDB *encryptoapi.Database,
|
||||||
|
respIn types.Response,
|
||||||
|
userID, deviceID string,
|
||||||
|
) (respOut *types.Response) {
|
||||||
|
|
||||||
|
respOut = &respIn
|
||||||
|
// when extension works at the very beginning
|
||||||
|
resp, err := encryptionDB.SyncOneTimeCount(ctx, userID, deviceID)
|
||||||
|
CounterWrite(userID, resp)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
respOut.SignNum = resp
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
@ -22,20 +22,27 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
encryptoapi "github.com/matrix-org/dendrite/encryptoapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
const pathPrefixR0 = "/_matrix/client/r0"
|
const pathPrefixR0 = "/_matrix/client/r0"
|
||||||
|
const pathPrefixUnstable = "/_matrix/client/unstable"
|
||||||
|
|
||||||
// Setup configures the given mux with sync-server listeners
|
// Setup configures the given mux with sync-server listeners
|
||||||
|
<<<<<<< HEAD:syncapi/routing/routing.go
|
||||||
//
|
//
|
||||||
// Due to Setup being used to call many other functions, a gocyclo nolint is
|
// Due to Setup being used to call many other functions, a gocyclo nolint is
|
||||||
// applied:
|
// applied:
|
||||||
// nolint: gocyclo
|
// nolint: gocyclo
|
||||||
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatasource, deviceDB *devices.Database) {
|
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatasource, deviceDB *devices.Database) {
|
||||||
|
=======
|
||||||
|
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatabase, deviceDB *devices.Database, notifier *sync.Notifier, encryptDB *encryptoapi.Database) {
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/syncapi/routing/routing.go
|
||||||
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
||||||
|
unstablemux := apiMux.PathPrefix(pathPrefixUnstable).Subrouter()
|
||||||
|
|
||||||
authData := auth.Data{
|
authData := auth.Data{
|
||||||
AccountDB: nil,
|
AccountDB: nil,
|
||||||
|
|
@ -45,7 +52,7 @@ func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServer
|
||||||
|
|
||||||
// TODO: Add AS support for all handlers below.
|
// TODO: Add AS support for all handlers below.
|
||||||
r0mux.Handle("/sync", common.MakeAuthAPI("sync", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
r0mux.Handle("/sync", common.MakeAuthAPI("sync", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
return srp.OnIncomingSyncRequest(req, device)
|
return srp.OnIncomingSyncRequest(req, device, encryptDB)
|
||||||
})).Methods(http.MethodGet, http.MethodOptions)
|
})).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
||||||
r0mux.Handle("/rooms/{roomID}/state", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
r0mux.Handle("/rooms/{roomID}/state", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
|
|
@ -71,4 +78,13 @@ func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServer
|
||||||
}
|
}
|
||||||
return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], vars["stateKey"])
|
return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], vars["stateKey"])
|
||||||
})).Methods(http.MethodGet, http.MethodOptions)
|
})).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
||||||
|
unstablemux.Handle("/sendToDevice/{eventType}/{txnId}",
|
||||||
|
common.MakeAuthAPI("look up changes", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
|
vars := mux.Vars(req)
|
||||||
|
eventType := vars["eventType"]
|
||||||
|
txnID := vars["txnId"]
|
||||||
|
return SendToDevice(req, device.UserID, syncDB, deviceDB, eventType, txnID, notifier)
|
||||||
|
}),
|
||||||
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,8 +27,12 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
||||||
|
<<<<<<< HEAD:syncapi/storage/syncserver.go
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
=======
|
||||||
|
"encoding/json"
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/dendrite/typingserver/cache"
|
"github.com/matrix-org/dendrite/typingserver/cache"
|
||||||
|
|
@ -60,7 +64,11 @@ type SyncServerDatasource struct {
|
||||||
events outputRoomEventsStatements
|
events outputRoomEventsStatements
|
||||||
roomstate currentRoomStateStatements
|
roomstate currentRoomStateStatements
|
||||||
invites inviteEventsStatements
|
invites inviteEventsStatements
|
||||||
|
<<<<<<< HEAD:syncapi/storage/syncserver.go
|
||||||
typingCache *cache.TypingCache
|
typingCache *cache.TypingCache
|
||||||
|
=======
|
||||||
|
stdMsg stdEventsStatements
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSyncServerDatabase creates a new sync server database
|
// NewSyncServerDatabase creates a new sync server database
|
||||||
|
|
@ -85,7 +93,13 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er
|
||||||
if err := d.invites.prepare(d.db); err != nil {
|
if err := d.invites.prepare(d.db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
<<<<<<< HEAD:syncapi/storage/syncserver.go
|
||||||
d.typingCache = cache.NewTypingCache()
|
d.typingCache = cache.NewTypingCache()
|
||||||
|
=======
|
||||||
|
if err := d.stdMsg.prepare(d.db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go
|
||||||
return &d, nil
|
return &d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -221,11 +235,22 @@ func (d *SyncServerDatasource) syncPositionTx(
|
||||||
if maxInviteID > maxEventID {
|
if maxInviteID > maxEventID {
|
||||||
maxEventID = maxInviteID
|
maxEventID = maxInviteID
|
||||||
}
|
}
|
||||||
|
<<<<<<< HEAD:syncapi/storage/syncserver.go
|
||||||
sp.PDUPosition = maxEventID
|
sp.PDUPosition = maxEventID
|
||||||
|
|
||||||
sp.TypingPosition = d.typingCache.GetLatestSyncPosition()
|
sp.TypingPosition = d.typingCache.GetLatestSyncPosition()
|
||||||
|
|
||||||
return
|
return
|
||||||
|
=======
|
||||||
|
maxStdID, err := d.stdMsg.selectMaxStdID(ctx, txn)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if maxStdID > maxID {
|
||||||
|
maxID = maxStdID
|
||||||
|
}
|
||||||
|
return types.StreamPosition(maxID), nil
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go
|
||||||
}
|
}
|
||||||
|
|
||||||
// addPDUDeltaToResponse adds all PDU deltas to a sync response.
|
// addPDUDeltaToResponse adds all PDU deltas to a sync response.
|
||||||
|
|
@ -945,3 +970,94 @@ func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
|
||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
send to device messaging implementation
|
||||||
|
del / maxID / select in range / insert
|
||||||
|
*/
|
||||||
|
|
||||||
|
// DelStdMessage delete message for a given maxID, those below would be deleted
|
||||||
|
func (d *SyncServerDatabase) DelStdMessage(
|
||||||
|
ctx context.Context, targetUID, targetDevice string, maxID int64,
|
||||||
|
) (err error) {
|
||||||
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
err := d.stdMsg.deleteStdEvent(ctx, targetUID, targetDevice, maxID)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// InsertStdMessage insert std message
|
||||||
|
func (d *SyncServerDatabase) InsertStdMessage(
|
||||||
|
ctx context.Context, stdEvent types.StdHolder, transactionID, targetUID, targetDevice string,
|
||||||
|
) (pos int64, err error) {
|
||||||
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
curPos, err := d.stdMsg.insertStdEvent(ctx, stdEvent, transactionID, targetUID, targetDevice)
|
||||||
|
pos = curPos
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// SelectMaxStdID select maximum id in std stream
|
||||||
|
func (d *SyncServerDatabase) SelectMaxStdID(
|
||||||
|
ctx context.Context,
|
||||||
|
) (maxID int64, err error) {
|
||||||
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
max, err := d.stdMsg.selectMaxStdID(ctx, txn)
|
||||||
|
maxID = max
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// SelectRangedStd select a range of std messages
|
||||||
|
func (d *SyncServerDatabase) SelectRangedStd(
|
||||||
|
ctx context.Context,
|
||||||
|
targetUserID, targetDeviceID string,
|
||||||
|
endPos int64,
|
||||||
|
) (holder []types.StdHolder, err error) {
|
||||||
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
list, err := d.stdMsg.selectStdEventsInRange(ctx, txn, targetUserID, targetDeviceID, endPos)
|
||||||
|
holder = list
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// StdEXT : send to device extension process
|
||||||
|
func StdEXT(
|
||||||
|
ctx context.Context,
|
||||||
|
syncDB *SyncServerDatabase,
|
||||||
|
respIn types.Response,
|
||||||
|
userID, deviceID string,
|
||||||
|
since int64,
|
||||||
|
) (respOut *types.Response) {
|
||||||
|
respOut = &respIn
|
||||||
|
// when extension works at the very beginning
|
||||||
|
err := syncDB.stdMsg.deleteStdEvent(ctx, userID, deviceID, since)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// when err is nil, these before res should be tagged omitted,
|
||||||
|
// when next /sync is coming , and err is nil , all those omitted.
|
||||||
|
res, err := syncDB.SelectRangedStd(ctx, userID, deviceID, since)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
//toDevice := &types.ToDevice{}
|
||||||
|
mid := []types.StdEvent{}
|
||||||
|
//toDevice.StdEvent = mid
|
||||||
|
for _, val := range res {
|
||||||
|
ev := types.StdEvent{}
|
||||||
|
ev.Sender = val.Sender
|
||||||
|
ev.Type = val.EventTyp
|
||||||
|
err := json.Unmarshal(val.Event, &ev.Content)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mid = append(mid, ev)
|
||||||
|
}
|
||||||
|
respOut.ToDevice.StdEvent = mid
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
encryptoapi "github.com/matrix-org/dendrite/encryptoapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -44,9 +45,13 @@ func NewRequestPool(db *storage.SyncServerDatasource, n *Notifier, adb *accounts
|
||||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||||
// called in a dedicated goroutine for this request. This function will block the goroutine
|
// called in a dedicated goroutine for this request. This function will block the goroutine
|
||||||
// until a response is ready, or it times out.
|
// until a response is ready, or it times out.
|
||||||
|
<<<<<<< HEAD:syncapi/sync/requestpool.go
|
||||||
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
var syncData *types.Response
|
var syncData *types.Response
|
||||||
|
|
||||||
|
=======
|
||||||
|
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device, encryptDB *encryptoapi.Database) util.JSONResponse {
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go
|
||||||
// Extract values from request
|
// Extract values from request
|
||||||
logger := util.GetLogger(req.Context())
|
logger := util.GetLogger(req.Context())
|
||||||
userID := device.UserID
|
userID := device.UserID
|
||||||
|
|
@ -115,7 +120,16 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
// of calculating the sync only to get timed out before we
|
// of calculating the sync only to get timed out before we
|
||||||
// can respond
|
// can respond
|
||||||
|
|
||||||
|
<<<<<<< HEAD:syncapi/sync/requestpool.go
|
||||||
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
||||||
|
=======
|
||||||
|
syncData, err := rp.currentSyncForUser(*syncReq, currPos)
|
||||||
|
|
||||||
|
// std extension consideration
|
||||||
|
syncData = storage.StdEXT(syncReq.ctx, rp.db, *syncData, syncReq.device.UserID, syncReq.device.ID, int64(currPos))
|
||||||
|
syncData = KeyCountEXT(syncReq.ctx, encryptDB, *syncData, syncReq.device.UserID, syncReq.device.ID)
|
||||||
|
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
|
encryptoapi "github.com/matrix-org/dendrite/encryptoapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/consumers"
|
"github.com/matrix-org/dendrite/syncapi/consumers"
|
||||||
"github.com/matrix-org/dendrite/syncapi/routing"
|
"github.com/matrix-org/dendrite/syncapi/routing"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
|
@ -37,6 +38,7 @@ func SetupSyncAPIComponent(
|
||||||
deviceDB *devices.Database,
|
deviceDB *devices.Database,
|
||||||
accountsDB *accounts.Database,
|
accountsDB *accounts.Database,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
|
encryptDB *encryptoapi.Database,
|
||||||
) {
|
) {
|
||||||
syncDB, err := storage.NewSyncServerDatasource(string(base.Cfg.Database.SyncAPI))
|
syncDB, err := storage.NewSyncServerDatasource(string(base.Cfg.Database.SyncAPI))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -70,6 +72,7 @@ func SetupSyncAPIComponent(
|
||||||
logrus.WithError(err).Panicf("failed to start client data consumer")
|
logrus.WithError(err).Panicf("failed to start client data consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< HEAD:syncapi/syncapi.go
|
||||||
typingConsumer := consumers.NewOutputTypingEventConsumer(
|
typingConsumer := consumers.NewOutputTypingEventConsumer(
|
||||||
base.Cfg, base.KafkaConsumer, notifier, syncDB,
|
base.Cfg, base.KafkaConsumer, notifier, syncDB,
|
||||||
)
|
)
|
||||||
|
|
@ -78,4 +81,7 @@ func SetupSyncAPIComponent(
|
||||||
}
|
}
|
||||||
|
|
||||||
routing.Setup(base.APIMux, requestPool, syncDB, deviceDB)
|
routing.Setup(base.APIMux, requestPool, syncDB, deviceDB)
|
||||||
|
=======
|
||||||
|
routing.Setup(base.APIMux, requestPool, syncDB, deviceDB, notifier, encryptDB)
|
||||||
|
>>>>>>> 8b4b3c6fc46900e9bfe5e234eda309200662b34a:src/github.com/matrix-org/dendrite/syncapi/syncapi.go
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -76,6 +76,33 @@ type Response struct {
|
||||||
Invite map[string]InviteResponse `json:"invite"`
|
Invite map[string]InviteResponse `json:"invite"`
|
||||||
Leave map[string]LeaveResponse `json:"leave"`
|
Leave map[string]LeaveResponse `json:"leave"`
|
||||||
} `json:"rooms"`
|
} `json:"rooms"`
|
||||||
|
ToDevice ToDevice `json:"to_device"`
|
||||||
|
SignNum map[string]int `json:"device_one_time_keys_count"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StdHolder represents send to device response from db
|
||||||
|
type StdHolder struct {
|
||||||
|
StreamID int64
|
||||||
|
Sender string
|
||||||
|
EventTyp string
|
||||||
|
Event []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// StdRequest represents send to device request format
|
||||||
|
type StdRequest struct {
|
||||||
|
Sender map[string]map[string]interface{} `json:"messages"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToDevice represents a middleware for response send to device
|
||||||
|
type ToDevice struct {
|
||||||
|
StdEvent []StdEvent `json:"events"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StdEvent represents send to device event format
|
||||||
|
type StdEvent struct {
|
||||||
|
Sender string `json:"sender"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Content interface{} `json:"content"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewResponse creates an empty response with initialised maps.
|
// NewResponse creates an empty response with initialised maps.
|
||||||
|
|
@ -106,7 +133,8 @@ func (r *Response) IsEmpty() bool {
|
||||||
len(r.Rooms.Invite) == 0 &&
|
len(r.Rooms.Invite) == 0 &&
|
||||||
len(r.Rooms.Leave) == 0 &&
|
len(r.Rooms.Leave) == 0 &&
|
||||||
len(r.AccountData.Events) == 0 &&
|
len(r.AccountData.Events) == 0 &&
|
||||||
len(r.Presence.Events) == 0
|
len(r.Presence.Events) == 0 &&
|
||||||
|
len(r.ToDevice.StdEvent) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// JoinResponse represents a /sync response for a room which is under the 'join' key.
|
// JoinResponse represents a /sync response for a room which is under the 'join' key.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue