Give read recipts their own database sequence

This commit is contained in:
Neil Alexander 2020-12-11 11:45:47 +00:00
parent c55361c1b8
commit c784781d72
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
6 changed files with 104 additions and 4 deletions

View file

@ -0,0 +1,54 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deltas
import (
"database/sql"
"fmt"
)
func UpFixSequences(tx *sql.Tx) error {
_, err := tx.Exec(`
-- We need to delete all of the existing receipts because the indexes
-- will be wrong, and we'll get primary key violations if we try to
-- reuse existing stream IDs from a different sequence.
DELETE FROM syncapi_receipts;
-- Use the new syncapi_receipts_id sequence.
CREATE SEQUENCE IF NOT EXISTS syncapi_receipt_id;
ALTER TABLE syncapi_receipts ALTER COLUMN id SET DEFAULT nextval('syncapi_receipt_id');
`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownFixSequences(tx *sql.Tx) error {
_, err := tx.Exec(`
-- We need to delete all of the existing receipts because the indexes
-- will be wrong, and we'll get primary key violations if we try to
-- reuse existing stream IDs from a different sequence.
DELETE FROM syncapi_receipts;
-- Revert back to using the syncapi_stream_id sequence.
DROP SEQUENCE IF EXISTS syncapi_receipt_id;
ALTER TABLE syncapi_receipts ALTER COLUMN id SET DEFAULT nextval('syncapi_stream_id');
`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -0,0 +1,28 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deltas
import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpFixSequences, DownFixSequences)
}
func LoadFixSequences(m *sqlutil.Migrations) {
m.AddMigration(UpFixSequences, DownFixSequences)
}

View file

@ -30,11 +30,12 @@ import (
) )
const receiptsSchema = ` const receiptsSchema = `
CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; CREATE SEQUENCE IF NOT EXISTS syncapi_receipt_id;
-- Stores data about receipts -- Stores data about receipts
CREATE TABLE IF NOT EXISTS syncapi_receipts ( CREATE TABLE IF NOT EXISTS syncapi_receipts (
-- The ID -- The ID
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_receipt_id'),
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL, receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL, user_id TEXT NOT NULL,
@ -50,7 +51,7 @@ const upsertReceipt = "" +
" (room_id, receipt_type, user_id, event_id, receipt_ts)" + " (room_id, receipt_type, user_id, event_id, receipt_ts)" +
" VALUES ($1, $2, $3, $4, $5)" + " VALUES ($1, $2, $3, $4, $5)" +
" ON CONFLICT (room_id, receipt_type, user_id)" + " ON CONFLICT (room_id, receipt_type, user_id)" +
" DO UPDATE SET id = nextval('syncapi_stream_id'), event_id = $4, receipt_ts = $5" + " DO UPDATE SET id = nextval('syncapi_receipt_id'), event_id = $4, receipt_ts = $5" +
" RETURNING id" " RETURNING id"
const selectRoomReceipts = "" + const selectRoomReceipts = "" +

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/storage/shared"
) )
@ -36,6 +37,7 @@ type SyncServerDatasource struct {
} }
// NewDatabase creates a new sync server database // NewDatabase creates a new sync server database
// nolint:gocyclo
func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource var d SyncServerDatasource
var err error var err error
@ -86,6 +88,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
return nil, err
}
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
Writer: d.writer, Writer: d.writer,

View file

@ -82,7 +82,7 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Re
// UpsertReceipt creates new user receipts // UpsertReceipt creates new user receipts
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
pos, err = r.streamIDStatements.nextStreamID(ctx, txn) pos, err = r.streamIDStatements.nextReceiptID(ctx, txn)
if err != nil { if err != nil {
return return
} }

View file

@ -56,3 +56,13 @@ func (s *streamIDStatements) nextStreamID(ctx context.Context, txn *sql.Tx) (pos
err = selectStmt.QueryRowContext(ctx, "global").Scan(&pos) err = selectStmt.QueryRowContext(ctx, "global").Scan(&pos)
return return
} }
func (s *streamIDStatements) nextReceiptID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
if _, err = increaseStmt.ExecContext(ctx, "receipt"); err != nil {
return
}
err = selectStmt.QueryRowContext(ctx, "receipt").Scan(&pos)
return
}