Latest events and membership updaters return cleanup lambdas

This commit is contained in:
Neil Alexander 2020-08-18 15:05:03 +01:00
parent 93f996bda3
commit 9959817d31
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
9 changed files with 57 additions and 76 deletions

View file

@ -21,7 +21,6 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
@ -54,17 +53,11 @@ func (r *RoomserverInternalAPI) updateLatestEvents(
sendAsServer string, sendAsServer string,
transactionID *api.TransactionID, transactionID *api.TransactionID,
) (err error) { ) (err error) {
updater, err := r.DB.GetLatestEventsForUpdate(ctx, roomNID) updater, cleanup, err := r.DB.GetLatestEventsForUpdate(ctx, roomNID)
if err != nil { if err != nil {
return return
} }
succeeded := false defer cleanup() // nolint:errcheck
defer func() {
txerr := sqlutil.EndTransaction(updater, &succeeded)
if err == nil && txerr != nil {
err = txerr
}
}()
u := latestEventsUpdater{ u := latestEventsUpdater{
ctx: ctx, ctx: ctx,
@ -81,7 +74,6 @@ func (r *RoomserverInternalAPI) updateLatestEvents(
return err return err
} }
succeeded = true
return return
} }

View file

@ -112,10 +112,11 @@ func (r *RoomserverInternalAPI) updateMembership(
return updates, nil return updates, nil
} }
mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add)) mu, cleanup, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add))
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer cleanup() // nolint:errcheck
switch newMembership { switch newMembership {
case gomatrixserverlib.Invite: case gomatrixserverlib.Invite:

View file

@ -161,10 +161,11 @@ func (r *RoomserverInternalAPI) PerformInvite(
// The invite originated over federation. Process the membership // The invite originated over federation. Process the membership
// update, which will notify the sync API etc about the incoming // update, which will notify the sync API etc about the incoming
// invite. // invite.
updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion) updater, cleanup, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion)
if err != nil { if err != nil {
return fmt.Errorf("r.DB.MembershipUpdater: %w", err) return fmt.Errorf("r.DB.MembershipUpdater: %w", err)
} }
defer cleanup() // nolint:errcheck
unwrapped := event.Unwrap() unwrapped := event.Unwrap()
outputUpdates, err := updateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion) outputUpdates, err := updateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion)

View file

