From 9959817d315ecadb23fbecf01cfff81c25403baa Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 18 Aug 2020 15:05:03 +0100 Subject: [PATCH] Latest events and membership updaters return cleanup lambdas --- roomserver/internal/input_latest_events.go | 12 +--- roomserver/internal/input_membership.go | 3 +- roomserver/internal/perform_invite.go | 3 +- roomserver/storage/interface.go | 4 +- .../storage/shared/membership_updater.go | 27 ++++----- .../shared/room_recent_events_updater.go | 18 +++--- roomserver/storage/shared/storage.go | 8 +-- roomserver/storage/sqlite3/storage.go | 56 +++++++++---------- roomserver/types/types.go | 2 +- 9 files changed, 57 insertions(+), 76 deletions(-) diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go index 66316ac4f..ef53ef670 100644 --- a/roomserver/internal/input_latest_events.go +++ b/roomserver/internal/input_latest_events.go @@ -21,7 +21,6 @@ import ( "context" "fmt" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" @@ -54,17 +53,11 @@ func (r *RoomserverInternalAPI) updateLatestEvents( sendAsServer string, transactionID *api.TransactionID, ) (err error) { - updater, err := r.DB.GetLatestEventsForUpdate(ctx, roomNID) + updater, cleanup, err := r.DB.GetLatestEventsForUpdate(ctx, roomNID) if err != nil { return } - succeeded := false - defer func() { - txerr := sqlutil.EndTransaction(updater, &succeeded) - if err == nil && txerr != nil { - err = txerr - } - }() + defer cleanup() // nolint:errcheck u := latestEventsUpdater{ ctx: ctx, @@ -81,7 +74,6 @@ func (r *RoomserverInternalAPI) updateLatestEvents( return err } - succeeded = true return } diff --git a/roomserver/internal/input_membership.go b/roomserver/internal/input_membership.go index af0c7f8b3..71cebb442 100644 --- a/roomserver/internal/input_membership.go +++ b/roomserver/internal/input_membership.go @@ -112,10 +112,11 @@ func (r *RoomserverInternalAPI) updateMembership( return updates, nil } - mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add)) + mu, cleanup, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add)) if err != nil { return nil, err } + defer cleanup() // nolint:errcheck switch newMembership { case gomatrixserverlib.Invite: diff --git a/roomserver/internal/perform_invite.go b/roomserver/internal/perform_invite.go index aab3e8a8a..a9a3453f1 100644 --- a/roomserver/internal/perform_invite.go +++ b/roomserver/internal/perform_invite.go @@ -161,10 +161,11 @@ func (r *RoomserverInternalAPI) PerformInvite( // The invite originated over federation. Process the membership // update, which will notify the sync API etc about the incoming // invite. - updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion) + updater, cleanup, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion) if err != nil { return fmt.Errorf("r.DB.MembershipUpdater: %w", err) } + defer cleanup() // nolint:errcheck unwrapped := event.Unwrap() outputUpdates, err := updateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion) diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index afe5bcb1f..82d08844c 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -86,7 +86,7 @@ type Database interface { // The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error. // Returns the latest events in the room and the last eventID sent to the log along with an updater. // If this returns an error then no further action is required. - GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error) + GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, func() error, error) // Look up event ID by transaction's info. // This is used to determine if the room event is processed/processing already. // Returns an empty string if no such event exists. @@ -123,7 +123,7 @@ type Database interface { // Returns an error if there was a problem talking to the database. RemoveRoomAlias(ctx context.Context, alias string) error // Build a membership updater for the target user in a room. - MembershipUpdater(ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, error) + MembershipUpdater(ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, func() error, error) // Lookup the membership of a given user in a given room. // Returns the numeric ID of the latest membership event sent from this user // in this room, along a boolean set to true if the user is still in this room, diff --git a/roomserver/storage/shared/membership_updater.go b/roomserver/storage/shared/membership_updater.go index 5ddf6d84d..4e5b7fbb3 100644 --- a/roomserver/storage/shared/membership_updater.go +++ b/roomserver/storage/shared/membership_updater.go @@ -20,11 +20,10 @@ type membershipUpdater struct { func NewMembershipUpdater( ctx context.Context, d *Database, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, - useTxns bool, -) (types.MembershipUpdater, error) { +) (types.MembershipUpdater, func() error, error) { txn, err := d.DB.Begin() if err != nil { - return nil, err + return nil, nil, err } succeeded := false defer func() { @@ -35,25 +34,21 @@ func NewMembershipUpdater( roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion) if err != nil { - return nil, err + return nil, nil, err } targetUserNID, err := d.assignStateKeyNID(ctx, txn, targetUserID) if err != nil { - return nil, err + return nil, nil, err } - updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal) + updater, cleanup, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal) if err != nil { - return nil, err + return nil, nil, err } succeeded = true - if !useTxns { - txn.Commit() // nolint: errcheck - updater.transaction.txn = nil - } - return updater, nil + return updater, cleanup, nil } func (d *Database) membershipUpdaterTxn( @@ -62,20 +57,20 @@ func (d *Database) membershipUpdaterTxn( roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, targetLocal bool, -) (*membershipUpdater, error) { +) (*membershipUpdater, func() error, error) { if err := d.MembershipTable.InsertMembership(ctx, txn, roomNID, targetUserNID, targetLocal); err != nil { - return nil, err + return nil, nil, err } membership, err := d.MembershipTable.SelectMembershipForUpdate(ctx, txn, roomNID, targetUserNID) if err != nil { - return nil, err + return nil, nil, err } return &membershipUpdater{ transaction{ctx, txn}, d, roomNID, targetUserNID, membership, - }, nil + }, func() error { return txn.Commit() }, nil } // IsInvite implements types.MembershipUpdater diff --git a/roomserver/storage/shared/room_recent_events_updater.go b/roomserver/storage/shared/room_recent_events_updater.go index 2798df037..14d59a017 100644 --- a/roomserver/storage/shared/room_recent_events_updater.go +++ b/roomserver/storage/shared/room_recent_events_updater.go @@ -18,16 +18,16 @@ type roomRecentEventsUpdater struct { currentStateSnapshotNID types.StateSnapshotNID } -func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.RoomNID, useTxns bool) (types.RoomRecentEventsUpdater, error) { +func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, func() error, error) { txn, err := d.DB.Begin() if err != nil { - return nil, fmt.Errorf("d.DB.Begin: %w", err) + return nil, nil, fmt.Errorf("d.DB.Begin: %w", err) } eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err := d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomNID) if err != nil && err != sql.ErrNoRows { txn.Rollback() // nolint: errcheck - return nil, fmt.Errorf("d.RoomsTable.SelectLatestEventsNIDsForUpdate: %w", err) + return nil, nil, fmt.Errorf("d.RoomsTable.SelectLatestEventsNIDsForUpdate: %w", err) } var stateAndRefs []types.StateAtEventAndReference var lastEventIDSent string @@ -35,23 +35,19 @@ func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types. stateAndRefs, err = d.EventsTable.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs) if err != nil { txn.Rollback() // nolint: errcheck - return nil, fmt.Errorf("d.EventsTable.BulkSelectStateAtEventAndReference: %w", err) + return nil, nil, fmt.Errorf("d.EventsTable.BulkSelectStateAtEventAndReference: %w", err) } if lastEventNIDSent != 0 { lastEventIDSent, err = d.EventsTable.SelectEventID(ctx, txn, lastEventNIDSent) if err != nil { txn.Rollback() // nolint: errcheck - return nil, fmt.Errorf("d.EventsTable.SelectEventID: %w", err) + return nil, nil, fmt.Errorf("d.EventsTable.SelectEventID: %w", err) } } } - if !useTxns { - txn.Commit() // nolint: errcheck - txn = nil - } return &roomRecentEventsUpdater{ transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID, - }, nil + }, func() error { return txn.Commit() }, nil } // RoomVersion implements types.RoomRecentEventsUpdater @@ -119,6 +115,6 @@ func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error return u.d.EventsTable.UpdateEventSentToOutput(u.ctx, u.txn, eventNID) } -func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (types.MembershipUpdater, error) { +func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (types.MembershipUpdater, func() error, error) { return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomNID, targetUserNID, targetLocal) } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index d0bd7f076..ad3c1f5e0 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -332,14 +332,14 @@ func (d *Database) GetTransactionEventID( func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, -) (types.MembershipUpdater, error) { - return NewMembershipUpdater(ctx, d, roomID, targetUserID, targetLocal, roomVersion, true) +) (types.MembershipUpdater, func() error, error) { + return NewMembershipUpdater(ctx, d, roomID, targetUserID, targetLocal, roomVersion) } func (d *Database) GetLatestEventsForUpdate( ctx context.Context, roomNID types.RoomNID, -) (types.RoomRecentEventsUpdater, error) { - return NewRoomRecentEventsUpdater(d, ctx, roomNID, true) +) (types.RoomRecentEventsUpdater, func() error, error) { + return NewRoomRecentEventsUpdater(d, ctx, roomNID) } func (d *Database) StoreEvent( diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index ae3140d7d..5b73c003c 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" @@ -41,6 +42,7 @@ type Database struct { invites tables.Invites membership tables.Membership db *sql.DB + writer *sqlutil.TransactionWriter } // Open a sqlite database. @@ -51,7 +53,7 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } - writer := sqlutil.NewTransactionWriter() + d.writer = sqlutil.NewTransactionWriter() //d.db.Exec("PRAGMA journal_mode=WAL;") //d.db.Exec("PRAGMA read_uncommitted = true;") @@ -61,59 +63,59 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) { // which it will never obtain. d.db.SetMaxOpenConns(20) - d.eventStateKeys, err = NewSqliteEventStateKeysTable(d.db, writer) + d.eventStateKeys, err = NewSqliteEventStateKeysTable(d.db, d.writer) if err != nil { return nil, err } - d.eventTypes, err = NewSqliteEventTypesTable(d.db, writer) + d.eventTypes, err = NewSqliteEventTypesTable(d.db, d.writer) if err != nil { return nil, err } - d.eventJSON, err = NewSqliteEventJSONTable(d.db, writer) + d.eventJSON, err = NewSqliteEventJSONTable(d.db, d.writer) if err != nil { return nil, err } - d.events, err = NewSqliteEventsTable(d.db, writer) + d.events, err = NewSqliteEventsTable(d.db, d.writer) if err != nil { return nil, err } - d.rooms, err = NewSqliteRoomsTable(d.db, writer) + d.rooms, err = NewSqliteRoomsTable(d.db, d.writer) if err != nil { return nil, err } - d.transactions, err = NewSqliteTransactionsTable(d.db, writer) + d.transactions, err = NewSqliteTransactionsTable(d.db, d.writer) if err != nil { return nil, err } - stateBlock, err := NewSqliteStateBlockTable(d.db, writer) + stateBlock, err := NewSqliteStateBlockTable(d.db, d.writer) if err != nil { return nil, err } - stateSnapshot, err := NewSqliteStateSnapshotTable(d.db, writer) + stateSnapshot, err := NewSqliteStateSnapshotTable(d.db, d.writer) if err != nil { return nil, err } - d.prevEvents, err = NewSqlitePrevEventsTable(d.db, writer) + d.prevEvents, err = NewSqlitePrevEventsTable(d.db, d.writer) if err != nil { return nil, err } - roomAliases, err := NewSqliteRoomAliasesTable(d.db, writer) + roomAliases, err := NewSqliteRoomAliasesTable(d.db, d.writer) if err != nil { return nil, err } - d.invites, err = NewSqliteInvitesTable(d.db, writer) + d.invites, err = NewSqliteInvitesTable(d.db, d.writer) if err != nil { return nil, err } - d.membership, err = NewSqliteMembershipTable(d.db, writer) + d.membership, err = NewSqliteMembershipTable(d.db, d.writer) if err != nil { return nil, err } - published, err := NewSqlitePublishedTable(d.db, writer) + published, err := NewSqlitePublishedTable(d.db, d.writer) if err != nil { return nil, err } - redactions, err := NewSqliteRedactionsTable(d.db, writer) + redactions, err := NewSqliteRedactionsTable(d.db, d.writer) if err != nil { return nil, err } @@ -137,27 +139,21 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) { return &d, nil } -func (d *Database) GetLatestEventsForUpdate( - ctx context.Context, roomNID types.RoomNID, -) (types.RoomRecentEventsUpdater, error) { - // TODO: Do not use transactions. We should be holding open this transaction but we cannot have - // multiple write transactions on sqlite. The code will perform additional - // write transactions independent of this one which will consistently cause - // 'database is locked' errors. As sqlite doesn't support multi-process on the - // same DB anyway, and we only execute updates sequentially, the only worries - // are for rolling back when things go wrong. (atomicity) - return shared.NewRoomRecentEventsUpdater(&d.Database, ctx, roomNID, false) +func (d *Database) StoreEvent( + ctx context.Context, event gomatrixserverlib.Event, + txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, +) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { + return d.Database.StoreEvent(ctx, event, txnAndSessionID, authEventNIDs) } -func (d *Database) MembershipUpdater( - ctx context.Context, roomID, targetUserID string, - targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, -) (updater types.MembershipUpdater, err error) { +func (d *Database) GetLatestEventsForUpdate( + ctx context.Context, roomNID types.RoomNID, +) (types.RoomRecentEventsUpdater, func() error, error) { // TODO: Do not use transactions. We should be holding open this transaction but we cannot have // multiple write transactions on sqlite. The code will perform additional // write transactions independent of this one which will consistently cause // 'database is locked' errors. As sqlite doesn't support multi-process on the // same DB anyway, and we only execute updates sequentially, the only worries // are for rolling back when things go wrong. (atomicity) - return shared.NewMembershipUpdater(ctx, &d.Database, roomID, targetUserID, targetLocal, roomVersion, false) + return shared.NewRoomRecentEventsUpdater(&d.Database, ctx, roomNID) } diff --git a/roomserver/types/types.go b/roomserver/types/types.go index 241e1e15d..2429a3620 100644 --- a/roomserver/types/types.go +++ b/roomserver/types/types.go @@ -172,7 +172,7 @@ type RoomRecentEventsUpdater interface { MarkEventAsSent(eventNID EventNID) error // Build a membership updater for the target user in this room. // It will share the same transaction as this updater. - MembershipUpdater(targetUserNID EventStateKeyNID, isTargetLocalUser bool) (MembershipUpdater, error) + MembershipUpdater(targetUserNID EventStateKeyNID, isTargetLocalUser bool) (MembershipUpdater, func() error, error) // Implements Transaction so it can be committed or rolledback sqlutil.Transaction }