Duplicate postgres code into sqlite for now just to stop build errors, will fix SQLite soon

This commit is contained in:
Neil Alexander 2020-06-30 13:31:56 +01:00
parent 339ea3d711
commit 2c3540d82b
6 changed files with 348 additions and 212 deletions

View file

@ -24,7 +24,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
const queueSchema = `
const queuePDUsSchema = `
CREATE TABLE IF NOT EXISTS federationsender_queue_pdus (
-- The transaction ID that was generated before persisting the event.
transaction_id TEXT NOT NULL,
@ -38,11 +38,11 @@ CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_event_id_idx
ON federationsender_queue (event_id, server_name);
`
const insertQueueSQL = "" +
const insertQueuePDUSQL = "" +
"INSERT INTO federationsender_queue (transaction_id, server_name, json_nid)" +
" VALUES ($1, $2, $3)"
const deleteQueueTransactionSQL = "" +
const deleteQueueTransactionPDUsSQL = "" +
"DELETE FROM federationsender_queue WHERE server_name = $1 AND transaction_id = $2"
const selectQueueNextTransactionIDSQL = "" +
@ -56,22 +56,22 @@ const selectQueuePDUsByTransactionSQL = "" +
" WHERE server_name = $1 AND transaction_id = $2" +
" LIMIT 50"
type queueStatements struct {
insertQueueStmt *sql.Stmt
deleteQueueTransactionStmt *sql.Stmt
type queuePDUsStatements struct {
insertQueuePDUStmt *sql.Stmt
deleteQueueTransactionPDUsStmt *sql.Stmt
selectQueueNextTransactionIDStmt *sql.Stmt
selectQueuePDUsByTransactionStmt *sql.Stmt
}
func (s *queueStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(queueSchema)
func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(queuePDUsSchema)
if err != nil {
return
}
if s.insertQueueStmt, err = db.Prepare(insertQueueSQL); err != nil {
if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil {
return
}
if s.deleteQueueTransactionStmt, err = db.Prepare(deleteQueueTransactionSQL); err != nil {
if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil {
return
}
if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil {
@ -83,14 +83,14 @@ func (s *queueStatements) prepare(db *sql.DB) (err error) {
return
}
func (s *queueStatements) insertQueuePDU(
func (s *queuePDUsStatements) insertQueuePDU(
ctx context.Context,
txn *sql.Tx,
transactionID gomatrixserverlib.TransactionID,
serverName gomatrixserverlib.ServerName,
nid int64,
) error {
stmt := sqlutil.TxStmt(txn, s.insertQueueStmt)
stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
_, err := stmt.ExecContext(
ctx,
transactionID, // the transaction ID that we initially attempted
@ -100,17 +100,17 @@ func (s *queueStatements) insertQueuePDU(
return err
}
func (s *queueStatements) deleteQueueTransaction(
func (s *queuePDUsStatements) deleteQueueTransaction(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
transactionID gomatrixserverlib.TransactionID,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionStmt)
stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt)
_, err := stmt.ExecContext(ctx, serverName, transactionID)
return err
}
func (s *queueStatements) selectQueueNextTransactionID(
func (s *queuePDUsStatements) selectQueueNextTransactionID(
ctx context.Context, txn *sql.Tx, serverName, sendType string,
) (string, error) {
var transactionID string
@ -119,7 +119,7 @@ func (s *queueStatements) selectQueueNextTransactionID(
return transactionID, err
}
func (s *queueStatements) selectQueuePDUs(
func (s *queuePDUsStatements) selectQueuePDUs(
ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int,
) ([]int64, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)

View file

@ -30,7 +30,7 @@ import (
type Database struct {
joinedHostsStatements
roomStatements
queueStatements
queuePDUsStatements
queueJSONStatements
sqlutil.PartitionOffsetStatements
db *sql.DB
@ -60,7 +60,7 @@ func (d *Database) prepare() error {
return err
}
if err = d.queueStatements.prepare(d.db); err != nil {
if err = d.queuePDUsStatements.prepare(d.db); err != nil {
return err
}

View file

@ -0,0 +1,113 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
const queueJSONSchema = `
-- The queue_retry_json table contains event contents that
-- we failed to send.
CREATE TABLE IF NOT EXISTS federationsender_queue_retry_json (
-- The JSON NID. This allows the federationsender_queue_retry table to
-- cross-reference to find the JSON blob.
json_nid BIGSERIAL,
-- The JSON body. Text so that we preserve UTF-8.
json_body TEXT NOT NULL
);
`
const insertJSONSQL = "" +
"INSERT INTO federationsender_queue_retry_json (json_body)" +
" VALUES ($1)" +
" ON CONFLICT DO NOTHING"
const deleteJSONSQL = "" +
"DELETE FROM federationsender_queue_retry_json WHERE json_nid = ANY($1)"
const selectJSONSQL = "" +
"SELECT json_nid, json_body FROM federationsender_queue_retry_json" +
" WHERE json_nid = ANY($1)"
type queueJSONStatements struct {
insertJSONStmt *sql.Stmt
deleteJSONStmt *sql.Stmt
selectJSONStmt *sql.Stmt
}
func (s *queueJSONStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(queueJSONSchema)
if err != nil {
return
}
if s.insertJSONStmt, err = db.Prepare(insertJSONSQL); err != nil {
return
}
if s.deleteJSONStmt, err = db.Prepare(deleteJSONSQL); err != nil {
return
}
if s.selectJSONStmt, err = db.Prepare(selectJSONSQL); err != nil {
return
}
return
}
func (s *queueJSONStatements) insertQueueJSON(
ctx context.Context, txn *sql.Tx, json string,
) (int64, error) {
stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
res, err := stmt.ExecContext(ctx, json)
if err != nil {
return 0, err
}
lastid, err := res.LastInsertId()
return lastid, err
}
func (s *queueJSONStatements) deleteQueueJSON(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteJSONStmt)
_, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs))
return err
}
func (s *queueJSONStatements) selectJSON(
ctx context.Context, txn *sql.Tx, jsonNIDs []int64,
) (map[int64][]byte, error) {
blobs := map[int64][]byte{}
stmt := sqlutil.TxStmt(txn, s.selectJSONStmt)
rows, err := stmt.QueryContext(ctx, pq.Int64Array(jsonNIDs))
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed")
for rows.Next() {
var nid int64
var blob []byte
if err = rows.Scan(&nid, &blob); err != nil {
return nil, err
}
blobs[nid] = blob
}
return blobs, err
}

View file

@ -0,0 +1,141 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const queuePDUsSchema = `
CREATE TABLE IF NOT EXISTS federationsender_queue_pdus (
-- The transaction ID that was generated before persisting the event.
transaction_id TEXT NOT NULL,
-- The domain part of the user ID the m.room.member event is for.
server_name TEXT NOT NULL,
-- The JSON NID from the federationsender_queue_json table.
json_nid BIGINT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_event_id_idx
ON federationsender_queue (event_id, server_name);
`
const insertQueuePDUSQL = "" +
"INSERT INTO federationsender_queue (transaction_id, server_name, json_nid)" +
" VALUES ($1, $2, $3)"
const deleteQueueTransactionPDUsSQL = "" +
"DELETE FROM federationsender_queue WHERE server_name = $1 AND transaction_id = $2"
const selectQueueNextTransactionIDSQL = "" +
"SELECT transaction_id FROM federationsender_queue" +
" WHERE server_name = $1" +
" ORDER BY transaction_id ASC" +
" LIMIT 1"
const selectQueuePDUsByTransactionSQL = "" +
"SELECT json_nid FROM federationsender_queue" +
" WHERE server_name = $1 AND transaction_id = $2" +
" LIMIT 50"
type queuePDUsStatements struct {
insertQueuePDUStmt *sql.Stmt
deleteQueueTransactionPDUsStmt *sql.Stmt
selectQueueNextTransactionIDStmt *sql.Stmt
selectQueuePDUsByTransactionStmt *sql.Stmt
}
func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(queuePDUsSchema)
if err != nil {
return
}
if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil {
return
}
if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil {
return
}
if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil {
return
}
if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
return
}
return
}
func (s *queuePDUsStatements) insertQueuePDU(
ctx context.Context,
txn *sql.Tx,
transactionID gomatrixserverlib.TransactionID,
serverName gomatrixserverlib.ServerName,
nid int64,
) error {
stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
_, err := stmt.ExecContext(
ctx,
transactionID, // the transaction ID that we initially attempted
serverName, // destination server name
nid, // JSON blob NID
)
return err
}
func (s *queuePDUsStatements) deleteQueueTransaction(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
transactionID gomatrixserverlib.TransactionID,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt)
_, err := stmt.ExecContext(ctx, serverName, transactionID)
return err
}
func (s *queuePDUsStatements) selectQueueNextTransactionID(
ctx context.Context, txn *sql.Tx, serverName, sendType string,
) (string, error) {
var transactionID string
stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt)
err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
return transactionID, err
}
func (s *queuePDUsStatements) selectQueuePDUs(
ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int,
) ([]int64, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
rows, err := stmt.QueryContext(ctx, serverName, transactionID)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed")
var result []int64
for rows.Next() {
var nid int64
if err = rows.Scan(&nid); err != nil {
return nil, err
}
result = append(result, nid)
}
return result, rows.Err()
}

View file

@ -1,167 +0,0 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const queueRetrySchema = `
-- The queue_retry table contains events that we failed to
-- send to a destination host, such that we can try them again
-- later.
CREATE TABLE IF NOT EXISTS federationsender_queue_retry (
-- The string ID of the room.
transaction_id TEXT NOT NULL,
-- The event type: "pdu", "invite", "send_to_device".
send_type TEXT NOT NULL,
-- The event ID of the m.room.member join event.
event_id TEXT NOT NULL,
-- The origin server TS of the event.
origin_server_ts BIGINT NOT NULL,
-- The domain part of the user ID the m.room.member event is for.
server_name TEXT NOT NULL,
-- The JSON body.
json_body BYTEA NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_retry_event_id_idx
ON federationsender_queue_retry (event_id);
`
const insertRetrySQL = "" +
"INSERT INTO federationsender_queue_retry (transaction_id, send_type, event_id, origin_server_ts, server_name, json_body)" +
" VALUES ($1, $2, $3, $4, $5, $6)"
const deleteRetrySQL = "" +
"DELETE FROM federationsender_queue_retry WHERE event_id = ANY($1)"
const selectRetryNextTransactionIDSQL = "" +
"SELECT transaction_id FROM federationsender_queue_retry" +
" WHERE server_name = $1 AND send_type = $2" +
" ORDER BY transaction_id ASC" +
" LIMIT 1"
const selectRetryPDUsByTransactionSQL = "" +
"SELECT event_id, server_name, origin_server_ts, json_body FROM federationsender_queue_retry" +
" WHERE server_name = $1 AND send_type = $2 AND transaction_id = $3" +
" LIMIT 50"
type queueRetryStatements struct {
insertRetryStmt *sql.Stmt
deleteRetryStmt *sql.Stmt
selectRetryNextTransactionIDStmt *sql.Stmt
selectRetryPDUsByTransactionStmt *sql.Stmt
}
func (s *queueRetryStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(queueRetrySchema)
if err != nil {
return
}
if s.insertRetryStmt, err = db.Prepare(insertRetrySQL); err != nil {
return
}
if s.deleteRetryStmt, err = db.Prepare(deleteRetrySQL); err != nil {
return
}
if s.selectRetryNextTransactionIDStmt, err = db.Prepare(selectRetryNextTransactionIDSQL); err != nil {
return
}
if s.selectRetryPDUsByTransactionStmt, err = db.Prepare(selectRetryPDUsByTransactionSQL); err != nil {
return
}
return
}
func (s *queueRetryStatements) insertQueueRetry(
ctx context.Context,
txn *sql.Tx,
transactionID string,
sendtype string,
event gomatrixserverlib.Event,
serverName gomatrixserverlib.ServerName,
) error {
stmt := sqlutil.TxStmt(txn, s.insertRetryStmt)
_, err := stmt.ExecContext(
ctx,
transactionID, // the transaction ID that we initially attempted
sendtype, // either "pdu", "invite", "send_to_device"
event.EventID(), // the event ID
event.OriginServerTS(), // the event origin server TS
serverName, // destination server name
event.JSON(), // JSON body
)
return err
}
func (s *queueRetryStatements) deleteQueueRetry(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteRetryStmt)
_, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs))
return err
}
func (s *queueRetryStatements) selectRetryNextTransactionID(
ctx context.Context, txn *sql.Tx, serverName, sendType string,
) (string, error) {
var transactionID string
stmt := sqlutil.TxStmt(txn, s.selectRetryNextTransactionIDStmt)
err := stmt.QueryRowContext(ctx, serverName, types.FailedEventTypePDU).Scan(&transactionID)
return transactionID, err
}
func (s *queueRetryStatements) selectQueueRetryPDUs(
ctx context.Context, txn *sql.Tx, serverName string, transactionID string,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectRetryPDUsByTransactionStmt)
rows, err := stmt.QueryContext(ctx, serverName, types.FailedEventTypePDU, transactionID)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "queueRetryFromStmt: rows.close() failed")
var result []*gomatrixserverlib.HeaderedEvent
for rows.Next() {
var transactionID, eventID string
var originServerTS int64
var jsonBody []byte
if err = rows.Scan(&transactionID, &eventID, &originServerTS, &jsonBody); err != nil {
return nil, err
}
var event gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(jsonBody, &event); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %w", err)
}
if event.EventID() != eventID {
return nil, fmt.Errorf("event ID %q doesn't match expected %q", event.EventID(), eventID)
}
result = append(result, &event)
}
return result, rows.Err()
}

View file

@ -18,6 +18,7 @@ package sqlite3
import (
"context"
"database/sql"
"encoding/json"
"fmt"
_ "github.com/mattn/go-sqlite3"
@ -31,7 +32,8 @@ import (
type Database struct {
joinedHostsStatements
roomStatements
queueRetryStatements
queuePDUsStatements
queueJSONStatements
sqlutil.PartitionOffsetStatements
db *sql.DB
}
@ -64,7 +66,11 @@ func (d *Database) prepare() error {
return err
}
if err = d.queueRetryStatements.prepare(d.db); err != nil {
if err = d.queuePDUsStatements.prepare(d.db); err != nil {
return err
}
if err = d.queueJSONStatements.prepare(d.db); err != nil {
return err
}
@ -134,44 +140,87 @@ func (d *Database) GetJoinedHosts(
return d.selectJoinedHosts(ctx, roomID)
}
// GetFailedPDUs retrieves PDUs that we have failed to send on
// a specific destination queue.
func (d *Database) GetFailedPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
transactionID, err := d.selectRetryNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU)
// StoreJSON adds a JSON blob into the queue JSON table and returns
// a NID. The NID will then be used when inserting the per-destination
// metadata entries.
func (d *Database) StoreJSON(
ctx context.Context, js []byte,
) (int64, error) {
res, err := d.insertJSONStmt.ExecContext(ctx, js)
if err != nil {
return nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err)
return 0, fmt.Errorf("d.insertRetryJSONStmt: %w", err)
}
events, err := d.selectQueueRetryPDUs(ctx, nil, string(serverName), transactionID)
nid, err := res.LastInsertId()
if err != nil {
return nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err)
return 0, fmt.Errorf("res.LastInsertID: %w", err)
}
return events, nil
return nid, nil
}
// StoreFailedPDUs stores PDUs that we have failed to send on
// a specific destination queue.
func (d *Database) StoreFailedPDUs(
// AssociatePDUWithDestination creates an association that the
// destination queues will use to determine which JSON blobs to send
// to which servers.
func (d *Database) AssociatePDUWithDestination(
ctx context.Context,
transactionID gomatrixserverlib.TransactionID,
serverName gomatrixserverlib.ServerName,
pdus []*gomatrixserverlib.HeaderedEvent,
nids []int64,
) error {
for _, pdu := range pdus {
if _, err := d.insertRetryStmt.ExecContext(
ctx,
string(transactionID), // transaction ID
types.FailedEventTypePDU, // type of event that was queued
pdu.EventID(), // event ID
pdu.OriginServerTS(), // event origin server TS
string(serverName), // destination server name
pdu.JSON(), // JSON body
for _, nid := range nids {
if err := d.insertQueuePDU(
ctx, // context
nil, // SQL transaction
transactionID, // transaction ID
serverName, // destination server name
nid, // NID from the federationsender_queue_json table
); err != nil {
return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err)
}
}
return nil
}
// GetNextTransactionPDUs retrieves events from the database for
// the next pending transaction, up to the limit specified.
func (d *Database) GetNextTransactionPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
limit int,
) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) {
transactionID, err := d.selectQueueNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU)
if err != nil {
return "", nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err)
}
nids, err := d.selectQueuePDUs(ctx, nil, string(serverName), transactionID, limit)
if err != nil {
return "", nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err)
}
blobs, err := d.selectJSON(ctx, nil, nids)
if err != nil {
return "", nil, fmt.Errorf("d.selectJSON: %w", err)
}
var events []*gomatrixserverlib.HeaderedEvent
for _, blob := range blobs {
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(blob, &event); err != nil {
return "", nil, fmt.Errorf("json.Unmarshal: %w", err)
}
events = append(events, &event)
}
return gomatrixserverlib.TransactionID(transactionID), events, nil
}
// CleanTransactionPDUs cleans up all associated events for a
// given transaction. This is done when the transaction was sent
// successfully.
func (d *Database) CleanTransactionPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
transactionID gomatrixserverlib.TransactionID,
) error {
return d.deleteQueueTransaction(ctx, nil, serverName, transactionID)
}