@ -86,7 +86,7 @@ type Database interface {
// The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error. // The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error.
// Returns the latest events in the room and the last eventID sent to the log along with an updater. // Returns the latest events in the room and the last eventID sent to the log along with an updater.
// If this returns an error then no further action is required. // If this returns an error then no further action is required.
GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error) GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, func() error, error)
// Look up event ID by transaction's info. // Look up event ID by transaction's info.
// This is used to determine if the room event is processed/processing already. // This is used to determine if the room event is processed/processing already.
// Returns an empty string if no such event exists. // Returns an empty string if no such event exists.
@ -123,7 +123,7 @@ type Database interface {
// Returns an error if there was a problem talking to the database. // Returns an error if there was a problem talking to the database.
RemoveRoomAlias(ctx context.Context, alias string) error RemoveRoomAlias(ctx context.Context, alias string) error
// Build a membership updater for the target user in a room. // Build a membership updater for the target user in a room.
MembershipUpdater(ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, error) MembershipUpdater(ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, func() error, error)
// Lookup the membership of a given user in a given room. // Lookup the membership of a given user in a given room.
// Returns the numeric ID of the latest membership event sent from this user // Returns the numeric ID of the latest membership event sent from this user
// in this room, along a boolean set to true if the user is still in this room, // in this room, along a boolean set to true if the user is still in this room,

View file

@ -20,11 +20,10 @@ type membershipUpdater struct {
func NewMembershipUpdater( func NewMembershipUpdater(
ctx context.Context, d *Database, roomID, targetUserID string, ctx context.Context, d *Database, roomID, targetUserID string,
targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion,
useTxns bool, ) (types.MembershipUpdater, func() error, error) {
) (types.MembershipUpdater, error) {
txn, err := d.DB.Begin() txn, err := d.DB.Begin()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
succeeded := false succeeded := false
defer func() { defer func() {
@ -35,25 +34,21 @@ func NewMembershipUpdater(
roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion) roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
targetUserNID, err := d.assignStateKeyNID(ctx, txn, targetUserID) targetUserNID, err := d.assignStateKeyNID(ctx, txn, targetUserID)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal) updater, cleanup, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
succeeded = true succeeded = true
if !useTxns { return updater, cleanup, nil
txn.Commit() // nolint: errcheck
updater.transaction.txn = nil
}
return updater, nil
} }
func (d *Database) membershipUpdaterTxn( func (d *Database) membershipUpdaterTxn(
@ -62,20 +57,20 @@ func (d *Database) membershipUpdaterTxn(
roomNID types.RoomNID, roomNID types.RoomNID,
targetUserNID types.EventStateKeyNID, targetUserNID types.EventStateKeyNID,
targetLocal bool, targetLocal bool,
) (*membershipUpdater, error) { ) (*membershipUpdater, func() error, error) {
if err := d.MembershipTable.InsertMembership(ctx, txn, roomNID, targetUserNID, targetLocal); err != nil { if err := d.MembershipTable.InsertMembership(ctx, txn, roomNID, targetUserNID, targetLocal); err != nil {
return nil, err return nil, nil, err
} }
membership, err := d.MembershipTable.SelectMembershipForUpdate(ctx, txn, roomNID, targetUserNID) membership, err := d.MembershipTable.SelectMembershipForUpdate(ctx, txn, roomNID, targetUserNID)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return &membershipUpdater{ return &membershipUpdater{
transaction{ctx, txn}, d, roomNID, targetUserNID, membership, transaction{ctx, txn}, d, roomNID, targetUserNID, membership,
}, nil }, func() error { return txn.Commit() }, nil
} }
// IsInvite implements types.MembershipUpdater // IsInvite implements types.MembershipUpdater

View file

@ -18,16 +18,16 @@ type roomRecentEventsUpdater struct {
currentStateSnapshotNID types.StateSnapshotNID currentStateSnapshotNID types.StateSnapshotNID
} }
func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.RoomNID, useTxns bool) (types.RoomRecentEventsUpdater, error) { func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, func() error, error) {
txn, err := d.DB.Begin() txn, err := d.DB.Begin()
if err != nil { if err != nil {
return nil, fmt.Errorf("d.DB.Begin: %w", err) return nil, nil, fmt.Errorf("d.DB.Begin: %w", err)
} }
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err := eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomNID) d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomNID)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
txn.Rollback() // nolint: errcheck txn.Rollback() // nolint: errcheck
return nil, fmt.Errorf("d.RoomsTable.SelectLatestEventsNIDsForUpdate: %w", err) return nil, nil, fmt.Errorf("d.RoomsTable.SelectLatestEventsNIDsForUpdate: %w", err)
} }
var stateAndRefs []types.StateAtEventAndReference var stateAndRefs []types.StateAtEventAndReference
var lastEventIDSent string var lastEventIDSent string
@ -35,23 +35,19 @@ func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.
stateAndRefs, err = d.EventsTable.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs) stateAndRefs, err = d.EventsTable.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs)
if err != nil { if err != nil {
txn.Rollback() // nolint: errcheck txn.Rollback() // nolint: errcheck
return nil, fmt.Errorf("d.EventsTable.BulkSelectStateAtEventAndReference: %w", err) return nil, nil, fmt.Errorf("d.EventsTable.BulkSelectStateAtEventAndReference: %w", err)
} }
if lastEventNIDSent != 0 { if lastEventNIDSent != 0 {
lastEventIDSent, err = d.EventsTable.SelectEventID(ctx, txn, lastEventNIDSent) lastEventIDSent, err = d.EventsTable.SelectEventID(ctx, txn, lastEventNIDSent)
if err != nil { if err != nil {
txn.Rollback() // nolint: errcheck txn.Rollback() // nolint: errcheck
return nil, fmt.Errorf("d.EventsTable.SelectEventID: %w", err) return nil, nil, fmt.Errorf("d.EventsTable.SelectEventID: %w", err)
} }
} }
} }
if !useTxns {
txn.Commit() // nolint: errcheck
txn = nil
}
return &roomRecentEventsUpdater{ return &roomRecentEventsUpdater{
transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID, transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
}, nil }, func() error { return txn.Commit() }, nil
} }
// RoomVersion implements types.RoomRecentEventsUpdater // RoomVersion implements types.RoomRecentEventsUpdater
@ -119,6 +115,6 @@ func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error
return u.d.EventsTable.UpdateEventSentToOutput(u.ctx, u.txn, eventNID) return u.d.EventsTable.UpdateEventSentToOutput(u.ctx, u.txn, eventNID)
} }
func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (types.MembershipUpdater, error) { func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (types.MembershipUpdater, func() error, error) {
return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomNID, targetUserNID, targetLocal) return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomNID, targetUserNID, targetLocal)
} }

