Make transactions table, prevent reprocessing of events

This commit is contained in:
Anant Prakash 2018-05-23 20:49:07 +05:30
parent 0919a5c109
commit 2d56033725
No known key found for this signature in database
GPG key ID: C5D399F626523045
4 changed files with 138 additions and 2 deletions

View file

@ -32,6 +32,7 @@ type RoomEventDatabase interface {
StoreEvent(
ctx context.Context,
event gomatrixserverlib.Event,
txnAndDeviceID *api.TransactionID,
authEventNIDs []types.EventNID,
) (types.RoomNID, types.StateAtEvent, error)
// Look up the state entries for a list of string event IDs
@ -61,6 +62,13 @@ type RoomEventDatabase interface {
MembershipUpdater(
ctx context.Context, roomID, targerUserID string,
) (types.MembershipUpdater, error)
// Look up event ID by transaction's info.
// This is used to determine if the room event is processed/processing already.
// Returns an empty string if no such event exists.
GetTransactionEventID(
ctx context.Context, transactionID string,
deviceID string, userID string,
) (string, error)
}
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
@ -89,8 +97,23 @@ func processRoomEvent(
return err
}
if input.TransactionID != nil {
var eventID string
tdID := input.TransactionID
eventID, err = db.GetTransactionEventID(
ctx, tdID.TransactionID, tdID.DeviceID, input.Event.Sender(),
)
if err != nil {
return err
}
if eventID != "" {
return nil
}
}
// Store the event
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, authEventNIDs)
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
if err != nil {
return err
}

View file

@ -30,6 +30,7 @@ type statements struct {
roomAliasesStatements
inviteStatements
membershipStatements
transactionStatements
}
func (s *statements) prepare(db *sql.DB) error {
@ -47,6 +48,7 @@ func (s *statements) prepare(db *sql.DB) error {
s.roomAliasesStatements.prepare,
s.inviteStatements.prepare,
s.membershipStatements.prepare,
s.transactionStatements.prepare,
} {
if err = prepare(db); err != nil {
return err

View file

@ -20,6 +20,7 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@ -45,7 +46,8 @@ func Open(dataSourceName string) (*Database, error) {
// StoreEvent implements input.EventDatabase
func (d *Database) StoreEvent(
ctx context.Context, event gomatrixserverlib.Event, authEventNIDs []types.EventNID,
ctx context.Context, event gomatrixserverlib.Event,
txnAndDeviceID *api.TransactionID, authEventNIDs []types.EventNID,
) (types.RoomNID, types.StateAtEvent, error) {
var (
roomNID types.RoomNID
@ -56,6 +58,15 @@ func (d *Database) StoreEvent(
err error
)
if txnAndDeviceID != nil {
if err = d.statements.insertTransaction(
ctx, txnAndDeviceID.TransactionID,
txnAndDeviceID.DeviceID, event.Sender(), event.EventID(),
); err != nil {
return 0, types.StateAtEvent{}, err
}
}
if roomNID, err = d.assignRoomNID(ctx, nil, event.RoomID()); err != nil {
return 0, types.StateAtEvent{}, err
}
@ -308,6 +319,18 @@ func (d *Database) GetLatestEventsForUpdate(
}, nil
}
// GetTransactionEventID implements input.EventDatabase
func (d *Database) GetTransactionEventID(
ctx context.Context, transactionID string,
deviceID string, userID string,
) (string, error) {
eventID, err := d.statements.selectTransactionEventID(ctx, transactionID, deviceID, userID)
if err == sql.ErrNoRows {
return "", nil
}
return eventID, err
}
type roomRecentEventsUpdater struct {
transaction
d *Database

View file

@ -0,0 +1,88 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
)
const transactionsSchema = `
-- The transactions table holds transaction IDs with sender's info and event ID it belongs to.
-- This table is used by roomserver to prevent reprocessing of events.
CREATE TABLE IF NOT EXISTS roomserver_transactions (
-- The transaction ID of the event.
transaction_id TEXT NOT NULL,
-- The device ID of the originating transaction.
device_id TEXT NOT NULL,
-- User ID of the sender who authored the event
user_id TEXT NOT NULL,
-- Event ID corresponding to the transaction
-- Required to return event ID to client on a duplicate request.
event_id TEXT NOT NULL,
-- A transaction ID is unique for a user and device
-- This automatically creates an index.
CONSTRAINT roomserver_transaction_unique PRIMARY KEY (transaction_id, device_id, user_id)
);
`
const insertTransactionSQL = "" +
"INSERT INTO roomserver_transactions (transaction_id, device_id, user_id, event_id)" +
" VALUES ($1, $2, $3, $4)" +
" ON CONFLICT ON CONSTRAINT roomserver_transaction_unique" +
" DO NOTHING"
const selectTransactionEventIDSQL = "" +
"SELECT event_id FROM roomserver_transactions" +
" WHERE transaction_id = $1 AND device_id = $2 AND user_id = $3"
type transactionStatements struct {
insertTransactionStmt *sql.Stmt
selectTransactionEventIDStmt *sql.Stmt
}
func (s *transactionStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(transactionsSchema)
if err != nil {
return
}
return statementList{
{&s.insertTransactionStmt, insertTransactionSQL},
{&s.selectTransactionEventIDStmt, selectTransactionEventIDSQL},
}.prepare(db)
}
func (s *transactionStatements) insertTransaction(
ctx context.Context,
transactionID string,
deviceID string,
userID string,
eventID string,
) (err error) {
_, err = s.insertTransactionStmt.ExecContext(
ctx, transactionID, deviceID, userID, eventID,
)
return
}
func (s *transactionStatements) selectTransactionEventID(
ctx context.Context,
transactionID string,
deviceID string,
userID string,
) (eventID string, err error) {
err = s.selectTransactionEventIDStmt.QueryRowContext(
ctx, transactionID, deviceID, userID,
).Scan(&eventID)
return
}