From acf63daf7917c1e229b8ef9cddb00dd2ba59f67a Mon Sep 17 00:00:00 2001 From: alexfca <75228224+alexfca@users.noreply.github.com> Date: Mon, 20 Sep 2021 17:41:04 +1000 Subject: [PATCH] Add CT and UT to all documents and refactor (#17) * - Create CosmosDocument as a base class - Add CT and UT - Refactor all tables to use the CosmosDocument * - Add UpsertDocument method to perform updates in a generic way - Add SetUpdateTime() to update the UT for updates - Refactor it all Co-authored-by: alexf@example.com --- .../cosmosdb/appservice_events_table.go | 89 +++++++++++-------- .../storage/cosmosdb/blacklist_table.go | 40 ++++----- .../storage/cosmosdb/inbound_peeks_table.go | 49 +++++----- .../storage/cosmosdb/joined_hosts_table.go | 71 ++++++++------- .../storage/cosmosdb/outbound_peeks_table.go | 51 ++++++----- .../storage/cosmosdb/queue_edus_table.go | 18 +--- .../storage/cosmosdb/queue_json_table.go | 16 +--- .../storage/cosmosdb/queue_pdus_table.go | 18 +--- internal/cosmosdbapi/client.go | 63 +++++++++++++ internal/cosmosdbapi/document.go | 33 +++---- internal/cosmosdbapi/documentoperations.go | 2 +- internal/cosmosdbutil/document_seq.go | 14 +-- .../cosmosdbutil/partition_offset_table.go | 64 +++++++------ .../naffkacosmosdb/naffka_topics_table.go | 62 +++++-------- .../cosmosdb/cross_signing_keys_table.go | 63 +++++++------ .../cosmosdb/cross_signing_sigs_table.go | 67 ++++++++------ .../storage/cosmosdb/device_keys_table.go | 40 +++++---- .../storage/cosmosdb/key_changes_table.go | 62 +++++++------ .../storage/cosmosdb/one_time_keys_table.go | 54 +++++++---- .../storage/cosmosdb/stale_device_lists.go | 72 ++++++++------- .../cosmosdb/media_repository_table.go | 13 +-- mediaapi/storage/cosmosdb/thumbnail_table.go | 22 ++--- .../storage/cosmosdb/event_json_table.go | 60 +++++++------ .../cosmosdb/event_state_keys_table.go | 30 ++----- .../storage/cosmosdb/event_types_table.go | 16 +--- roomserver/storage/cosmosdb/events_table.go | 29 ++---- roomserver/storage/cosmosdb/invite_table.go | 18 +--- .../storage/cosmosdb/membership_table.go | 20 ++--- .../storage/cosmosdb/previous_events_table.go | 28 ++---- .../storage/cosmosdb/published_table.go | 45 ++++------ .../storage/cosmosdb/redactions_table.go | 16 +--- .../storage/cosmosdb/room_aliases_table.go | 16 +--- roomserver/storage/cosmosdb/rooms_table.go | 30 +++---- .../storage/cosmosdb/state_block_table.go | 26 ++---- .../storage/cosmosdb/state_snapshot_table.go | 16 +--- .../storage/cosmosdb/transactions_table.go | 16 +--- .../storage/cosmosdb/server_key_table.go | 71 ++++++++------- .../storage/cosmosdb/account_data_table.go | 62 +++++++------ .../cosmosdb/backwards_extremities_table.go | 61 +++++++------ .../cosmosdb/current_room_state_table.go | 63 +++++++------ syncapi/storage/cosmosdb/filter_table.go | 18 +--- syncapi/storage/cosmosdb/invites_table.go | 16 +--- syncapi/storage/cosmosdb/memberships_table.go | 73 ++++++++------- .../cosmosdb/output_room_events_table.go | 14 +-- .../output_room_events_topology_table.go | 79 +++++++++------- syncapi/storage/cosmosdb/peeks_table.go | 67 ++++++++------ syncapi/storage/cosmosdb/receipt_table.go | 18 +--- .../storage/cosmosdb/send_to_device_table.go | 16 +--- .../accounts/cosmosdb/account_data_table.go | 71 ++++++++------- .../accounts/cosmosdb/accounts_table.go | 17 +--- .../accounts/cosmosdb/key_backup_table.go | 16 +--- .../cosmosdb/key_backup_version_table.go | 14 +-- .../storage/accounts/cosmosdb/openid_table.go | 16 +--- .../accounts/cosmosdb/profile_table.go | 18 +--- .../accounts/cosmosdb/threepid_table.go | 18 +--- .../storage/devices/cosmosdb/devices_table.go | 17 +--- 56 files changed, 994 insertions(+), 1100 deletions(-) diff --git a/appservice/storage/cosmosdb/appservice_events_table.go b/appservice/storage/cosmosdb/appservice_events_table.go index 3ddff1504..efc23db7c 100644 --- a/appservice/storage/cosmosdb/appservice_events_table.go +++ b/appservice/storage/cosmosdb/appservice_events_table.go @@ -57,13 +57,8 @@ type EventNumberCosmosData struct { } type EventCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Event EventCosmos `json:"mx_appservice_event"` + cosmosdbapi.CosmosDocument + Event EventCosmos `json:"mx_appservice_event"` } // "SELECT id, headered_event_json, txn_id " + @@ -166,6 +161,23 @@ func queryEventEventNumber(s *eventsStatements, ctx context.Context, qry string, return response, nil } +func getEvent(s *eventsStatements, ctx context.Context, pk string, docId string) (*EventCosmosData, error) { + response := EventCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func setEvent(s *eventsStatements, ctx context.Context, event EventCosmosData) (*EventCosmosData, error) { var optionsReplace = cosmosdbapi.GetReplaceDocumentOptions(event.Pk, event.ETag) var _, _, ex = cosmosdbapi.GetClient(s.db.connection).ReplaceDocument( @@ -338,46 +350,45 @@ func (s *eventsStatements) insertEvent( // "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " + // "VALUES ($1, $2, $3)" - // id INTEGER PRIMARY KEY AUTOINCREMENT, - idSeq, seqErr := GetNextEventID(s, ctx) - if seqErr != nil { - return seqErr - } - var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) - docId := fmt.Sprintf("%d", idSeq) + docId := fmt.Sprintf("%s", appServiceID) cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - // appServiceID, - // eventJSON, - // -1, // No transaction ID yet - data := EventCosmos{ - AppServiceID: appServiceID, - HeaderedEventJSON: eventJSON, - ID: idSeq, - TXNID: -1, + dbData, err := getEvent(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.Event.HeaderedEventJSON = eventJSON + } else { + // id INTEGER PRIMARY KEY AUTOINCREMENT, + idSeq, seqErr := GetNextEventID(s, ctx) + if seqErr != nil { + return seqErr + } + + // appServiceID, + // eventJSON, + // -1, // No transaction ID yet + data := EventCosmos{ + AppServiceID: appServiceID, + HeaderedEventJSON: eventJSON, + ID: idSeq, + TXNID: -1, + } + + dbData = &EventCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Event: data, + } + } - dbData := &EventCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Event: data, - } - - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) - - return err - + dbData.Pk, + &dbData) } // updateTxnIDForEvents sets the transactionID for a collection of events. Done diff --git a/federationsender/storage/cosmosdb/blacklist_table.go b/federationsender/storage/cosmosdb/blacklist_table.go index d5b08756c..531ffddee 100644 --- a/federationsender/storage/cosmosdb/blacklist_table.go +++ b/federationsender/storage/cosmosdb/blacklist_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -38,12 +37,7 @@ type BlacklistCosmos struct { } type BlacklistCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument Blacklist BlacklistCosmos `json:"mx_federationsender_blacklist"` } @@ -149,30 +143,28 @@ func (s *blacklistStatements) InsertBlacklist( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := BlacklistCosmos{ - ServerName: string(serverName), - } + dbData, _ := getBlacklist(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + } else { + data := BlacklistCosmos{ + ServerName: string(serverName), + } - dbData := &BlacklistCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Blacklist: data, + dbData = &BlacklistCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Blacklist: data, + } } // _, err := stmt.ExecContext(ctx, serverName) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err := cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) - - return err + dbData.Pk, + &dbData) } // selectRoomForUpdate locks the row for the room and returns the last_event_id. diff --git a/federationsender/storage/cosmosdb/inbound_peeks_table.go b/federationsender/storage/cosmosdb/inbound_peeks_table.go index fd8bc911f..2d139a46a 100644 --- a/federationsender/storage/cosmosdb/inbound_peeks_table.go +++ b/federationsender/storage/cosmosdb/inbound_peeks_table.go @@ -48,12 +48,7 @@ type InboundPeekCosmos struct { } type InboundPeekCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument InboundPeek InboundPeekCosmos `json:"mx_federationsender_inbound_peek"` } @@ -183,33 +178,35 @@ func (s *inboundPeeksStatements) InsertInboundPeek( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := InboundPeekCosmos{ - RoomID: roomID, - ServerName: string(serverName), - PeekID: peekID, - CreationTimestamp: nowMilli, - RenewedTimestamp: nowMilli, - RenewalInterval: renewalInterval, - } + dbData, _ := getInboundPeek(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.InboundPeek.RenewedTimestamp = nowMilli + dbData.InboundPeek.RenewalInterval = renewalInterval + } else { + data := InboundPeekCosmos{ + RoomID: roomID, + ServerName: string(serverName), + PeekID: peekID, + CreationTimestamp: nowMilli, + RenewedTimestamp: nowMilli, + RenewalInterval: renewalInterval, + } - dbData := &InboundPeekCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - InboundPeek: data, + dbData = &InboundPeekCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + InboundPeek: data, + } } // _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + err = cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) + dbData.Pk, + &dbData) return } diff --git a/federationsender/storage/cosmosdb/joined_hosts_table.go b/federationsender/storage/cosmosdb/joined_hosts_table.go index 2b385f90a..ff8bec7b7 100644 --- a/federationsender/storage/cosmosdb/joined_hosts_table.go +++ b/federationsender/storage/cosmosdb/joined_hosts_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -53,12 +52,7 @@ type JoinedHostCosmos struct { } type JoinedHostCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument JoinedHost JoinedHostCosmos `json:"mx_federationsender_joined_host"` } @@ -102,6 +96,23 @@ type joinedHostsStatements struct { tableName string } +func getJoinedHost(s *joinedHostsStatements, ctx context.Context, pk string, docId string) (*JoinedHostCosmosData, error) { + response := JoinedHostCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryJoinedHostDistinct(s *joinedHostsStatements, ctx context.Context, qry string, params map[string]interface{}) ([]JoinedHostCosmos, error) { var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) @@ -190,32 +201,28 @@ func (s *joinedHostsStatements) InsertJoinedHosts( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := JoinedHostCosmos{ - EventID: eventID, - RoomID: roomID, - ServerName: string(serverName), + dbData, _ := getJoinedHost(s, ctx, pk, cosmosDocId) + if dbData == nil { + data := JoinedHostCosmos{ + EventID: eventID, + RoomID: roomID, + ServerName: string(serverName), + } + + dbData = &JoinedHostCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + JoinedHost: data, + } + // _, err := stmt.ExecContext(ctx, roomID, eventID, serverName) + + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, + s.db.cosmosConfig.DatabaseName, + s.db.cosmosConfig.ContainerName, + dbData.Pk, + &dbData) } - - dbData := &JoinedHostCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - JoinedHost: data, - } - - // _, err := stmt.ExecContext(ctx, roomID, eventID, serverName) - - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err := cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, - s.db.cosmosConfig.DatabaseName, - s.db.cosmosConfig.ContainerName, - &dbData, - options) - - return err + return nil } func (s *joinedHostsStatements) DeleteJoinedHosts( diff --git a/federationsender/storage/cosmosdb/outbound_peeks_table.go b/federationsender/storage/cosmosdb/outbound_peeks_table.go index 5d1948c9f..ab2efd51d 100644 --- a/federationsender/storage/cosmosdb/outbound_peeks_table.go +++ b/federationsender/storage/cosmosdb/outbound_peeks_table.go @@ -48,12 +48,7 @@ type OutboundPeekCosmos struct { } type OutboundPeekCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument OutboundPeek OutboundPeekCosmos `json:"mx_federationsender_outbound_peek"` } @@ -179,33 +174,37 @@ func (s *outboundPeeksStatements) InsertOutboundPeek( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := OutboundPeekCosmos{ - RoomID: roomID, - ServerName: string(serverName), - PeekID: peekID, - CreationTimestamp: nowMilli, - RenewedTimestamp: nowMilli, - RenewalInterval: renewalInterval, - } + dbData, _ := getOutboundPeek(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.OutboundPeek.RenewalInterval = renewalInterval + dbData.OutboundPeek.RenewedTimestamp = nowMilli + + } else { + data := OutboundPeekCosmos{ + RoomID: roomID, + ServerName: string(serverName), + PeekID: peekID, + CreationTimestamp: nowMilli, + RenewedTimestamp: nowMilli, + RenewalInterval: renewalInterval, + } + + dbData = &OutboundPeekCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + OutboundPeek: data, + } - dbData := &OutboundPeekCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - OutboundPeek: data, } // _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + err = cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) + dbData.Pk, + &dbData) return } diff --git a/federationsender/storage/cosmosdb/queue_edus_table.go b/federationsender/storage/cosmosdb/queue_edus_table.go index 0f4935c65..790a6dae8 100644 --- a/federationsender/storage/cosmosdb/queue_edus_table.go +++ b/federationsender/storage/cosmosdb/queue_edus_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -50,13 +49,8 @@ type QueueEDUCosmosNumber struct { } type QueueEDUCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - QueueEDU QueueEDUCosmos `json:"mx_federationsender_queue_edu"` + cosmosdbapi.CosmosDocument + QueueEDU QueueEDUCosmos `json:"mx_federationsender_queue_edu"` } // const insertQueueEDUSQL = "" + @@ -218,12 +212,8 @@ func (s *queueEDUsStatements) InsertQueueEDU( } dbData := &QueueEDUCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - QueueEDU: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + QueueEDU: data, } // _, err := stmt.ExecContext( diff --git a/federationsender/storage/cosmosdb/queue_json_table.go b/federationsender/storage/cosmosdb/queue_json_table.go index a7faada2b..0c464fae5 100644 --- a/federationsender/storage/cosmosdb/queue_json_table.go +++ b/federationsender/storage/cosmosdb/queue_json_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" ) @@ -42,12 +41,7 @@ type QueueJSONCosmos struct { } type QueueJSONCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument QueueJSON QueueJSONCosmos `json:"mx_federationsender_queue_json"` } @@ -143,12 +137,8 @@ func (s *queueJSONStatements) InsertQueueJSON( } dbData := &QueueJSONCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - QueueJSON: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + QueueJSON: data, } // stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) diff --git a/federationsender/storage/cosmosdb/queue_pdus_table.go b/federationsender/storage/cosmosdb/queue_pdus_table.go index 04deb41e3..a4afa5dce 100644 --- a/federationsender/storage/cosmosdb/queue_pdus_table.go +++ b/federationsender/storage/cosmosdb/queue_pdus_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -51,13 +50,8 @@ type QueuePDUCosmosNumber struct { } type QueuePDUCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - QueuePDU QueuePDUCosmos `json:"mx_federationsender_queue_pdu"` + cosmosdbapi.CosmosDocument + QueuePDU QueuePDUCosmos `json:"mx_federationsender_queue_pdu"` } // const insertQueuePDUSQL = "" + @@ -230,12 +224,8 @@ func (s *queuePDUsStatements) InsertQueuePDU( } dbData := &QueuePDUCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - QueuePDU: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + QueuePDU: data, } // stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) diff --git a/internal/cosmosdbapi/client.go b/internal/cosmosdbapi/client.go index eb99795b5..1dfddb94e 100644 --- a/internal/cosmosdbapi/client.go +++ b/internal/cosmosdbapi/client.go @@ -1,6 +1,9 @@ package cosmosdbapi import ( + "context" + "time" + cosmosapi "github.com/vippsas/go-cosmosdb/cosmosapi" ) @@ -22,3 +25,63 @@ func GetClient(conn CosmosConnection) *cosmosapi.Client { } return cosmosapi.New(conn.Url, cfg, nil, nil) } + +func UpsertDocument(ctx context.Context, + conn CosmosConnection, + databaseName string, + containerName string, + partitonKey string, + dbData interface{}) error { + var options = getUpsertDocumentOptions(partitonKey) + _, _, err := GetClient(conn).CreateDocument( + ctx, + databaseName, + containerName, + &dbData, + options) + return err +} + +func (doc *CosmosDocument) SetUpdateTime() { + now := time.Now().UTC() + doc.Ut = now.Format(time.RFC3339) +} + +func GenerateDocument( + collection string, + tenantName string, + partitionKey string, + docId string, +) CosmosDocument { + doc := CosmosDocument{} + now := time.Now().UTC() + doc.Timestamp = now.Unix() + doc.Ct = now.Format(time.RFC3339) + doc.Ut = now.Format(time.RFC3339) + doc.Cn = collection + doc.Tn = tenantName + doc.Pk = partitionKey + doc.Id = docId + return doc +} + +func GetDocumentOrNil(connection CosmosConnection, config CosmosConfig, ctx context.Context, partitionKey string, cosmosDocId string, dbData interface{}) error { + var _, err = GetClient(connection).GetDocument( + ctx, + config.DatabaseName, + config.ContainerName, + cosmosDocId, + GetGetDocumentOptions(partitionKey), + &dbData, + ) + + if err != nil { + if err.Error() == "Resource that no longer exists" { + dbData = nil + return nil + } + return err + } + + return nil +} diff --git a/internal/cosmosdbapi/document.go b/internal/cosmosdbapi/document.go index 6c6edafed..64c68de01 100644 --- a/internal/cosmosdbapi/document.go +++ b/internal/cosmosdbapi/document.go @@ -1,11 +1,21 @@ package cosmosdbapi import ( - "context" "fmt" "strings" ) +type CosmosDocument struct { + Id string `json:"id"` + Pk string `json:"_pk"` + Tn string `json:"_sid"` + Cn string `json:"_cn"` + Ct string `json:"_ct"` + Ut string `json:"_ut"` + ETag string `json:"_etag"` + Timestamp int64 `json:"_ts"` +} + func removeSpecialChars(docId string) string { // The following characters are restricted and cannot be used in the Id property: '/', '\', '?', '#' invalidChars := [4]string{"/", "\\", "?", "#"} @@ -25,24 +35,3 @@ func GetDocumentId(tenantName string, collectionName string, id string) string { func GetPartitionKey(tenantName string, collectionName string) string { return fmt.Sprintf("%s,%s", collectionName, tenantName) } - -func GetDocumentOrNil(connection CosmosConnection, config CosmosConfig, ctx context.Context, partitionKey string, cosmosDocId string, dbData interface{}) error { - var _, err = GetClient(connection).GetDocument( - ctx, - config.DatabaseName, - config.ContainerName, - cosmosDocId, - GetGetDocumentOptions(partitionKey), - &dbData, - ) - - if err != nil { - if err.Error() == "Resource that no longer exists" { - dbData = nil - return nil - } - return err - } - - return nil -} diff --git a/internal/cosmosdbapi/documentoperations.go b/internal/cosmosdbapi/documentoperations.go index ad52c05c7..6ec944fe3 100644 --- a/internal/cosmosdbapi/documentoperations.go +++ b/internal/cosmosdbapi/documentoperations.go @@ -11,7 +11,7 @@ func GetCreateDocumentOptions(pk string) cosmosapi.CreateDocumentOptions { } } -func GetUpsertDocumentOptions(pk string) cosmosapi.CreateDocumentOptions { +func getUpsertDocumentOptions(pk string) cosmosapi.CreateDocumentOptions { return cosmosapi.CreateDocumentOptions{ IsUpsert: true, PartitionKeyValue: pk, diff --git a/internal/cosmosdbutil/document_seq.go b/internal/cosmosdbutil/document_seq.go index e39630811..09963dbe5 100644 --- a/internal/cosmosdbutil/document_seq.go +++ b/internal/cosmosdbutil/document_seq.go @@ -8,13 +8,8 @@ import ( ) type SequenceCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Value int64 `json:"_value"` + cosmosdbapi.CosmosDocument + Value int64 `json:"_value"` } func GetNextSequence( @@ -43,10 +38,7 @@ func GetNextSequence( if dbData.Id == "" { dbData = SequenceCosmosData{} - dbData.Id = cosmosDocId - dbData.Pk = pk - dbData.Tn = config.TenantName - dbData.Cn = dbCollectionName + dbData.CosmosDocument = cosmosdbapi.GenerateDocument(dbCollectionName, config.TenantName, pk, cosmosDocId) dbData.Value = initial var optionsCreate = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) var _, _, err = cosmosdbapi.GetClient(connection).CreateDocument( diff --git a/internal/cosmosdbutil/partition_offset_table.go b/internal/cosmosdbutil/partition_offset_table.go index 7bdea4b18..39f296f75 100644 --- a/internal/cosmosdbutil/partition_offset_table.go +++ b/internal/cosmosdbutil/partition_offset_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -54,12 +53,7 @@ type PartitionOffsetCosmos struct { } type PartitionOffsetCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument PartitionOffset PartitionOffsetCosmos `json:"mx_partition_offset"` } @@ -90,6 +84,23 @@ type PartitionOffsetStatements struct { tableName string } +func getPartitionOffset(s *PartitionOffsetStatements, ctx context.Context, pk string, docId string) (*PartitionOffsetCosmosData, error) { + response := PartitionOffsetCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.Connection, + s.db.CosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryPartitionOffset(s *PartitionOffsetStatements, ctx context.Context, qry string, params map[string]interface{}) ([]PartitionOffsetCosmosData, error) { var dbCollectionName = getCollectionName(*s) var pk = cosmosdbapi.GetPartitionKey(s.db.CosmosConfig.ContainerName, dbCollectionName) @@ -192,33 +203,32 @@ func (s *PartitionOffsetStatements) upsertPartitionOffset( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.CosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.CosmosConfig.ContainerName, dbCollectionName) - data := PartitionOffsetCosmos{ - Partition: partition, - PartitionOffset: offset, - Topic: topic, - } + dbData, _ := getPartitionOffset(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.PartitionOffset.PartitionOffset = offset + } else { + data := PartitionOffsetCosmos{ + Partition: partition, + PartitionOffset: offset, + Topic: topic, + } + + dbData = &PartitionOffsetCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.CosmosConfig.TenantName, pk, cosmosDocId), + PartitionOffset: data, + } - dbData := &PartitionOffsetCosmosData{ - Id: cosmosDocId, - Tn: s.db.CosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - // nowMilli := time.Now().UnixNano() / int64(time.Millisecond) - Timestamp: time.Now().Unix(), - PartitionOffset: data, } // _, err := stmt.ExecContext(ctx, topic, partition, offset) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err := cosmosdbapi.GetClient(s.db.Connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.Connection, s.db.CosmosConfig.DatabaseName, s.db.CosmosConfig.ContainerName, - &dbData, - options) - - return err + dbData.Pk, + &dbData) }) } diff --git a/internal/naffka/naffkacosmosdb/naffka_topics_table.go b/internal/naffka/naffkacosmosdb/naffka_topics_table.go index cd5d598ac..8bf4d18a7 100644 --- a/internal/naffka/naffkacosmosdb/naffka_topics_table.go +++ b/internal/naffka/naffkacosmosdb/naffka_topics_table.go @@ -38,13 +38,8 @@ type TopicCosmosNumber struct { } type TopicCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Topic TopicCosmos `json:"mx_naffka_topic"` + cosmosdbapi.CosmosDocument + Topic TopicCosmos `json:"mx_naffka_topic"` } type MessageCosmos struct { @@ -56,13 +51,8 @@ type MessageCosmos struct { } type MessageCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Message MessageCosmos `json:"mx_naffka_message"` + cosmosdbapi.CosmosDocument + Message MessageCosmos `json:"mx_naffka_message"` } // const insertTopicSQL = "" + @@ -228,31 +218,30 @@ func (t *topicsStatements) InsertTopic( cosmosDocId := cosmosdbapi.GetDocumentId(t.DB.cosmosConfig.ContainerName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(t.DB.cosmosConfig.ContainerName, dbCollectionName) - data := TopicCosmos{ - TopicNID: topicNID, - TopicName: topicName, - } + dbData, _ := getTopic(t, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.Topic.TopicName = topicName + } else { + data := TopicCosmos{ + TopicNID: topicNID, + TopicName: topicName, + } - dbData := &TopicCosmosData{ - Id: cosmosDocId, - Tn: t.DB.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Topic: data, + dbData = &TopicCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, t.DB.cosmosConfig.TenantName, pk, cosmosDocId), + Topic: data, + } } // _, err := stmt.ExecContext(ctx, topicName, topicNID) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err := cosmosdbapi.GetClient(t.DB.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + t.DB.connection, t.DB.cosmosConfig.DatabaseName, t.DB.cosmosConfig.ContainerName, - &dbData, - options) - - return err + dbData.Pk, + dbData) } func (t *topicsStatements) SelectNextTopicNID( @@ -366,12 +355,8 @@ func (t *topicsStatements) InsertTopics( } dbData := &MessageCosmosData{ - Id: cosmosDocId, - Tn: t.DB.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Message: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, t.DB.cosmosConfig.TenantName, pk, cosmosDocId), + Message: data, } // _, err := stmt.ExecContext(ctx, topicNID, messageOffset, topicKey, topicValue, messageTimestampNs) @@ -383,7 +368,6 @@ func (t *topicsStatements) InsertTopics( t.DB.cosmosConfig.ContainerName, &dbData, options) - return err } diff --git a/keyserver/storage/cosmosdb/cross_signing_keys_table.go b/keyserver/storage/cosmosdb/cross_signing_keys_table.go index fd32e7019..99ce6af05 100644 --- a/keyserver/storage/cosmosdb/cross_signing_keys_table.go +++ b/keyserver/storage/cosmosdb/cross_signing_keys_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/keyserver/storage/tables" @@ -42,12 +41,7 @@ type CrossSigningKeysCosmos struct { } type CrossSigningKeysCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument CrossSigningKeys CrossSigningKeysCosmos `json:"mx_keyserver_cross_signing_keys"` } @@ -68,6 +62,23 @@ type crossSigningKeysStatements struct { tableName string } +func getCrossSigningKeys(s *crossSigningKeysStatements, ctx context.Context, pk string, docId string) (*CrossSigningKeysCosmosData, error) { + response := CrossSigningKeysCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryCrossSigningKeys(s *crossSigningKeysStatements, ctx context.Context, qry string, params map[string]interface{}) ([]CrossSigningKeysCosmosData, error) { var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) @@ -149,31 +160,29 @@ func (s *crossSigningKeysStatements) UpsertCrossSigningKeysForUser( docId := fmt.Sprintf("%s_%s", userID, keyType) cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) - data := CrossSigningKeysCosmos{ - UserID: userID, - KeyType: int64(keyTypeInt), - KeyData: keyData, - } + dbData, _ := getCrossSigningKeys(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.CrossSigningKeys.KeyData = keyData + } else { + data := CrossSigningKeysCosmos{ + UserID: userID, + KeyType: int64(keyTypeInt), + KeyData: keyData, + } - dbData := CrossSigningKeysCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - CrossSigningKeys: data, + dbData = &CrossSigningKeysCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + CrossSigningKeys: data, + } } - // if _, err := sqlutil.TxStmt(txn, s.upsertCrossSigningKeysForUserStmt).ExecContext(ctx, userID, keyTypeInt, keyData); err != nil { // return fmt.Errorf("s.upsertCrossSigningKeysForUserStmt: %w", err) // } - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - var _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - dbData, - options) - - return err + dbData.Pk, + dbData) } diff --git a/keyserver/storage/cosmosdb/cross_signing_sigs_table.go b/keyserver/storage/cosmosdb/cross_signing_sigs_table.go index 2ef8a0518..b6c018185 100644 --- a/keyserver/storage/cosmosdb/cross_signing_sigs_table.go +++ b/keyserver/storage/cosmosdb/cross_signing_sigs_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/keyserver/storage/tables" @@ -46,12 +45,7 @@ type CrossSigningSigsCosmos struct { } type CrossSigningSigsCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument CrossSigningSigs CrossSigningSigsCosmos `json:"mx_keyserver_cross_signing_sigs"` } @@ -80,6 +74,23 @@ type crossSigningSigsStatements struct { tableName string } +func getCrossSigningSigs(s *crossSigningSigsStatements, ctx context.Context, pk string, docId string) (*CrossSigningSigsCosmosData, error) { + response := CrossSigningSigsCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryCrossSigningSigs(s *crossSigningSigsStatements, ctx context.Context, qry string, params map[string]interface{}) ([]CrossSigningSigsCosmosData, error) { var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) @@ -182,34 +193,34 @@ func (s *crossSigningSigsStatements) UpsertCrossSigningSigsForTarget( docId := fmt.Sprintf("%s_%s_%s", originUserID, targetUserID, targetKeyID) cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) - data := CrossSigningSigsCosmos{ - TargetUserId: targetUserID, - TargetKeyId: string(targetKeyID), - OriginUserId: originUserID, - OriginKeyId: string(originKeyID), - Signature: signature, - } + dbData, _ := getCrossSigningSigs(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.CrossSigningSigs.OriginKeyId = string(originKeyID) + dbData.CrossSigningSigs.Signature = signature + } else { + data := CrossSigningSigsCosmos{ + TargetUserId: targetUserID, + TargetKeyId: string(targetKeyID), + OriginUserId: originUserID, + OriginKeyId: string(originKeyID), + Signature: signature, + } - dbData := CrossSigningSigsCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - CrossSigningSigs: data, + dbData = &CrossSigningSigsCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + CrossSigningSigs: data, + } } - // if _, err := sqlutil.TxStmt(txn, s.upsertCrossSigningSigsForTargetStmt).ExecContext(ctx, originUserID, originKeyID, targetUserID, targetKeyID, signature); err != nil { // return fmt.Errorf("s.upsertCrossSigningSigsForTargetStmt: %w", err) // } - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - var _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - dbData, - options) - return err + dbData.Pk, + dbData) } func (s *crossSigningSigsStatements) DeleteCrossSigningSigsForTarget( diff --git a/keyserver/storage/cosmosdb/device_keys_table.go b/keyserver/storage/cosmosdb/device_keys_table.go index f96bd77f4..c67a231e0 100644 --- a/keyserver/storage/cosmosdb/device_keys_table.go +++ b/keyserver/storage/cosmosdb/device_keys_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -55,12 +54,7 @@ type DeviceKeyCosmosNumber struct { } type DeviceKeyCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument DeviceKey DeviceKeyCosmos `json:"mx_keyserver_device_key"` } @@ -167,13 +161,26 @@ func getDeviceKey(s *deviceKeysStatements, ctx context.Context, pk string, docId } func insertDeviceKeyCore(s *deviceKeysStatements, ctx context.Context, dbData DeviceKeyCosmosData) error { - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - var _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + // "INSERT INTO keyserver_device_keys (user_id, device_id, ts_added_secs, key_json, stream_id, display_name)" + + // " VALUES ($1, $2, $3, $4, $5, $6)" + + // " ON CONFLICT (user_id, device_id)" + + // " DO UPDATE SET key_json = $4, stream_id = $5, display_name = $6" + existing, _ := getDeviceKey(s, ctx, dbData.Pk, dbData.Id) + if existing != nil { + existing.SetUpdateTime() + existing.DeviceKey.KeyJSON = dbData.DeviceKey.KeyJSON + existing.DeviceKey.StreamID = dbData.DeviceKey.StreamID + existing.DeviceKey.DisplayName = dbData.DeviceKey.DisplayName + } else { + existing = &dbData + } + + err := cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - dbData, - options) + existing.Pk, + existing) if err != nil { return err @@ -445,18 +452,13 @@ func (s *deviceKeysStatements) InsertDeviceKeys(ctx context.Context, txn *sql.Tx pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) for _, key := range keys { - now := time.Now().Unix() // UNIQUE (user_id, device_id) docId := fmt.Sprintf("%s_%s", key.UserID, key.DeviceID) cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) dbData := &DeviceKeyCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: now, - DeviceKey: mapFromDeviceKeyMessage(key), + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + DeviceKey: mapFromDeviceKeyMessage(key), } err := insertDeviceKeyCore(s, ctx, *dbData) diff --git a/keyserver/storage/cosmosdb/key_changes_table.go b/keyserver/storage/cosmosdb/key_changes_table.go index 981b17c7c..8fdcc157c 100644 --- a/keyserver/storage/cosmosdb/key_changes_table.go +++ b/keyserver/storage/cosmosdb/key_changes_table.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "math" - "time" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -48,12 +47,7 @@ type KeyChangeUserMaxCosmosData struct { } type KeyChangeCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument KeyChange KeyChangeCosmos `json:"mx_keyserver_key_change"` } @@ -84,6 +78,23 @@ type keyChangesStatements struct { tableName string } +func getKeyChangeUser(s *keyChangesStatements, ctx context.Context, pk string, docId string) (*KeyChangeCosmosData, error) { + response := KeyChangeCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryKeyChangeUserMax(s *keyChangesStatements, ctx context.Context, qry string, params map[string]interface{}) ([]KeyChangeUserMaxCosmosData, error) { var response []KeyChangeUserMaxCosmosData @@ -127,31 +138,30 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition in docId := fmt.Sprintf("%d_%d", partition, offset) cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) - data := KeyChangeCosmos{ - Offset: offset, - Partition: partition, - UserID: userID, - } + dbData, _ := getKeyChangeUser(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.KeyChange.UserID = userID + } else { + data := KeyChangeCosmos{ + Offset: offset, + Partition: partition, + UserID: userID, + } - dbData := KeyChangeCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - KeyChange: data, + dbData = &KeyChangeCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + KeyChange: data, + } } // _, err := s.upsertKeyChangeStmt.ExecContext(ctx, partition, offset, userID) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - var _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - dbData, - options) - - return err + dbData.Pk, + dbData) } func (s *keyChangesStatements) SelectKeyChanges( diff --git a/keyserver/storage/cosmosdb/one_time_keys_table.go b/keyserver/storage/cosmosdb/one_time_keys_table.go index b2a4ccb21..e85ce6f00 100644 --- a/keyserver/storage/cosmosdb/one_time_keys_table.go +++ b/keyserver/storage/cosmosdb/one_time_keys_table.go @@ -19,7 +19,6 @@ import ( "database/sql" "encoding/json" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -59,12 +58,7 @@ type OneTimeKeyAlgoNumberCosmosData struct { } type OneTimeKeyCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument OneTimeKey OneTimeKeyCosmos `json:"mx_keyserver_one_time_key"` } @@ -108,6 +102,23 @@ type oneTimeKeysStatements struct { tableName string } +func getOneTimeKey(s *oneTimeKeysStatements, ctx context.Context, pk string, docId string) (*OneTimeKeyCosmosData, error) { + response := OneTimeKeyCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryOneTimeKey(s *oneTimeKeysStatements, ctx context.Context, qry string, params map[string]interface{}) ([]OneTimeKeyCosmosData, error) { var response []OneTimeKeyCosmosData @@ -155,13 +166,23 @@ func queryOneTimeKeyAlgoCount(s *oneTimeKeysStatements, ctx context.Context, qry } func insertOneTimeKeyCore(s *oneTimeKeysStatements, ctx context.Context, dbData OneTimeKeyCosmosData) error { - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - var _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + // "INSERT INTO keyserver_one_time_keys (user_id, device_id, key_id, algorithm, ts_added_secs, key_json)" + + // " VALUES ($1, $2, $3, $4, $5, $6)" + + // " ON CONFLICT (user_id, device_id, key_id, algorithm)" + + // " DO UPDATE SET key_json = $6" + existing, _ := getOneTimeKey(s, ctx, dbData.Pk, dbData.Id) + if existing != nil { + existing.SetUpdateTime() + existing.OneTimeKey.KeyJSON = dbData.OneTimeKey.KeyJSON + } else { + existing = &dbData + } + err := cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - dbData, - options) + existing.Pk, + existing) if err != nil { return err @@ -266,7 +287,6 @@ func (s *oneTimeKeysStatements) CountOneTimeKeys(ctx context.Context, userID, de func (s *oneTimeKeysStatements) InsertOneTimeKeys( ctx context.Context, txn *sql.Tx, keys api.OneTimeKeys, ) (*api.OneTimeKeysCount, error) { - now := time.Now().Unix() counts := &api.OneTimeKeysCount{ DeviceID: keys.DeviceID, UserID: keys.UserID, @@ -298,12 +318,8 @@ func (s *oneTimeKeysStatements) InsertOneTimeKeys( } dbData := &OneTimeKeyCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: now, - OneTimeKey: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + OneTimeKey: data, } err := insertOneTimeKeyCore(s, ctx, *dbData) diff --git a/keyserver/storage/cosmosdb/stale_device_lists.go b/keyserver/storage/cosmosdb/stale_device_lists.go index 5d7a0b521..6d5770ab9 100644 --- a/keyserver/storage/cosmosdb/stale_device_lists.go +++ b/keyserver/storage/cosmosdb/stale_device_lists.go @@ -36,20 +36,14 @@ import ( // ` type StaleDeviceListCosmos struct { - UserID string `json:"user_id"` - Domain string `json:"domain"` - IsStale bool `json:"is_stale"` - // Use the CosmosDB.Timestamp for this one - // ts_added_secs int64 `json:"ts_added_secs"` + UserID string `json:"user_id"` + Domain string `json:"domain"` + IsStale bool `json:"is_stale"` + AddedSecs int64 `json:"ts_added_secs"` } type StaleDeviceListCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument StaleDeviceList StaleDeviceListCosmos `json:"mx_keyserver_stale_device_list"` } @@ -78,6 +72,23 @@ type staleDeviceListsStatements struct { tableName string } +func getStaleDeviceList(s *staleDeviceListsStatements, ctx context.Context, pk string, docId string) (*StaleDeviceListCosmosData, error) { + response := StaleDeviceListCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryStaleDeviceList(s *staleDeviceListsStatements, ctx context.Context, qry string, params map[string]interface{}) ([]StaleDeviceListCosmosData, error) { var response []StaleDeviceListCosmosData @@ -126,31 +137,30 @@ func (s *staleDeviceListsStatements) InsertStaleDeviceList(ctx context.Context, docId := userID cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) - data := StaleDeviceListCosmos{ - Domain: string(domain), - IsStale: isStale, - UserID: userID, - } + dbData, _ := getStaleDeviceList(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.StaleDeviceList.IsStale = isStale + dbData.StaleDeviceList.AddedSecs = time.Now().Unix() + } else { + data := StaleDeviceListCosmos{ + Domain: string(domain), + IsStale: isStale, + UserID: userID, + } - dbData := StaleDeviceListCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - StaleDeviceList: data, + dbData = &StaleDeviceListCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + StaleDeviceList: data, + } } - // _, err := s.upsertKeyChangeStmt.ExecContext(ctx, partition, offset, userID) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - dbData, - options) - - return err + dbData.Pk, + dbData) } func (s *staleDeviceListsStatements) SelectUserIDsWithStaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error) { diff --git a/mediaapi/storage/cosmosdb/media_repository_table.go b/mediaapi/storage/cosmosdb/media_repository_table.go index 8e9f4e0d3..e8939dcc2 100644 --- a/mediaapi/storage/cosmosdb/media_repository_table.go +++ b/mediaapi/storage/cosmosdb/media_repository_table.go @@ -66,12 +66,7 @@ type MediaRepositoryCosmos struct { } type MediaRepositoryCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument MediaRepository MediaRepositoryCosmos `json:"mx_mediaapi_media_repository"` } @@ -173,11 +168,7 @@ func (s *mediaStatements) insertMedia( } dbData := &MediaRepositoryCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), MediaRepository: data, } diff --git a/mediaapi/storage/cosmosdb/thumbnail_table.go b/mediaapi/storage/cosmosdb/thumbnail_table.go index ff7e0c7c8..a62a9b34b 100644 --- a/mediaapi/storage/cosmosdb/thumbnail_table.go +++ b/mediaapi/storage/cosmosdb/thumbnail_table.go @@ -55,12 +55,7 @@ type ThumbnailCosmos struct { } type ThumbnailCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument Thumbnail ThumbnailCosmos `json:"mx_mediaapi_thumbnail"` } @@ -149,7 +144,7 @@ func (s *thumbnailStatements) insertThumbnail( var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) // CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_thumbnail_index ON mediaapi_thumbnail (media_id, media_origin, width, height, resize_method); - docId := fmt.Sprintf("%s_%s_%d_%d_s", + docId := fmt.Sprintf("%s_%s_%d_%d_%s", thumbnailMetadata.MediaMetadata.MediaID, thumbnailMetadata.MediaMetadata.Origin, thumbnailMetadata.ThumbnailSize.Width, @@ -183,22 +178,17 @@ func (s *thumbnailStatements) insertThumbnail( } dbData := &ThumbnailCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Thumbnail: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Thumbnail: data, } - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) + var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) _, _, err := cosmosdbapi.GetClient(s.db.connection).CreateDocument( ctx, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, &dbData, options) - return err } @@ -225,7 +215,7 @@ func (s *thumbnailStatements) selectThumbnail( var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) // CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_thumbnail_index ON mediaapi_thumbnail (media_id, media_origin, width, height, resize_method); - docId := fmt.Sprintf("%s_%s_%d_%d_s", + docId := fmt.Sprintf("%s_%s_%d_%d_%s", mediaID, mediaOrigin, width, diff --git a/roomserver/storage/cosmosdb/event_json_table.go b/roomserver/storage/cosmosdb/event_json_table.go index 266bde003..517dbde6c 100644 --- a/roomserver/storage/cosmosdb/event_json_table.go +++ b/roomserver/storage/cosmosdb/event_json_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -40,12 +39,7 @@ type EventJSONCosmos struct { } type EventJSONCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument EventJSON EventJSONCosmos `json:"mx_roomserver_event_json"` } @@ -71,6 +65,23 @@ type eventJSONStatements struct { tableName string } +func getEventJSON(s *eventJSONStatements, ctx context.Context, pk string, docId string) (*EventJSONCosmosData, error) { + response := EventJSONCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryEventJSON(s *eventJSONStatements, ctx context.Context, qry string, params map[string]interface{}) ([]EventJSONCosmosData, error) { var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) @@ -121,30 +132,29 @@ func (s *eventJSONStatements) InsertEventJSON( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := EventJSONCosmos{ - EventNID: int64(eventNID), - EventJSON: eventJSON, - } + dbData, _ := getEventJSON(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.EventJSON.EventJSON = eventJSON + } else { + data := EventJSONCosmos{ + EventNID: int64(eventNID), + EventJSON: eventJSON, + } - var dbData = EventJSONCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - EventJSON: data, + dbData = &EventJSONCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + EventJSON: data, + } } //Insert OR Replace - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - var _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) - - return err + dbData.Pk, + dbData) } func (s *eventJSONStatements) BulkSelectEventJSON( diff --git a/roomserver/storage/cosmosdb/event_state_keys_table.go b/roomserver/storage/cosmosdb/event_state_keys_table.go index 7b35cb61e..bdb363e6a 100644 --- a/roomserver/storage/cosmosdb/event_state_keys_table.go +++ b/roomserver/storage/cosmosdb/event_state_keys_table.go @@ -18,7 +18,6 @@ package cosmosdb import ( "context" "database/sql" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -44,12 +43,7 @@ type EventStateKeysCosmos struct { } type EventStateKeysCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument EventStateKeys EventStateKeysCosmos `json:"mx_roomserver_event_state_keys"` } @@ -163,11 +157,7 @@ func ensureEventStateKeys(s *eventStateKeyStatements, ctx context.Context) { // event_state_key_nid INTEGER PRIMARY KEY AUTOINCREMENT, dbData := EventStateKeysCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), EventStateKeys: data, } @@ -175,13 +165,12 @@ func ensureEventStateKeys(s *eventStateKeyStatements, ctx context.Context) { } func insertEventStateKeyCore(s *eventStateKeyStatements, ctx context.Context, dbData EventStateKeysCosmosData) error { - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - var _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + err := cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - dbData, - options) + dbData.Pk, + dbData) if err != nil { return err @@ -223,14 +212,11 @@ func (s *eventStateKeyStatements) InsertEventStateKeyNID( // event_state_key_nid INTEGER PRIMARY KEY AUTOINCREMENT, dbData = EventStateKeysCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), EventStateKeys: data, } } else { + dbData.SetUpdateTime() dbData.EventStateKeys = existing.EventStateKeys } diff --git a/roomserver/storage/cosmosdb/event_types_table.go b/roomserver/storage/cosmosdb/event_types_table.go index 69f7792c9..fffe170fa 100644 --- a/roomserver/storage/cosmosdb/event_types_table.go +++ b/roomserver/storage/cosmosdb/event_types_table.go @@ -18,7 +18,6 @@ package cosmosdb import ( "context" "database/sql" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -44,12 +43,7 @@ import ( // ` type EventTypeCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument EventType EventTypeCosmos `json:"mx_roomserver_event_type"` } @@ -172,12 +166,8 @@ func insertEventTypeCore(s *eventTypeStatements, ctx context.Context, eventType pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = EventTypeCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - EventType: eventType, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + EventType: eventType, } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/roomserver/storage/cosmosdb/events_table.go b/roomserver/storage/cosmosdb/events_table.go index 669d943ed..b8d5a6049 100644 --- a/roomserver/storage/cosmosdb/events_table.go +++ b/roomserver/storage/cosmosdb/events_table.go @@ -20,7 +20,6 @@ import ( "database/sql" "fmt" "sort" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -66,13 +65,8 @@ type EventCosmosMaxDepth struct { } type EventCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Event EventCosmos `json:"mx_roomserver_event"` + cosmosdbapi.CosmosDocument + Event EventCosmos `json:"mx_roomserver_event"` } // const insertEventSQL = ` @@ -377,12 +371,8 @@ func (s *eventStatements) InsertEvent( } dbData = &EventCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Event: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Event: data, } } else { modified := !isEventSame( @@ -410,17 +400,16 @@ func (s *eventStatements) InsertEvent( dbData.Event.ReferenceSha256 = referenceSHA256 dbData.Event.RoomNID = int64(roomNID) - dbData.Timestamp = time.Now().Unix() + dbData.SetUpdateTime() } // ON CONFLICT DO NOTHING; - Do Upsert - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err := cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + err := cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) + dbData.Pk, + dbData) if err != nil { return 0, 0, err diff --git a/roomserver/storage/cosmosdb/invite_table.go b/roomserver/storage/cosmosdb/invite_table.go index bdda430f4..d173768c6 100644 --- a/roomserver/storage/cosmosdb/invite_table.go +++ b/roomserver/storage/cosmosdb/invite_table.go @@ -18,7 +18,6 @@ package cosmosdb import ( "context" "database/sql" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -51,13 +50,8 @@ type InviteCosmos struct { } type InviteCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Invite InviteCosmos `json:"mx_roomserver_invite"` + cosmosdbapi.CosmosDocument + Invite InviteCosmos `json:"mx_roomserver_invite"` } // const insertInviteEventSQL = "" + @@ -191,12 +185,8 @@ func (s *inviteStatements) InsertInviteEvent( pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = InviteCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Invite: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Invite: data, } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/roomserver/storage/cosmosdb/membership_table.go b/roomserver/storage/cosmosdb/membership_table.go index d07eb8da3..e5637a930 100644 --- a/roomserver/storage/cosmosdb/membership_table.go +++ b/roomserver/storage/cosmosdb/membership_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -53,12 +52,7 @@ type MembershipCosmos struct { } type MembershipCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument Membership MembershipCosmos `json:"mx_roomserver_membership"` } @@ -303,6 +297,7 @@ func (s *membershipStatements) InsertMembership( exists.Membership.RoomNID = int64(roomNID) exists.Membership.TargetNID = int64(targetUserNID) exists.Membership.TargetLocal = localTarget + exists.SetUpdateTime() _, errSet := setMembership(s, ctx, *exists) return errSet } @@ -318,23 +313,18 @@ func (s *membershipStatements) InsertMembership( } var dbData = MembershipCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Membership: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Membership: data, } // " ON CONFLICT DO NOTHING" - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) + var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) _, _, err := cosmosdbapi.GetClient(s.db.connection).CreateDocument( ctx, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, &dbData, options) - return err } diff --git a/roomserver/storage/cosmosdb/previous_events_table.go b/roomserver/storage/cosmosdb/previous_events_table.go index a9fd1340d..4fb9422a9 100644 --- a/roomserver/storage/cosmosdb/previous_events_table.go +++ b/roomserver/storage/cosmosdb/previous_events_table.go @@ -20,7 +20,6 @@ import ( "database/sql" "fmt" "strings" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -51,12 +50,7 @@ type PreviousEventCosmos struct { } type PreviousEventCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument PreviousEvent PreviousEventCosmos `json:"mx_roomserver_previous_event"` } @@ -160,15 +154,12 @@ func (s *previousEventStatements) InsertPreviousEvent( } dbData = PreviousEventCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - PreviousEvent: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + PreviousEvent: data, } } else { dbData = *existing + dbData.SetUpdateTime() } var nids []string @@ -188,15 +179,12 @@ func (s *previousEventStatements) InsertPreviousEvent( // (previous_event_id, previous_reference_sha256, event_nids) // VALUES ($1, $2, $3) - var optionsReplace = cosmosdbapi.GetUpsertDocumentOptions(pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - optionsReplace, - ) - return err + dbData.Pk, + dbData) } // Check if the event reference exists diff --git a/roomserver/storage/cosmosdb/published_table.go b/roomserver/storage/cosmosdb/published_table.go index 8aebc3976..7a84ee696 100644 --- a/roomserver/storage/cosmosdb/published_table.go +++ b/roomserver/storage/cosmosdb/published_table.go @@ -17,7 +17,6 @@ package cosmosdb import ( "context" "database/sql" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -41,13 +40,8 @@ type PublishCosmos struct { } type PublishCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Publish PublishCosmos `json:"mx_roomserver_publish"` + cosmosdbapi.CosmosDocument + Publish PublishCosmos `json:"mx_roomserver_publish"` } // const upsertPublishedSQL = "" + @@ -137,30 +131,29 @@ func (s *publishedStatements) UpsertRoomPublished( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := PublishCosmos{ - RoomID: roomID, - Published: false, - } + dbData, _ := getPublish(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.Publish.Published = published + } else { + data := PublishCosmos{ + RoomID: roomID, + Published: false, + } - var dbData = PublishCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Publish: data, + dbData = &PublishCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Publish: data, + } } // "INSERT OR REPLACE INTO roomserver_published (room_id, published) VALUES ($1, $2)" - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err := cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) - - return err + dbData.Pk, + dbData) } func (s *publishedStatements) SelectPublishedFromRoomID( diff --git a/roomserver/storage/cosmosdb/redactions_table.go b/roomserver/storage/cosmosdb/redactions_table.go index 59e7a1d06..5994129d5 100644 --- a/roomserver/storage/cosmosdb/redactions_table.go +++ b/roomserver/storage/cosmosdb/redactions_table.go @@ -17,7 +17,6 @@ package cosmosdb import ( "context" "database/sql" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -45,12 +44,7 @@ type RedactionCosmos struct { } type RedactionCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument Redaction RedactionCosmos `json:"mx_roomserver_redaction"` } @@ -165,12 +159,8 @@ func (s *redactionStatements) InsertRedaction( } var dbData = RedactionCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Redaction: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Redaction: data, } // "INSERT OR IGNORE INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" + diff --git a/roomserver/storage/cosmosdb/room_aliases_table.go b/roomserver/storage/cosmosdb/room_aliases_table.go index 65e084e7b..815154f43 100644 --- a/roomserver/storage/cosmosdb/room_aliases_table.go +++ b/roomserver/storage/cosmosdb/room_aliases_table.go @@ -18,7 +18,6 @@ package cosmosdb import ( "context" "database/sql" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/roomserver/storage/tables" @@ -41,12 +40,7 @@ type RoomAliasCosmos struct { } type RoomAliasCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument RoomAlias RoomAliasCosmos `json:"mx_roomserver_room_alias"` } @@ -157,12 +151,8 @@ func (s *roomAliasesStatements) InsertRoomAlias( pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = RoomAliasCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - RoomAlias: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + RoomAlias: data, } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/roomserver/storage/cosmosdb/rooms_table.go b/roomserver/storage/cosmosdb/rooms_table.go index e641b761a..9cf749b8c 100644 --- a/roomserver/storage/cosmosdb/rooms_table.go +++ b/roomserver/storage/cosmosdb/rooms_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -42,13 +41,8 @@ import ( // ` type RoomCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Room RoomCosmos `json:"mx_roomserver_room"` + cosmosdbapi.CosmosDocument + Room RoomCosmos `json:"mx_roomserver_room"` } type RoomCosmos struct { @@ -273,23 +267,21 @@ func (s *roomStatements) InsertRoomNID( } dbData = &RoomCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Room: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Room: data, } + } else { + dbData.SetUpdateTime() + dbData.Room.RoomVersion = string(roomVersion) } // ON CONFLICT DO NOTHING; - Do Upsert - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + err = cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) + dbData.Pk, + dbData) if err != nil { return 0, fmt.Errorf("s.SelectRoomNID: %w", err) diff --git a/roomserver/storage/cosmosdb/state_block_table.go b/roomserver/storage/cosmosdb/state_block_table.go index eb4e1e57d..ac7b7f590 100644 --- a/roomserver/storage/cosmosdb/state_block_table.go +++ b/roomserver/storage/cosmosdb/state_block_table.go @@ -21,7 +21,6 @@ import ( "encoding/hex" "fmt" "sort" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -54,12 +53,7 @@ type StateBlockCosmosMaxNID struct { } type StateBlockCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument StateBlock StateBlockCosmos `json:"mx_roomserver_state_block"` } @@ -192,6 +186,7 @@ func (s *stateBlockStatements) BulkInsertStateData( if existing != nil { //if exists, just update and dont create a new seq existing.StateBlock.EventNIDs = ids + existing.SetUpdateTime() _, err = setStateBlock(s, ctx, *existing) if err != nil { return 0, err @@ -211,21 +206,16 @@ func (s *stateBlockStatements) BulkInsertStateData( } var dbData = StateBlockCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - StateBlock: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + StateBlock: data, } - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + err = cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) + dbData.Pk, + dbData) return } diff --git a/roomserver/storage/cosmosdb/state_snapshot_table.go b/roomserver/storage/cosmosdb/state_snapshot_table.go index a1f7819eb..fb5801423 100644 --- a/roomserver/storage/cosmosdb/state_snapshot_table.go +++ b/roomserver/storage/cosmosdb/state_snapshot_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/roomserver/storage/tables" @@ -56,12 +55,7 @@ type StateSnapshotCosmos struct { } type StateSnapshotCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument StateSnapshot StateSnapshotCosmos `json:"mx_roomserver_state_snapshot"` } @@ -154,12 +148,8 @@ func (s *stateSnapshotStatements) InsertState( pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = StateSnapshotCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - StateSnapshot: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + StateSnapshot: data, } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/roomserver/storage/cosmosdb/transactions_table.go b/roomserver/storage/cosmosdb/transactions_table.go index 08d99789e..8e67b3679 100644 --- a/roomserver/storage/cosmosdb/transactions_table.go +++ b/roomserver/storage/cosmosdb/transactions_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -44,12 +43,7 @@ type TransactionCosmos struct { } type TransactionCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument Transaction TransactionCosmos `json:"mx_roomserver_transaction"` } @@ -124,12 +118,8 @@ func (s *transactionStatements) InsertTransaction( pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = TransactionCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Transaction: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Transaction: data, } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/signingkeyserver/storage/cosmosdb/server_key_table.go b/signingkeyserver/storage/cosmosdb/server_key_table.go index c2bec1199..22892ed1a 100644 --- a/signingkeyserver/storage/cosmosdb/server_key_table.go +++ b/signingkeyserver/storage/cosmosdb/server_key_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -62,12 +61,7 @@ type ServerKeyCosmos struct { } type ServerKeyCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument ServerKey ServerKeyCosmos `json:"mx_keydb_server_key"` } @@ -93,6 +87,23 @@ type serverKeyStatements struct { tableName string } +func getServerKey(s *serverKeyStatements, ctx context.Context, pk string, docId string) (*ServerKeyCosmosData, error) { + response := ServerKeyCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryServerKey(s *serverKeyStatements, ctx context.Context, qry string, params map[string]interface{}) ([]ServerKeyCosmosData, error) { var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) @@ -208,24 +219,27 @@ func (s *serverKeyStatements) upsertServerKeys( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := ServerKeyCosmos{ - ServerName: string(request.ServerName), - ServerKeyID: string(request.KeyID), - ServerNameAndKeyID: nameAndKeyID(request), - ValidUntilTimestamp: int64(key.ValidUntilTS), - ExpiredTimestamp: int64(key.ExpiredTS), - ServerKey: key.Key.Encode(), - } + dbData, _ := getServerKey(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.ServerKey.ValidUntilTimestamp = int64(key.ValidUntilTS) + dbData.ServerKey.ExpiredTimestamp = int64(key.ExpiredTS) + dbData.ServerKey.ServerKey = key.Key.Encode() + } else { + data := ServerKeyCosmos{ + ServerName: string(request.ServerName), + ServerKeyID: string(request.KeyID), + ServerNameAndKeyID: nameAndKeyID(request), + ValidUntilTimestamp: int64(key.ValidUntilTS), + ExpiredTimestamp: int64(key.ExpiredTS), + ServerKey: key.Key.Encode(), + } - dbData := &ServerKeyCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - ServerKey: data, + dbData = &ServerKeyCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + ServerKey: data, + } } - // _, err := stmt.ExecContext( // ctx, // string(request.ServerName), @@ -236,15 +250,12 @@ func (s *serverKeyStatements) upsertServerKeys( // key.Key.Encode(), // ) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err := cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) - - return err + dbData.Pk, + dbData) } func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string { diff --git a/syncapi/storage/cosmosdb/account_data_table.go b/syncapi/storage/cosmosdb/account_data_table.go index f916497cb..a401741c6 100644 --- a/syncapi/storage/cosmosdb/account_data_table.go +++ b/syncapi/storage/cosmosdb/account_data_table.go @@ -19,7 +19,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -52,12 +51,7 @@ type AccountDataTypeNumberCosmosData struct { } type AccountDataTypeCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument AccountDataType AccountDataTypeCosmos `json:"mx_syncapi_account_data_type"` } @@ -89,6 +83,23 @@ type accountDataStatements struct { tableName string } +func getAccountDataType(s *accountDataStatements, ctx context.Context, pk string, docId string) (*AccountDataTypeCosmosData, error) { + response := AccountDataTypeCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryAccountDataType(s *accountDataStatements, ctx context.Context, qry string, params map[string]interface{}) ([]AccountDataTypeCosmosData, error) { var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) @@ -163,30 +174,31 @@ func (s *accountDataStatements) InsertAccountData( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := AccountDataTypeCosmos{ - ID: int64(pos), - UserID: userID, - RoomID: roomID, - DataType: dataType, - } + dbData, _ := getAccountDataType(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + dbData.AccountDataType.ID = int64(pos) + } else { + data := AccountDataTypeCosmos{ + ID: int64(pos), + UserID: userID, + RoomID: roomID, + DataType: dataType, + } - dbData := &AccountDataTypeCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - AccountDataType: data, + dbData = &AccountDataTypeCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + AccountDataType: data, + } } // _, err = sqlutil.TxStmt(txn, s.insertAccountDataStmt).ExecContext(ctx, pos, userID, roomID, dataType, pos) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + err = cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) + dbData.Pk, + dbData) return } diff --git a/syncapi/storage/cosmosdb/backwards_extremities_table.go b/syncapi/storage/cosmosdb/backwards_extremities_table.go index 86fa79716..944a043d7 100644 --- a/syncapi/storage/cosmosdb/backwards_extremities_table.go +++ b/syncapi/storage/cosmosdb/backwards_extremities_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -45,12 +44,7 @@ type BackwardExtremityCosmos struct { } type BackwardExtremityCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument BackwardExtremity BackwardExtremityCosmos `json:"mx_syncapi_backward_extremity"` } @@ -84,6 +78,23 @@ type backwardExtremitiesStatements struct { tableName string } +func getBackwardExtremity(s *backwardExtremitiesStatements, ctx context.Context, pk string, docId string) (*BackwardExtremityCosmosData, error) { + response := BackwardExtremityCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryBackwardExtremity(s *backwardExtremitiesStatements, ctx context.Context, qry string, params map[string]interface{}) ([]BackwardExtremityCosmosData, error) { var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) @@ -147,28 +158,28 @@ func (s *backwardExtremitiesStatements) InsertsBackwardExtremity( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := BackwardExtremityCosmos{ - EventID: eventID, - PrevEventID: prevEventID, - RoomID: roomID, + dbData, _ := getBackwardExtremity(s, ctx, pk, cosmosDocId) + if dbData != nil { + dbData.SetUpdateTime() + } else { + data := BackwardExtremityCosmos{ + EventID: eventID, + PrevEventID: prevEventID, + RoomID: roomID, + } + + dbData = &BackwardExtremityCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + BackwardExtremity: data, + } } - dbData := &BackwardExtremityCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - BackwardExtremity: data, - } - - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + err = cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) + dbData.Pk, + dbData) return } diff --git a/syncapi/storage/cosmosdb/current_room_state_table.go b/syncapi/storage/cosmosdb/current_room_state_table.go index 2ea5ee040..eea182cac 100644 --- a/syncapi/storage/cosmosdb/current_room_state_table.go +++ b/syncapi/storage/cosmosdb/current_room_state_table.go @@ -20,7 +20,6 @@ import ( "database/sql" "encoding/json" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -66,12 +65,7 @@ type CurrentRoomStateCosmos struct { } type CurrentRoomStateCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument CurrentRoomState CurrentRoomStateCosmos `json:"mx_syncapi_current_room_state"` } @@ -424,37 +418,42 @@ func (s *currentRoomStateStatements) UpsertRoomState( membershipData = *membership } - data := CurrentRoomStateCosmos{ - RoomID: event.RoomID(), - EventID: event.EventID(), - Type: event.Type(), - Sender: event.Sender(), - ContainsUrl: containsURL, - StateKey: *event.StateKey(), - HeaderedEventJSON: headeredJSON, - Membership: membershipData, - AddedAt: int64(addedAt), - } + dbData, _ := getEvent(s, ctx, pk, cosmosDocId) + if dbData != nil { + // " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9" + dbData.SetUpdateTime() + dbData.CurrentRoomState.EventID = event.EventID() + dbData.CurrentRoomState.Sender = event.Sender() + dbData.CurrentRoomState.ContainsUrl = containsURL + dbData.CurrentRoomState.HeaderedEventJSON = headeredJSON + dbData.CurrentRoomState.Membership = membershipData + dbData.CurrentRoomState.AddedAt = int64(addedAt) + } else { + data := CurrentRoomStateCosmos{ + RoomID: event.RoomID(), + EventID: event.EventID(), + Type: event.Type(), + Sender: event.Sender(), + ContainsUrl: containsURL, + StateKey: *event.StateKey(), + HeaderedEventJSON: headeredJSON, + Membership: membershipData, + AddedAt: int64(addedAt), + } - dbData := &CurrentRoomStateCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - CurrentRoomState: data, + dbData = &CurrentRoomStateCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + CurrentRoomState: data, + } } // _, err = sqlutil.TxStmt(txn, s.insertAccountDataStmt).ExecContext(ctx, pos, userID, roomID, dataType, pos) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) - - return err + dbData.Pk, + dbData) } func minOfInts(a, b int) int { diff --git a/syncapi/storage/cosmosdb/filter_table.go b/syncapi/storage/cosmosdb/filter_table.go index 5d67a3992..a9fec490f 100644 --- a/syncapi/storage/cosmosdb/filter_table.go +++ b/syncapi/storage/cosmosdb/filter_table.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -49,13 +48,8 @@ type FilterCosmos struct { } type FilterCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Filter FilterCosmos `json:"mx_syncapi_filter"` + cosmosdbapi.CosmosDocument + Filter FilterCosmos `json:"mx_syncapi_filter"` } // const selectFilterSQL = "" + @@ -235,12 +229,8 @@ func (s *filterStatements) InsertFilter( var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = FilterCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Filter: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Filter: data, } var optionsCreate = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/syncapi/storage/cosmosdb/invites_table.go b/syncapi/storage/cosmosdb/invites_table.go index 656a5f878..2dc4188ad 100644 --- a/syncapi/storage/cosmosdb/invites_table.go +++ b/syncapi/storage/cosmosdb/invites_table.go @@ -20,7 +20,6 @@ import ( "database/sql" "encoding/json" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -58,12 +57,7 @@ type InviteEventCosmosMaxNumber struct { } type InviteEventCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument InviteEvent InviteEventCosmos `json:"mx_syncapi_invite_event"` } @@ -228,12 +222,8 @@ func (s *inviteEventsStatements) InsertInviteEvent( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) var dbData = InviteEventCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - InviteEvent: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + InviteEvent: data, } var optionsCreate = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/syncapi/storage/cosmosdb/memberships_table.go b/syncapi/storage/cosmosdb/memberships_table.go index f7ff07432..105766da5 100644 --- a/syncapi/storage/cosmosdb/memberships_table.go +++ b/syncapi/storage/cosmosdb/memberships_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -62,12 +61,7 @@ type MembershipCosmos struct { } type MembershipCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument Membership MembershipCosmos `json:"mx_syncapi_membership"` } @@ -94,6 +88,23 @@ type membershipsStatements struct { tableName string } +func getMembership(s *membershipsStatements, ctx context.Context, pk string, docId string) (*MembershipCosmosData, error) { + response := MembershipCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryMembership(s *membershipsStatements, ctx context.Context, qry string, params map[string]interface{}) ([]MembershipCosmosData, error) { var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) @@ -147,39 +158,41 @@ func (s *membershipsStatements) UpsertMembership( // topologicalPos, // ) - data := MembershipCosmos{ - RoomID: event.RoomID(), - UserID: *event.StateKey(), - Membership: membership, - EventID: event.EventID(), - StreamPos: int64(streamPos), - TopologicalPos: int64(topologicalPos), - } - var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) // UNIQUE (room_id, user_id, membership) docId := fmt.Sprintf("%s_%s_%s", event.RoomID(), *event.StateKey(), membership) cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) - var dbData = MembershipCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Membership: data, + dbData, _ := getMembership(s, ctx, pk, cosmosDocId) + if dbData != nil { + // " DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6" + dbData.SetUpdateTime() + dbData.Membership.EventID = event.EventID() + dbData.Membership.StreamPos = int64(streamPos) + dbData.Membership.TopologicalPos = int64(topologicalPos) + } else { + data := MembershipCosmos{ + RoomID: event.RoomID(), + UserID: *event.StateKey(), + Membership: membership, + EventID: event.EventID(), + StreamPos: int64(streamPos), + TopologicalPos: int64(topologicalPos), + } + + dbData = &MembershipCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Membership: data, + } } - var optionsCreate = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - dbData, - optionsCreate) - - return err + dbData.Pk, + dbData) } func (s *membershipsStatements) SelectMembership( diff --git a/syncapi/storage/cosmosdb/output_room_events_table.go b/syncapi/storage/cosmosdb/output_room_events_table.go index 88bd61d19..52760c322 100644 --- a/syncapi/storage/cosmosdb/output_room_events_table.go +++ b/syncapi/storage/cosmosdb/output_room_events_table.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "sort" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -73,12 +72,7 @@ type OutputRoomEventCosmosMaxNumber struct { } type OutputRoomEventCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument OutputRoomEvent OutputRoomEventCosmos `json:"mx_syncapi_output_room_event"` } @@ -495,11 +489,7 @@ func (s *outputRoomEventsStatements) InsertEvent( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) var dbData = OutputRoomEventCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), OutputRoomEvent: data, } diff --git a/syncapi/storage/cosmosdb/output_room_events_topology_table.go b/syncapi/storage/cosmosdb/output_room_events_topology_table.go index 89c4576c9..8a5c7ff83 100644 --- a/syncapi/storage/cosmosdb/output_room_events_topology_table.go +++ b/syncapi/storage/cosmosdb/output_room_events_topology_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/syncapi/storage/tables" @@ -48,12 +47,7 @@ type OutputRoomEventTopologyCosmos struct { } type OutputRoomEventTopologyCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument OutputRoomEventTopology OutputRoomEventTopologyCosmos `json:"mx_syncapi_output_room_event_topology"` } @@ -153,6 +147,23 @@ func queryOutputRoomEventTopology(s *outputRoomEventsTopologyStatements, ctx con return response, nil } +func getOutputRoomEventTopology(s *outputRoomEventsTopologyStatements, ctx context.Context, pk string, docId string) (*OutputRoomEventTopologyCosmosData, error) { + response := OutputRoomEventTopologyCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func deleteOutputRoomEventTopology(s *outputRoomEventsTopologyStatements, ctx context.Context, dbData OutputRoomEventTopologyCosmosData) error { var options = cosmosdbapi.GetDeleteDocumentOptions(dbData.Pk) var _, err = cosmosdbapi.GetClient(s.db.connection).DeleteDocument( @@ -198,35 +209,35 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := OutputRoomEventTopologyCosmos{ - EventID: event.EventID(), - TopologicalPosition: event.Depth(), - RoomID: event.RoomID(), - StreamPosition: int64(pos), + var err error + dbData, _ := getOutputRoomEventTopology(s, ctx, pk, cosmosDocId) + if dbData != nil { + // " ON CONFLICT DO NOTHING" + } else { + data := OutputRoomEventTopologyCosmos{ + EventID: event.EventID(), + TopologicalPosition: event.Depth(), + RoomID: event.RoomID(), + StreamPosition: int64(pos), + } + + dbData = &OutputRoomEventTopologyCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + OutputRoomEventTopology: data, + } + // _, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext( + // ctx, event.EventID(), event.Depth(), event.RoomID(), pos, + // ) + + err = cosmosdbapi.UpsertDocument(ctx, + s.db.connection, + s.db.cosmosConfig.DatabaseName, + s.db.cosmosConfig.ContainerName, + dbData.Pk, + dbData) } - dbData := &OutputRoomEventTopologyCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - OutputRoomEventTopology: data, - } - - // _, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext( - // ctx, event.EventID(), event.Depth(), event.RoomID(), pos, - // ) - - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err := cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, - s.db.cosmosConfig.DatabaseName, - s.db.cosmosConfig.ContainerName, - &dbData, - options) - - return types.StreamPosition(event.Depth()), err + return types.StreamPosition(dbData.OutputRoomEventTopology.TopologicalPosition), err } func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( diff --git a/syncapi/storage/cosmosdb/peeks_table.go b/syncapi/storage/cosmosdb/peeks_table.go index 3edb48b8c..d08d94cae 100644 --- a/syncapi/storage/cosmosdb/peeks_table.go +++ b/syncapi/storage/cosmosdb/peeks_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -58,13 +57,8 @@ type PeekCosmosMaxNumber struct { } type PeekCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Peek PeekCosmos `json:"mx_syncapi_peek"` + cosmosdbapi.CosmosDocument + Peek PeekCosmos `json:"mx_syncapi_peek"` } // const insertPeekSQL = "" + @@ -163,6 +157,23 @@ func queryPeekMaxNumber(s *peekStatements, ctx context.Context, qry string, para return response, nil } +func getPeek(s *peekStatements, ctx context.Context, pk string, docId string) (*PeekCosmosData, error) { + response := PeekCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func setPeek(s *peekStatements, ctx context.Context, peek PeekCosmosData) (*PeekCosmosData, error) { var optionsReplace = cosmosdbapi.GetReplaceDocumentOptions(peek.Pk, peek.ETag) var _, _, ex = cosmosdbapi.GetClient(s.db.connection).ReplaceDocument( @@ -208,32 +219,34 @@ func (s *peekStatements) InsertPeek( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - data := PeekCosmos{ - ID: int64(streamPos), - RoomID: roomID, - UserID: userID, - DeviceID: deviceID, - } + dbData, _ := getPeek(s, ctx, pk, cosmosDocId) + if dbData != nil { + // " (id, room_id, user_id, device_id, creation_ts, deleted)" + + // " VALUES ($1, $2, $3, $4, $5, false)" + dbData.SetUpdateTime() + dbData.Peek.Deleted = false + } else { + data := PeekCosmos{ + ID: int64(streamPos), + RoomID: roomID, + UserID: userID, + DeviceID: deviceID, + } - dbData := &PeekCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - // nowMilli := time.Now().UnixNano() / int64(time.Millisecond) - Timestamp: time.Now().Unix(), - Peek: data, + dbData = &PeekCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Peek: data, + } } // _, err = sqlutil.TxStmt(txn, s.insertPeekStmt).ExecContext(ctx, streamPos, roomID, userID, deviceID, nowMilli) - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + err = cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - &dbData, - options) + dbData.Pk, + dbData) return } diff --git a/syncapi/storage/cosmosdb/receipt_table.go b/syncapi/storage/cosmosdb/receipt_table.go index beedfb906..1b2a03b83 100644 --- a/syncapi/storage/cosmosdb/receipt_table.go +++ b/syncapi/storage/cosmosdb/receipt_table.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -56,13 +55,8 @@ type ReceiptCosmosMaxNumber struct { } type ReceiptCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Receipt ReceiptCosmos `json:"mx_syncapi_receipt"` + cosmosdbapi.CosmosDocument + Receipt ReceiptCosmos `json:"mx_syncapi_receipt"` } // const upsertReceipt = "" + @@ -174,12 +168,8 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room cosmosDocId := cosmosdbapi.GetDocumentId(r.db.cosmosConfig.ContainerName, dbCollectionName, docId) var dbData = ReceiptCosmosData{ - Id: cosmosDocId, - Tn: r.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Receipt: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, r.db.cosmosConfig.TenantName, pk, cosmosDocId), + Receipt: data, } var optionsCreate = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/syncapi/storage/cosmosdb/send_to_device_table.go b/syncapi/storage/cosmosdb/send_to_device_table.go index 57fa5914f..7db89115e 100644 --- a/syncapi/storage/cosmosdb/send_to_device_table.go +++ b/syncapi/storage/cosmosdb/send_to_device_table.go @@ -19,7 +19,6 @@ import ( "database/sql" "encoding/json" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/syncapi/storage/tables" @@ -53,12 +52,7 @@ type SendToDeviceCosmosMaxNumber struct { } type SendToDeviceCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument SendToDevice SendToDeviceCosmos `json:"mx_syncapi_send_to_device"` } @@ -200,12 +194,8 @@ func (s *sendToDeviceStatements) InsertSendToDeviceMessage( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) var dbData = SendToDeviceCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - SendToDevice: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + SendToDevice: data, } var optionsCreate = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/userapi/storage/accounts/cosmosdb/account_data_table.go b/userapi/storage/accounts/cosmosdb/account_data_table.go index 0e303cea5..40750dea0 100644 --- a/userapi/storage/accounts/cosmosdb/account_data_table.go +++ b/userapi/storage/accounts/cosmosdb/account_data_table.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" ) @@ -40,12 +39,7 @@ import ( // ` type AccountDataCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument AccountData AccountDataCosmos `json:"mx_userapi_accountdata"` } @@ -72,6 +66,23 @@ func (s *accountDataStatements) prepare(db *Database) (err error) { return } +func getAccountData(s *accountDataStatements, ctx context.Context, pk string, docId string) (*AccountDataCosmosData, error) { + response := AccountDataCosmosData{} + err := cosmosdbapi.GetDocumentOrNil( + s.db.connection, + s.db.cosmosConfig, + ctx, + pk, + docId, + &response) + + if response.Id == "" { + return nil, nil + } + + return &response, err +} + func queryAccountData(s *accountDataStatements, ctx context.Context, qry string, params map[string]interface{}) ([]AccountDataCosmosData, error) { var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) @@ -99,43 +110,43 @@ func (s *accountDataStatements) insertAccountData( // INSERT INTO account_data(localpart, room_id, type, content) VALUES($1, $2, $3, $4) // ON CONFLICT (localpart, room_id, type) DO UPDATE SET content = $4 - var result = AccountDataCosmos{ - LocalPart: localpart, - RoomId: roomID, - Type: dataType, - Content: content, - } - var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.db.accountDatas.tableName) id := "" if roomID == "" { - id = fmt.Sprintf("%s_%s", result.LocalPart, result.Type) + id = fmt.Sprintf("%s_%s", localpart, dataType) } else { - id = fmt.Sprintf("%s_%s_%s", result.LocalPart, result.RoomId, result.Type) + id = fmt.Sprintf("%s_%s_%s", localpart, roomID, dataType) } docId := id cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) - var dbData = AccountDataCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - AccountData: result, + dbData, _ := getAccountData(s, ctx, pk, cosmosDocId) + if dbData != nil { + // ON CONFLICT (localpart, room_id, type) DO UPDATE SET content = $4 + dbData.SetUpdateTime() + dbData.AccountData.Content = content + } else { + var result = AccountDataCosmos{ + LocalPart: localpart, + RoomId: roomID, + Type: dataType, + Content: content, + } + + dbData = &AccountDataCosmosData{ + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + AccountData: result, + } } - var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) - var _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( - ctx, + return cosmosdbapi.UpsertDocument(ctx, + s.db.connection, s.db.cosmosConfig.DatabaseName, s.db.cosmosConfig.ContainerName, - dbData, - options) - - return err + dbData.Pk, + dbData) } func (s *accountDataStatements) selectAccountData( diff --git a/userapi/storage/accounts/cosmosdb/accounts_table.go b/userapi/storage/accounts/cosmosdb/accounts_table.go index 21aa52423..7ff922417 100644 --- a/userapi/storage/accounts/cosmosdb/accounts_table.go +++ b/userapi/storage/accounts/cosmosdb/accounts_table.go @@ -57,13 +57,8 @@ type AccountCosmos struct { } type AccountCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Account AccountCosmos `json:"mx_userapi_account"` + cosmosdbapi.CosmosDocument + Account AccountCosmos `json:"mx_userapi_account"` } type AccountCosmosUserCount struct { @@ -187,12 +182,8 @@ func (s *accountsStatements) insertAccount( pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = AccountCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Account: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Account: data, } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/userapi/storage/accounts/cosmosdb/key_backup_table.go b/userapi/storage/accounts/cosmosdb/key_backup_table.go index b88f1835f..a6d968df1 100644 --- a/userapi/storage/accounts/cosmosdb/key_backup_table.go +++ b/userapi/storage/accounts/cosmosdb/key_backup_table.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/userapi/api" @@ -42,12 +41,7 @@ import ( // ` type KeyBackupCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument KeyBackup KeyBackupCosmos `json:"mx_userapi_account_e2e_room_keys"` } @@ -252,12 +246,8 @@ func (s *keyBackupStatements) insertBackupKey( } dbData := &KeyBackupCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - KeyBackup: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + KeyBackup: data, } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/userapi/storage/accounts/cosmosdb/key_backup_version_table.go b/userapi/storage/accounts/cosmosdb/key_backup_version_table.go index fe24bbe23..d08358fcf 100644 --- a/userapi/storage/accounts/cosmosdb/key_backup_version_table.go +++ b/userapi/storage/accounts/cosmosdb/key_backup_version_table.go @@ -19,7 +19,6 @@ import ( "encoding/json" "fmt" "strconv" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -43,12 +42,7 @@ import ( // ` type KeyBackupVersionCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument KeyBackupVersion KeyBackupVersionCosmos `json:"mx_userapi_account_e2e_room_keys_versions"` } @@ -194,11 +188,7 @@ func (s *keyBackupVersionStatements) insertKeyBackup( } dbData := &KeyBackupVersionCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), KeyBackupVersion: data, } diff --git a/userapi/storage/accounts/cosmosdb/openid_table.go b/userapi/storage/accounts/cosmosdb/openid_table.go index c962a61f3..44a0a46d4 100644 --- a/userapi/storage/accounts/cosmosdb/openid_table.go +++ b/userapi/storage/accounts/cosmosdb/openid_table.go @@ -2,7 +2,6 @@ package cosmosdb import ( "context" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -30,12 +29,7 @@ type OpenIDTokenCosmos struct { } type OpenIdTokenCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` + cosmosdbapi.CosmosDocument OpenIdToken OpenIDTokenCosmos `json:"mx_userapi_openidtoken"` } @@ -114,12 +108,8 @@ func (s *tokenStatements) insertToken( pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = OpenIdTokenCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - OpenIdToken: mapToToken(*result), + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + OpenIdToken: mapToToken(*result), } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/userapi/storage/accounts/cosmosdb/profile_table.go b/userapi/storage/accounts/cosmosdb/profile_table.go index 28f1b0a22..aa5bc0d74 100644 --- a/userapi/storage/accounts/cosmosdb/profile_table.go +++ b/userapi/storage/accounts/cosmosdb/profile_table.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" "github.com/matrix-org/dendrite/internal/cosmosdbutil" @@ -46,13 +45,8 @@ type ProfileCosmos struct { } type ProfileCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Profile ProfileCosmos `json:"mx_userapi_profile"` + cosmosdbapi.CosmosDocument + Profile ProfileCosmos `json:"mx_userapi_profile"` } type profilesStatements struct { @@ -155,12 +149,8 @@ func (s *profilesStatements) insertProfile( pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = ProfileCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Profile: mapToProfile(*result), + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Profile: mapToProfile(*result), } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/userapi/storage/accounts/cosmosdb/threepid_table.go b/userapi/storage/accounts/cosmosdb/threepid_table.go index 6e8fabb64..52d30cd26 100644 --- a/userapi/storage/accounts/cosmosdb/threepid_table.go +++ b/userapi/storage/accounts/cosmosdb/threepid_table.go @@ -17,7 +17,6 @@ package cosmosdb import ( "context" "fmt" - "time" "github.com/matrix-org/dendrite/internal/cosmosdbapi" @@ -44,13 +43,8 @@ type ThreePIDCosmos struct { } type ThreePIDCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - ThreePID ThreePIDCosmos `json:"mx_userapi_threepid"` + cosmosdbapi.CosmosDocument + ThreePID ThreePIDCosmos `json:"mx_userapi_threepid"` } type threepidStatements struct { @@ -163,12 +157,8 @@ func (s *threepidStatements) insertThreePID( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.TenantName, dbCollectionName) var dbData = ThreePIDCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - ThreePID: result, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + ThreePID: result, } var options = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk) diff --git a/userapi/storage/devices/cosmosdb/devices_table.go b/userapi/storage/devices/cosmosdb/devices_table.go index 60e247cbd..bfd8f8847 100644 --- a/userapi/storage/devices/cosmosdb/devices_table.go +++ b/userapi/storage/devices/cosmosdb/devices_table.go @@ -70,13 +70,8 @@ type DeviceCosmos struct { } type DeviceCosmosData struct { - Id string `json:"id"` - Pk string `json:"_pk"` - Tn string `json:"_sid"` - Cn string `json:"_cn"` - ETag string `json:"_etag"` - Timestamp int64 `json:"_ts"` - Device DeviceCosmos `json:"mx_userapi_device"` + cosmosdbapi.CosmosDocument + Device DeviceCosmos `json:"mx_userapi_device"` } type DeviceCosmosSessionCount struct { @@ -238,12 +233,8 @@ func (s *devicesStatements) insertDevice( cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.TenantName, dbCollectionName, docId) var dbData = DeviceCosmosData{ - Id: cosmosDocId, - Tn: s.db.cosmosConfig.TenantName, - Cn: dbCollectionName, - Pk: pk, - Timestamp: time.Now().Unix(), - Device: data, + CosmosDocument: cosmosdbapi.GenerateDocument(dbCollectionName, s.db.cosmosConfig.TenantName, pk, cosmosDocId), + Device: data, } var optionsCreate = cosmosdbapi.GetCreateDocumentOptions(dbData.Pk)