View file

@ -332,14 +332,14 @@ func (d *Database) GetTransactionEventID(
func (d *Database) MembershipUpdater( func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string, ctx context.Context, roomID, targetUserID string,
targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion,
) (types.MembershipUpdater, error) { ) (types.MembershipUpdater, func() error, error) {
return NewMembershipUpdater(ctx, d, roomID, targetUserID, targetLocal, roomVersion, true) return NewMembershipUpdater(ctx, d, roomID, targetUserID, targetLocal, roomVersion)
} }
func (d *Database) GetLatestEventsForUpdate( func (d *Database) GetLatestEventsForUpdate(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,
) (types.RoomRecentEventsUpdater, error) { ) (types.RoomRecentEventsUpdater, func() error, error) {
return NewRoomRecentEventsUpdater(d, ctx, roomNID, true) return NewRoomRecentEventsUpdater(d, ctx, roomNID)
} }
func (d *Database) StoreEvent( func (d *Database) StoreEvent(

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
@ -41,6 +42,7 @@ type Database struct {
invites tables.Invites invites tables.Invites
membership tables.Membership membership tables.Membership
db *sql.DB db *sql.DB
writer *sqlutil.TransactionWriter
} }
// Open a sqlite database. // Open a sqlite database.
@ -51,7 +53,7 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
if d.db, err = sqlutil.Open(dbProperties); err != nil { if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err return nil, err
} }
writer := sqlutil.NewTransactionWriter() d.writer = sqlutil.NewTransactionWriter()
//d.db.Exec("PRAGMA journal_mode=WAL;") //d.db.Exec("PRAGMA journal_mode=WAL;")
//d.db.Exec("PRAGMA read_uncommitted = true;") //d.db.Exec("PRAGMA read_uncommitted = true;")
@ -61,59 +63,59 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
// which it will never obtain. // which it will never obtain.
d.db.SetMaxOpenConns(20) d.db.SetMaxOpenConns(20)
d.eventStateKeys, err = NewSqliteEventStateKeysTable(d.db, writer) d.eventStateKeys, err = NewSqliteEventStateKeysTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
d.eventTypes, err = NewSqliteEventTypesTable(d.db, writer) d.eventTypes, err = NewSqliteEventTypesTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
d.eventJSON, err = NewSqliteEventJSONTable(d.db, writer) d.eventJSON, err = NewSqliteEventJSONTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
d.events, err = NewSqliteEventsTable(d.db, writer) d.events, err = NewSqliteEventsTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
d.rooms, err = NewSqliteRoomsTable(d.db, writer) d.rooms, err = NewSqliteRoomsTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
d.transactions, err = NewSqliteTransactionsTable(d.db, writer) d.transactions, err = NewSqliteTransactionsTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
stateBlock, err := NewSqliteStateBlockTable(d.db, writer) stateBlock, err := NewSqliteStateBlockTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
stateSnapshot, err := NewSqliteStateSnapshotTable(d.db, writer) stateSnapshot, err := NewSqliteStateSnapshotTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
d.prevEvents, err = NewSqlitePrevEventsTable(d.db, writer) d.prevEvents, err = NewSqlitePrevEventsTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
roomAliases, err := NewSqliteRoomAliasesTable(d.db, writer) roomAliases, err := NewSqliteRoomAliasesTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
d.invites, err = NewSqliteInvitesTable(d.db, writer) d.invites, err = NewSqliteInvitesTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
d.membership, err = NewSqliteMembershipTable(d.db, writer) d.membership, err = NewSqliteMembershipTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
published, err := NewSqlitePublishedTable(d.db, writer) published, err := NewSqlitePublishedTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
redactions, err := NewSqliteRedactionsTable(d.db, writer) redactions, err := NewSqliteRedactionsTable(d.db, d.writer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -137,27 +139,21 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
return &d, nil return &d, nil
} }
func (d *Database) GetLatestEventsForUpdate( func (d *Database) StoreEvent(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, event gomatrixserverlib.Event,
) (types.RoomRecentEventsUpdater, error) { txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID,
// TODO: Do not use transactions. We should be holding open this transaction but we cannot have ) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
// multiple write transactions on sqlite. The code will perform additional return d.Database.StoreEvent(ctx, event, txnAndSessionID, authEventNIDs)
// write transactions independent of this one which will consistently cause
// 'database is locked' errors. As sqlite doesn't support multi-process on the
// same DB anyway, and we only execute updates sequentially, the only worries
// are for rolling back when things go wrong. (atomicity)
return shared.NewRoomRecentEventsUpdater(&d.Database, ctx, roomNID, false)
} }
func (d *Database) MembershipUpdater( func (d *Database) GetLatestEventsForUpdate(
ctx context.Context, roomID, targetUserID string, ctx context.Context, roomNID types.RoomNID,
targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, ) (types.RoomRecentEventsUpdater, func() error, error) {
) (updater types.MembershipUpdater, err error) {
// TODO: Do not use transactions. We should be holding open this transaction but we cannot have // TODO: Do not use transactions. We should be holding open this transaction but we cannot have
// multiple write transactions on sqlite. The code will perform additional // multiple write transactions on sqlite. The code will perform additional
// write transactions independent of this one which will consistently cause // write transactions independent of this one which will consistently cause
// 'database is locked' errors. As sqlite doesn't support multi-process on the // 'database is locked' errors. As sqlite doesn't support multi-process on the
// same DB anyway, and we only execute updates sequentially, the only worries // same DB anyway, and we only execute updates sequentially, the only worries
// are for rolling back when things go wrong. (atomicity) // are for rolling back when things go wrong. (atomicity)
return shared.NewMembershipUpdater(ctx, &d.Database, roomID, targetUserID, targetLocal, roomVersion, false) return shared.NewRoomRecentEventsUpdater(&d.Database, ctx, roomNID)
} }

View file

@ -172,7 +172,7 @@ type RoomRecentEventsUpdater interface {
MarkEventAsSent(eventNID EventNID) error MarkEventAsSent(eventNID EventNID) error
// Build a membership updater for the target user in this room. // Build a membership updater for the target user in this room.
// It will share the same transaction as this updater. // It will share the same transaction as this updater.
MembershipUpdater(targetUserNID EventStateKeyNID, isTargetLocalUser bool) (MembershipUpdater, error) MembershipUpdater(targetUserNID EventStateKeyNID, isTargetLocalUser bool) (MembershipUpdater, func() error, error)
// Implements Transaction so it can be committed or rolledback // Implements Transaction so it can be committed or rolledback
sqlutil.Transaction sqlutil.Transaction
} }