dendrite/internal/cosmosdbapi/client.go
alexfca 927238a686
Use a common way to generate CollectionName and PartitionKey (#18)
* - Create CosmosDocument as a base class
- Add CT and UT
- Refactor all tables to use the CosmosDocument

* - Add UpsertDocument method to perform updates in a generic way
- Add SetUpdateTime() to update the UT for updates
- Refactor it all

* - Add Performquery method
- Refactor appservice_events_table

* - Update naffka Topics and Messages to use the common pattern

* - Update keyserver to use the common pattern for collection and PK

* - Update mediaapi to use the common pattern for collection and pk

* - Update roomserver to use the common pattern for collectionname and pk

* - Update signingkeyserver to use the common pattern for collectionname and pk

* - Update userapi to use the common pattern for collectionname and pk

* - Update partitionOffset to use the common collectionname and pk
- Remove generic GetPartitionKey() method

Co-authored-by: alexf@example.com <alexf@example.com>
2021-09-23 09:02:37 +10:00

133 lines
2.7 KiB
Go

package cosmosdbapi
import (
"context"
"time"
cosmosapi "github.com/vippsas/go-cosmosdb/cosmosapi"
)
// CosmosConnection carries the two values GetClient needs to build a
// cosmosapi client: Url is passed as the endpoint and Key is used as the
// configuration's MasterKey.
type CosmosConnection struct {
	Url, Key string
}
// GetCosmosConnection packages an account endpoint and an account key into a
// CosmosConnection value. It performs no validation and no I/O.
func GetCosmosConnection(accountEndpoint string, accountKey string) CosmosConnection {
	var conn CosmosConnection
	conn.Url = accountEndpoint
	conn.Key = accountKey
	return conn
}
// GetClient builds a cosmosapi client for the endpoint in conn.Url, with
// conn.Key set as the configuration's MasterKey. A new client is constructed
// on every call; the two trailing arguments of cosmosapi.New are left nil.
func GetClient(conn CosmosConnection) *cosmosapi.Client {
	return cosmosapi.New(conn.Url, cosmosapi.Config{MasterKey: conn.Key}, nil, nil)
}
// UpsertDocument sends dbData to the given database and container through the
// client's CreateDocument call, using the options that
// getUpsertDocumentOptions builds for partitonKey.
//
// Only the error from CreateDocument is returned; the resource and response
// values it yields are discarded.
func UpsertDocument(ctx context.Context,
	conn CosmosConnection,
	databaseName string,
	containerName string,
	partitonKey string,
	dbData interface{}) error {
	upsertOpts := getUpsertDocumentOptions(partitonKey)
	client := GetClient(conn)

	// The document is handed over as a pointer to the interface value,
	// exactly as the rest of this package does.
	_, _, err := client.CreateDocument(ctx, databaseName, containerName, &dbData, upsertOpts)
	return err
}
// SetUpdateTime overwrites doc.Ut with the current UTC time formatted as
// RFC 3339.
func (doc *CosmosDocument) SetUpdateTime() {
	doc.Ut = time.Now().UTC().Format(time.RFC3339)
}
// PerformQuery runs qryString with params against the given database and
// container, using the query options GetQueryDocumentsOptions builds for
// partitonKey.
//
// Results are decoded by the client into response, which is passed on as a
// pointer to the interface value; callers presumably supply a pointer to
// their result slice (confirm against call sites). The error from
// QueryDocuments is returned unchanged.
func PerformQuery(ctx context.Context,
	conn CosmosConnection,
	databaseName string,
	containerName string,
	partitonKey string,
	qryString string,
	params map[string]interface{},
	response interface{}) error {
	queryOpts := GetQueryDocumentsOptions(partitonKey)
	qry := GetQuery(qryString, params)
	client := GetClient(conn)

	_, err := client.QueryDocuments(ctx, databaseName, containerName, qry, &response, queryOpts)
	return err
}
// PerformQueryAllPartitions runs qryString with params against the given
// database and container, using the options returned by
// GetQueryAllPartitionsDocumentsOptions (no partition key is supplied).
//
// Results are decoded by the client into response, which is passed on as a
// pointer to the interface value.
//
// Error handling is intentionally lenient: the client appears to return a
// generic Bad Request JSON error when the query matches no rows, so query
// errors are treated as "no results" and nil is returned. The one exception
// is a cancelled or expired ctx, whose error is returned so callers do not
// mistake an aborted query for an empty result.
func PerformQueryAllPartitions(ctx context.Context,
	conn CosmosConnection,
	databaseName string,
	containerName string,
	qryString string,
	params map[string]interface{},
	response interface{}) error {
	optionsQry := GetQueryAllPartitionsDocumentsOptions()
	query := GetQuery(qryString, params)
	_, err := GetClient(conn).QueryDocuments(
		ctx,
		databaseName,
		containerName,
		query,
		&response,
		optionsQry)
	if err == nil {
		return nil
	}

	// A done context means the query was aborted, which is never the
	// "no rows" case the workaround below is meant to hide.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr
	}

	// When there are no rows we seem to get the generic Bad Request JSON
	// error, so any remaining error is deliberately discarded.
	return nil
}
// GenerateDocument returns a CosmosDocument populated with the supplied
// collection name (Cn), tenant name (Tn), partition key (Pk) and document id
// (Id).
//
// The creation time (Ct) and update time (Ut) are both set to the current UTC
// time formatted as RFC 3339, and Timestamp is set to the same instant as
// Unix seconds.
func GenerateDocument(
	collection string,
	tenantName string,
	partitionKey string,
	docId string,
) CosmosDocument {
	created := time.Now().UTC()
	stamp := created.Format(time.RFC3339)

	var doc CosmosDocument
	doc.Id = docId
	doc.Pk = partitionKey
	doc.Tn = tenantName
	doc.Cn = collection
	doc.Ct = stamp
	doc.Ut = stamp
	doc.Timestamp = created.Unix()
	return doc
}
// GetDocumentOrNil fetches the document cosmosDocId from config.DatabaseName /
// config.ContainerName, using the options GetGetDocumentOptions builds for
// partitionKey, and lets the client decode it into dbData.
//
// dbData is passed on as a pointer to the interface value; callers presumably
// supply a pointer to their destination struct (confirm against call sites).
//
// An error whose message is exactly "Resource that no longer exists" is
// treated as "document not found" and reported as nil. In that case dbData is
// left exactly as the caller passed it: this function cannot reset the
// caller's value, so callers must detect a missing document by inspecting
// their own destination. Any other error is returned unchanged.
func GetDocumentOrNil(connection CosmosConnection, config CosmosConfig, ctx context.Context, partitionKey string, cosmosDocId string, dbData interface{}) error {
	_, err := GetClient(connection).GetDocument(
		ctx,
		config.DatabaseName,
		config.ContainerName,
		cosmosDocId,
		GetGetDocumentOptions(partitionKey),
		&dbData,
	)
	if err == nil {
		return nil
	}

	// Presumably the client's not-found error; it is matched by message
	// text, as the original code did.
	if err.Error() == "Resource that no longer exists" {
		return nil
	}
	return err
}