From 395b3e67c0e1b9b60c949a6d1c4c06a7105f1d93 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 30 Sep 2022 10:27:05 +0100 Subject: [PATCH] Rename to transaction instead of snapshot --- syncapi/internal/history_visibility.go | 2 +- syncapi/routing/context.go | 4 +- syncapi/routing/messages.go | 8 +- syncapi/routing/search.go | 2 +- syncapi/storage/interface.go | 6 +- syncapi/storage/shared/syncserver.go | 41 +++++----- .../shared/{snapshot.go => transaction.go} | 82 +++++++++---------- syncapi/storage/sqlite3/syncserver.go | 8 +- syncapi/storage/storage_test.go | 16 ++-- syncapi/streams/stream_accountdata.go | 6 +- syncapi/streams/stream_devicelist.go | 4 +- syncapi/streams/stream_invite.go | 6 +- syncapi/streams/stream_notificationdata.go | 6 +- syncapi/streams/stream_pdu.go | 18 ++-- syncapi/streams/stream_presence.go | 6 +- syncapi/streams/stream_receipt.go | 6 +- syncapi/streams/stream_sendtodevice.go | 6 +- syncapi/streams/stream_typing.go | 4 +- syncapi/streams/streamprovider.go | 6 +- syncapi/streams/template_stream.go | 2 +- 20 files changed, 121 insertions(+), 118 deletions(-) rename syncapi/storage/shared/{snapshot.go => transaction.go} (80%) diff --git a/syncapi/internal/history_visibility.go b/syncapi/internal/history_visibility.go index 6f88247d6..bbfe19f4c 100644 --- a/syncapi/internal/history_visibility.go +++ b/syncapi/internal/history_visibility.go @@ -100,7 +100,7 @@ func (ev eventVisibility) allowed() (allowed bool) { // Returns the filtered events and an error, if any. func ApplyHistoryVisibilityFilter( ctx context.Context, - syncDB storage.DatabaseSnapshot, + syncDB storage.DatabaseTransaction, rsAPI api.SyncRoomserverAPI, events []*gomatrixserverlib.HeaderedEvent, alwaysIncludeEventIDs map[string]struct{}, diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 6bb978eac..1ce34b85a 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -194,7 +194,7 @@ func Context( // by combining the events before and after the context event. Returns the filtered events, // and an error, if any. func applyHistoryVisibilityOnContextEvents( - ctx context.Context, snapshot storage.DatabaseSnapshot, rsAPI roomserver.SyncRoomserverAPI, + ctx context.Context, snapshot storage.DatabaseTransaction, rsAPI roomserver.SyncRoomserverAPI, eventsBefore, eventsAfter []*gomatrixserverlib.HeaderedEvent, userID string, ) (filteredBefore, filteredAfter []*gomatrixserverlib.HeaderedEvent, err error) { @@ -228,7 +228,7 @@ func applyHistoryVisibilityOnContextEvents( return filteredBefore, filteredAfter, nil } -func getStartEnd(ctx context.Context, snapshot storage.DatabaseSnapshot, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { +func getStartEnd(ctx context.Context, snapshot storage.DatabaseTransaction, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { if len(startEvents) > 0 { start, err = snapshot.EventPositionInTopology(ctx, startEvents[0].EventID()) if err != nil { diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 70d65418d..a59b3bcc5 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -39,7 +39,7 @@ import ( type messagesReq struct { ctx context.Context db storage.Database - snapshot storage.DatabaseSnapshot + snapshot storage.DatabaseTransaction rsAPI api.SyncRoomserverAPI cfg *config.SyncAPI roomID string @@ -71,7 +71,7 @@ func OnIncomingMessagesRequest( ) util.JSONResponse { var err error - snapshot, err := db.NewDatabaseWritable(req.Context()) + snapshot, err := db.NewDatabaseTransaction(req.Context()) if err != nil { return jsonerror.InternalServerError() } @@ -247,7 +247,7 @@ func OnIncomingMessagesRequest( // LazyLoadMembers enabled. func (m *messagesResp) applyLazyLoadMembers( ctx context.Context, - db storage.DatabaseSnapshot, + db storage.DatabaseTransaction, roomID string, device *userapi.Device, lazyLoad bool, @@ -561,7 +561,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][] // Returns an error if there was an issue with retrieving the latest position // from the database func setToDefault( - ctx context.Context, snapshot storage.DatabaseSnapshot, backwardOrdering bool, + ctx context.Context, snapshot storage.DatabaseTransaction, backwardOrdering bool, roomID string, ) (to types.TopologyToken, err error) { if backwardOrdering { diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go index 0696acbb1..bac534a2c 100644 --- a/syncapi/routing/search.go +++ b/syncapi/routing/search.go @@ -258,7 +258,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts // contextEvents returns the events around a given eventID func contextEvents( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, event *gomatrixserverlib.HeaderedEvent, roomFilter *gomatrixserverlib.RoomEventFilter, searchReq SearchRequest, diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 3e9a5edb1..60d95e74a 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -26,7 +26,7 @@ import ( userapi "github.com/matrix-org/dendrite/userapi/api" ) -type DatabaseSnapshot interface { +type DatabaseTransaction interface { SharedUsers MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) @@ -111,8 +111,8 @@ type Database interface { Presence Notifications - NewDatabaseSnapshot(ctx context.Context) (*shared.DatabaseSnapshot, error) - NewDatabaseWritable(ctx context.Context) (*shared.DatabaseSnapshot, error) + NewDatabaseSnapshot(ctx context.Context) (*shared.DatabaseTransaction, error) + NewDatabaseTransaction(ctx context.Context) (*shared.DatabaseTransaction, error) // Events lookups a list of event by their event ID. // Returns a list of events matching the requested IDs found in the database. diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 3f5bf938d..58dbf9973 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -55,31 +55,34 @@ type Database struct { Presence tables.Presence } -func (d *Database) NewDatabaseSnapshot(ctx context.Context) (*DatabaseSnapshot, error) { - txn, err := d.DB.BeginTx(ctx, &sql.TxOptions{ - // Set the isolation level so that we see a snapshot of the database. - // In PostgreSQL repeatable read transactions will see a snapshot taken - // at the first query, and since the transaction is read-only it can't - // run into any serialisation errors. - // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }) - if err != nil { - return nil, err - } - return &DatabaseSnapshot{ - Database: d, - txn: txn, - }, nil +func (d *Database) NewDatabaseSnapshot(ctx context.Context) (*DatabaseTransaction, error) { + return d.NewDatabaseTransaction(ctx) // TODO: revert + /* + txn, err := d.DB.BeginTx(ctx, &sql.TxOptions{ + // Set the isolation level so that we see a snapshot of the database. + // In PostgreSQL repeatable read transactions will see a snapshot taken + // at the first query, and since the transaction is read-only it can't + // run into any serialisation errors. + // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }) + if err != nil { + return nil, err + } + return &DatabaseTransaction{ + Database: d, + txn: txn, + }, nil + */ } -func (d *Database) NewDatabaseWritable(ctx context.Context) (*DatabaseSnapshot, error) { +func (d *Database) NewDatabaseTransaction(ctx context.Context) (*DatabaseTransaction, error) { txn, err := d.DB.BeginTx(ctx, nil) if err != nil { return nil, err } - return &DatabaseSnapshot{ + return &DatabaseTransaction{ Database: d, txn: txn, }, nil diff --git a/syncapi/storage/shared/snapshot.go b/syncapi/storage/shared/transaction.go similarity index 80% rename from syncapi/storage/shared/snapshot.go rename to syncapi/storage/shared/transaction.go index 834d9d869..64e27af7d 100644 --- a/syncapi/storage/shared/snapshot.go +++ b/syncapi/storage/shared/transaction.go @@ -11,26 +11,26 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -type DatabaseSnapshot struct { +type DatabaseTransaction struct { *Database txn *sql.Tx } -func (d *DatabaseSnapshot) Commit() error { +func (d *DatabaseTransaction) Commit() error { if d.txn == nil { return nil } return d.txn.Commit() } -func (d *DatabaseSnapshot) Rollback() error { +func (d *DatabaseTransaction) Rollback() error { if d.txn == nil { return nil } return d.txn.Rollback() } -func (d *DatabaseSnapshot) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) { +func (d *DatabaseTransaction) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) { id, err := d.OutputEvents.SelectMaxEventID(ctx, d.txn) if err != nil { return 0, fmt.Errorf("d.OutputEvents.SelectMaxEventID: %w", err) @@ -38,7 +38,7 @@ func (d *DatabaseSnapshot) MaxStreamPositionForPDUs(ctx context.Context) (types. return types.StreamPosition(id), nil } -func (d *DatabaseSnapshot) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) { +func (d *DatabaseTransaction) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) { id, err := d.Receipts.SelectMaxReceiptID(ctx, d.txn) if err != nil { return 0, fmt.Errorf("d.Receipts.SelectMaxReceiptID: %w", err) @@ -46,7 +46,7 @@ func (d *DatabaseSnapshot) MaxStreamPositionForReceipts(ctx context.Context) (ty return types.StreamPosition(id), nil } -func (d *DatabaseSnapshot) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) { +func (d *DatabaseTransaction) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) { id, err := d.Invites.SelectMaxInviteID(ctx, d.txn) if err != nil { return 0, fmt.Errorf("d.Invites.SelectMaxInviteID: %w", err) @@ -54,7 +54,7 @@ func (d *DatabaseSnapshot) MaxStreamPositionForInvites(ctx context.Context) (typ return types.StreamPosition(id), nil } -func (d *DatabaseSnapshot) MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error) { +func (d *DatabaseTransaction) MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error) { id, err := d.SendToDevice.SelectMaxSendToDeviceMessageID(ctx, d.txn) if err != nil { return 0, fmt.Errorf("d.SendToDevice.SelectMaxSendToDeviceMessageID: %w", err) @@ -62,7 +62,7 @@ func (d *DatabaseSnapshot) MaxStreamPositionForSendToDeviceMessages(ctx context. return types.StreamPosition(id), nil } -func (d *DatabaseSnapshot) MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) { +func (d *DatabaseTransaction) MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) { id, err := d.AccountData.SelectMaxAccountDataID(ctx, d.txn) if err != nil { return 0, fmt.Errorf("d.Invites.SelectMaxAccountDataID: %w", err) @@ -70,7 +70,7 @@ func (d *DatabaseSnapshot) MaxStreamPositionForAccountData(ctx context.Context) return types.StreamPosition(id), nil } -func (d *DatabaseSnapshot) MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) { +func (d *DatabaseTransaction) MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) { id, err := d.NotificationData.SelectMaxID(ctx, d.txn) if err != nil { return 0, fmt.Errorf("d.NotificationData.SelectMaxID: %w", err) @@ -78,39 +78,39 @@ func (d *DatabaseSnapshot) MaxStreamPositionForNotificationData(ctx context.Cont return types.StreamPosition(id), nil } -func (d *DatabaseSnapshot) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { +func (d *DatabaseTransaction) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { return d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilterPart, excludeEventIDs) } -func (d *DatabaseSnapshot) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) { +func (d *DatabaseTransaction) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) { return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, d.txn, userID, membership) } -func (d *DatabaseSnapshot) MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error) { +func (d *DatabaseTransaction) MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error) { return d.Memberships.SelectMembershipCount(ctx, d.txn, roomID, membership, pos) } -func (d *DatabaseSnapshot) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) { +func (d *DatabaseTransaction) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) { return d.Memberships.SelectHeroes(ctx, d.txn, roomID, userID, memberships) } -func (d *DatabaseSnapshot) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { +func (d *DatabaseTransaction) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { return d.OutputEvents.SelectRecentEvents(ctx, d.txn, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents) } -func (d *DatabaseSnapshot) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) { +func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) { return d.Topology.SelectPositionInTopology(ctx, d.txn, eventID) } -func (d *DatabaseSnapshot) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) { +func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) { return d.Invites.SelectInviteEventsInRange(ctx, d.txn, targetUserID, r) } -func (d *DatabaseSnapshot) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) { +func (d *DatabaseTransaction) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) { return d.Peeks.SelectPeeksInRange(ctx, d.txn, userID, deviceID, r) } -func (d *DatabaseSnapshot) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) { +func (d *DatabaseTransaction) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) { return d.Receipts.SelectRoomReceiptsAfter(ctx, d.txn, roomIDs, streamPos) } @@ -119,7 +119,7 @@ func (d *DatabaseSnapshot) RoomReceiptsAfter(ctx context.Context, roomIDs []stri // If an event is not found in the database then it will be omitted from the list. // Returns an error if there was a problem talking with the database. // Does not include any transaction IDs in the returned events. -func (d *DatabaseSnapshot) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { +func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { streamEvents, err := d.OutputEvents.SelectEvents(ctx, d.txn, eventIDs, nil, false) if err != nil { return nil, err @@ -130,29 +130,29 @@ func (d *DatabaseSnapshot) Events(ctx context.Context, eventIDs []string) ([]*go return d.StreamEventsToEvents(nil, streamEvents), nil } -func (d *DatabaseSnapshot) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { +func (d *DatabaseTransaction) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { return d.CurrentRoomState.SelectJoinedUsers(ctx, d.txn) } -func (d *DatabaseSnapshot) AllJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) { +func (d *DatabaseTransaction) AllJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) { return d.CurrentRoomState.SelectJoinedUsersInRoom(ctx, d.txn, roomIDs) } -func (d *DatabaseSnapshot) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error) { +func (d *DatabaseTransaction) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error) { return d.Peeks.SelectPeekingDevices(ctx, d.txn) } -func (d *DatabaseSnapshot) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) { +func (d *DatabaseTransaction) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) { return d.CurrentRoomState.SelectSharedUsers(ctx, d.txn, userID, otherUserIDs) } -func (d *DatabaseSnapshot) GetStateEvent( +func (d *DatabaseTransaction) GetStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.HeaderedEvent, error) { return d.CurrentRoomState.SelectStateEvent(ctx, d.txn, roomID, evType, stateKey) } -func (d *DatabaseSnapshot) GetStateEventsForRoom( +func (d *DatabaseTransaction) GetStateEventsForRoom( ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter, ) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) { stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilter, nil) @@ -164,14 +164,14 @@ func (d *DatabaseSnapshot) GetStateEventsForRoom( // Returns a map following the format data[roomID] = []dataTypes // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error -func (d *DatabaseSnapshot) GetAccountDataInRange( +func (d *DatabaseTransaction) GetAccountDataInRange( ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter, ) (map[string][]string, types.StreamPosition, error) { return d.AccountData.SelectAccountDataInRange(ctx, d.txn, userID, r, accountDataFilterPart) } -func (d *DatabaseSnapshot) GetEventsInTopologicalRange( +func (d *DatabaseTransaction) GetEventsInTopologicalRange( ctx context.Context, from, to *types.TopologyToken, roomID string, @@ -207,13 +207,13 @@ func (d *DatabaseSnapshot) GetEventsInTopologicalRange( return } -func (d *DatabaseSnapshot) BackwardExtremitiesForRoom( +func (d *DatabaseTransaction) BackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (backwardExtremities map[string][]string, err error) { return d.BackwardExtremities.SelectBackwardExtremitiesForRoom(ctx, d.txn, roomID) } -func (d *DatabaseSnapshot) MaxTopologicalPosition( +func (d *DatabaseTransaction) MaxTopologicalPosition( ctx context.Context, roomID string, ) (types.TopologyToken, error) { depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, d.txn, roomID) @@ -223,7 +223,7 @@ func (d *DatabaseSnapshot) MaxTopologicalPosition( return types.TopologyToken{Depth: depth, PDUPosition: streamPos}, nil } -func (d *DatabaseSnapshot) EventPositionInTopology( +func (d *DatabaseTransaction) EventPositionInTopology( ctx context.Context, eventID string, ) (types.TopologyToken, error) { depth, stream, err := d.Topology.SelectPositionInTopology(ctx, d.txn, eventID) @@ -233,7 +233,7 @@ func (d *DatabaseSnapshot) EventPositionInTopology( return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil } -func (d *DatabaseSnapshot) StreamToTopologicalPosition( +func (d *DatabaseTransaction) StreamToTopologicalPosition( ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool, ) (types.TopologyToken, error) { topoPos, err := d.Topology.SelectStreamToTopologicalPosition(ctx, d.txn, roomID, streamPos, backwardOrdering) @@ -255,7 +255,7 @@ func (d *DatabaseSnapshot) StreamToTopologicalPosition( // GetBackwardTopologyPos retrieves the backward topology position, i.e. the position of the // oldest event in the room's topology. -func (d *DatabaseSnapshot) GetBackwardTopologyPos( +func (d *DatabaseTransaction) GetBackwardTopologyPos( ctx context.Context, events []types.StreamEvent, ) (types.TopologyToken, error) { @@ -276,7 +276,7 @@ func (d *DatabaseSnapshot) GetBackwardTopologyPos( // exclusive of oldPos, inclusive of newPos, for the rooms in which // the user has new membership events. // A list of joined room IDs is also returned in case the caller needs it. -func (d *DatabaseSnapshot) GetStateDeltas( +func (d *DatabaseTransaction) GetStateDeltas( ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, @@ -403,7 +403,7 @@ func (d *DatabaseSnapshot) GetStateDeltas( // requests with full_state=true. // Fetches full state for all joined rooms and uses selectStateInRange to get // updates for other rooms. -func (d *DatabaseSnapshot) GetStateDeltasForFullStateSync( +func (d *DatabaseTransaction) GetStateDeltasForFullStateSync( ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, @@ -513,7 +513,7 @@ func (d *DatabaseSnapshot) GetStateDeltasForFullStateSync( return result, joinedRoomIDs, nil } -func (d *DatabaseSnapshot) currentStateStreamEventsForRoom( +func (d *DatabaseTransaction) currentStateStreamEventsForRoom( ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]types.StreamEvent, error) { @@ -528,7 +528,7 @@ func (d *DatabaseSnapshot) currentStateStreamEventsForRoom( return s, nil } -func (d *DatabaseSnapshot) SendToDeviceUpdatesForSync( +func (d *DatabaseTransaction) SendToDeviceUpdatesForSync( ctx context.Context, userID, deviceID string, from, to types.StreamPosition, @@ -545,12 +545,12 @@ func (d *DatabaseSnapshot) SendToDeviceUpdatesForSync( return lastPos, events, nil } -func (d *DatabaseSnapshot) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error) { +func (d *DatabaseTransaction) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error) { _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, d.txn, roomIDs, streamPos) return receipts, err } -func (d *DatabaseSnapshot) GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, rooms map[string]string) (map[string]*eventutil.NotificationData, error) { +func (d *DatabaseTransaction) GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, rooms map[string]string) (map[string]*eventutil.NotificationData, error) { roomIDs := make([]string, 0, len(rooms)) for roomID, membership := range rooms { if membership != gomatrixserverlib.Join { @@ -561,14 +561,14 @@ func (d *DatabaseSnapshot) GetUserUnreadNotificationCountsForRooms(ctx context.C return d.NotificationData.SelectUserUnreadCountsForRooms(ctx, d.txn, userID, roomIDs) } -func (d *DatabaseSnapshot) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) { +func (d *DatabaseTransaction) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) { return d.Presence.GetPresenceForUser(ctx, d.txn, userID) } -func (d *DatabaseSnapshot) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) { +func (d *DatabaseTransaction) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) { return d.Presence.GetPresenceAfter(ctx, d.txn, after, filter) } -func (d *DatabaseSnapshot) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { +func (d *DatabaseTransaction) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { return d.Presence.GetMaxPresenceID(ctx, d.txn) } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index aab0dbfc2..0879030a6 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -49,15 +49,15 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) return &d, nil } -func (d *SyncServerDatasource) NewDatabaseSnapshot(ctx context.Context) (*shared.DatabaseSnapshot, error) { - return &shared.DatabaseSnapshot{ +func (d *SyncServerDatasource) NewDatabaseSnapshot(ctx context.Context) (*shared.DatabaseTransaction, error) { + return &shared.DatabaseTransaction{ Database: &d.Database, // not setting a transaction because SQLite doesn't support it }, nil } -func (d *SyncServerDatasource) NewDatabaseWritable(ctx context.Context) (*shared.DatabaseSnapshot, error) { - return &shared.DatabaseSnapshot{ +func (d *SyncServerDatasource) NewDatabaseTransaction(ctx context.Context) (*shared.DatabaseTransaction, error) { + return &shared.DatabaseTransaction{ Database: &d.Database, // not setting a transaction because SQLite doesn't support it }, nil diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 261a38d54..5ff185a32 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -60,7 +60,7 @@ func TestWriteEvents(t *testing.T) { }) } -func WithSnapshot(t *testing.T, db storage.Database, f func(snapshot storage.DatabaseSnapshot)) { +func WithSnapshot(t *testing.T, db storage.Database, f func(snapshot storage.DatabaseTransaction)) { snapshot, err := db.NewDatabaseSnapshot(ctx) if err != nil { t.Fatal(err) @@ -91,7 +91,7 @@ func TestRecentEventsPDU(t *testing.T) { MustWriteEvents(t, db, test.NewRoom(t, alice).Events()) var latest types.StreamPosition - WithSnapshot(t, db, func(snapshot storage.DatabaseSnapshot) { + WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) { var err error if latest, err = snapshot.MaxStreamPositionForPDUs(ctx); err != nil { t.Fatal("failed to get MaxStreamPositionForPDUs: %w", err) @@ -157,7 +157,7 @@ func TestRecentEventsPDU(t *testing.T) { var gotEvents []types.StreamEvent var limited bool filter.Limit = tc.Limit - WithSnapshot(t, db, func(snapshot storage.DatabaseSnapshot) { + WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) { var err error gotEvents, limited, err = snapshot.RecentEvents(ctx, r.ID, types.Range{ From: tc.From, @@ -197,7 +197,7 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) { events := r.Events() _ = MustWriteEvents(t, db, events) - WithSnapshot(t, db, func(snapshot storage.DatabaseSnapshot) { + WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) { from, err := snapshot.MaxTopologicalPosition(ctx, r.ID) if err != nil { t.Fatalf("failed to get MaxTopologicalPosition: %s", err) @@ -436,7 +436,7 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point there should be no messages. We haven't sent anything // yet. - WithSnapshot(t, db, func(snapshot storage.DatabaseSnapshot) { + WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) { _, events, err := snapshot.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100) if err != nil { t.Fatal(err) @@ -456,7 +456,7 @@ func TestSendToDeviceBehaviour(t *testing.T) { t.Fatal(err) } - WithSnapshot(t, db, func(snapshot storage.DatabaseSnapshot) { + WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) { // At this point we should get exactly one message. We're sending the sync position // that we were given from the update and the send-to-device update will be updated // in the database to reflect that this was the sync position we sent the message at. @@ -486,7 +486,7 @@ func TestSendToDeviceBehaviour(t *testing.T) { return } - WithSnapshot(t, db, func(snapshot storage.DatabaseSnapshot) { + WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) { // At this point we should now have no updates, because we've progressed the sync // position. Therefore the update from before will not be sent again. var events []types.SendToDeviceEvent @@ -523,7 +523,7 @@ func TestSendToDeviceBehaviour(t *testing.T) { lastPos = streamPos } - WithSnapshot(t, db, func(snapshot storage.DatabaseSnapshot) { + WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) { _, events, err := snapshot.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, lastPos) if err != nil { t.Fatalf("unable to get events: %v", err) diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 7e0fa7094..3f2f7d134 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -16,7 +16,7 @@ type AccountDataStreamProvider struct { } func (p *AccountDataStreamProvider) Setup( - ctx context.Context, snapshot storage.DatabaseSnapshot, + ctx context.Context, snapshot storage.DatabaseTransaction, ) { p.DefaultStreamProvider.Setup(ctx, snapshot) @@ -32,7 +32,7 @@ func (p *AccountDataStreamProvider) Setup( func (p *AccountDataStreamProvider) CompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) @@ -40,7 +40,7 @@ func (p *AccountDataStreamProvider) CompleteSync( func (p *AccountDataStreamProvider) IncrementalSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index a806ef111..7996c2038 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -18,7 +18,7 @@ type DeviceListStreamProvider struct { func (p *DeviceListStreamProvider) CompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { return p.LatestPosition(ctx) @@ -26,7 +26,7 @@ func (p *DeviceListStreamProvider) CompleteSync( func (p *DeviceListStreamProvider) IncrementalSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 08dccf63f..1fd3edbfd 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -18,7 +18,7 @@ type InviteStreamProvider struct { } func (p *InviteStreamProvider) Setup( - ctx context.Context, snapshot storage.DatabaseSnapshot, + ctx context.Context, snapshot storage.DatabaseTransaction, ) { p.DefaultStreamProvider.Setup(ctx, snapshot) @@ -34,7 +34,7 @@ func (p *InviteStreamProvider) Setup( func (p *InviteStreamProvider) CompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) @@ -42,7 +42,7 @@ func (p *InviteStreamProvider) CompleteSync( func (p *InviteStreamProvider) IncrementalSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { diff --git a/syncapi/streams/stream_notificationdata.go b/syncapi/streams/stream_notificationdata.go index 2535cce53..5a81fd09a 100644 --- a/syncapi/streams/stream_notificationdata.go +++ b/syncapi/streams/stream_notificationdata.go @@ -12,7 +12,7 @@ type NotificationDataStreamProvider struct { } func (p *NotificationDataStreamProvider) Setup( - ctx context.Context, snapshot storage.DatabaseSnapshot, + ctx context.Context, snapshot storage.DatabaseTransaction, ) { p.DefaultStreamProvider.Setup(ctx, snapshot) @@ -28,7 +28,7 @@ func (p *NotificationDataStreamProvider) Setup( func (p *NotificationDataStreamProvider) CompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) @@ -36,7 +36,7 @@ func (p *NotificationDataStreamProvider) CompleteSync( func (p *NotificationDataStreamProvider) IncrementalSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, _ types.StreamPosition, ) types.StreamPosition { diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 19b877d76..89c5ba35e 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -40,7 +40,7 @@ type PDUStreamProvider struct { } func (p *PDUStreamProvider) Setup( - ctx context.Context, snapshot storage.DatabaseSnapshot, + ctx context.Context, snapshot storage.DatabaseTransaction, ) { p.DefaultStreamProvider.Setup(ctx, snapshot) @@ -56,7 +56,7 @@ func (p *PDUStreamProvider) Setup( func (p *PDUStreamProvider) CompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { from := types.StreamPosition(0) @@ -132,7 +132,7 @@ func (p *PDUStreamProvider) CompleteSync( func (p *PDUStreamProvider) IncrementalSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) (newPos types.StreamPosition) { @@ -210,7 +210,7 @@ func (p *PDUStreamProvider) IncrementalSync( // nolint:gocyclo func (p *PDUStreamProvider) addRoomDeltaToResponse( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, device *userapi.Device, r types.Range, delta types.StateDelta, @@ -343,7 +343,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // sure we always return the required events in the timeline. func applyHistoryVisibilityFilter( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, rsAPI roomserverAPI.SyncRoomserverAPI, roomID, userID string, limit int, @@ -375,7 +375,7 @@ func applyHistoryVisibilityFilter( return events, nil } -func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, snapshot storage.DatabaseSnapshot, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) { +func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, snapshot storage.DatabaseTransaction, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) { // Work out how many members are in the room. joinedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition) invitedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition) @@ -416,7 +416,7 @@ func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, snapshot storage func (p *PDUStreamProvider) getJoinResponseForCompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, roomID string, r types.Range, stateFilter *gomatrixserverlib.StateFilter, @@ -518,7 +518,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( } func (p *PDUStreamProvider) lazyLoadMembers( - ctx context.Context, snapshot storage.DatabaseSnapshot, roomID string, + ctx context.Context, snapshot storage.DatabaseTransaction, roomID string, incremental, limited bool, stateFilter *gomatrixserverlib.StateFilter, device *userapi.Device, timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent, @@ -581,7 +581,7 @@ func (p *PDUStreamProvider) lazyLoadMembers( // addIgnoredUsersToFilter adds ignored users to the eventfilter and // the syncreq itself for further use in streams. -func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, snapshot storage.DatabaseSnapshot, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error { +func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error { ignores, err := snapshot.IgnoresForUser(ctx, req.Device.UserID) if err != nil { if err == sql.ErrNoRows { diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index a1e24f993..81cea7d5e 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -35,7 +35,7 @@ type PresenceStreamProvider struct { } func (p *PresenceStreamProvider) Setup( - ctx context.Context, snapshot storage.DatabaseSnapshot, + ctx context.Context, snapshot storage.DatabaseTransaction, ) { p.DefaultStreamProvider.Setup(ctx, snapshot) @@ -51,7 +51,7 @@ func (p *PresenceStreamProvider) Setup( func (p *PresenceStreamProvider) CompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) @@ -59,7 +59,7 @@ func (p *PresenceStreamProvider) CompleteSync( func (p *PresenceStreamProvider) IncrementalSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index 73554d678..8818a5533 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -14,7 +14,7 @@ type ReceiptStreamProvider struct { } func (p *ReceiptStreamProvider) Setup( - ctx context.Context, snapshot storage.DatabaseSnapshot, + ctx context.Context, snapshot storage.DatabaseTransaction, ) { p.DefaultStreamProvider.Setup(ctx, snapshot) @@ -30,7 +30,7 @@ func (p *ReceiptStreamProvider) Setup( func (p *ReceiptStreamProvider) CompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) @@ -38,7 +38,7 @@ func (p *ReceiptStreamProvider) CompleteSync( func (p *ReceiptStreamProvider) IncrementalSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 4132e2e15..00b67cc42 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -12,7 +12,7 @@ type SendToDeviceStreamProvider struct { } func (p *SendToDeviceStreamProvider) Setup( - ctx context.Context, snapshot storage.DatabaseSnapshot, + ctx context.Context, snapshot storage.DatabaseTransaction, ) { p.DefaultStreamProvider.Setup(ctx, snapshot) @@ -28,7 +28,7 @@ func (p *SendToDeviceStreamProvider) Setup( func (p *SendToDeviceStreamProvider) CompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) @@ -36,7 +36,7 @@ func (p *SendToDeviceStreamProvider) CompleteSync( func (p *SendToDeviceStreamProvider) IncrementalSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go index e895b80d7..a6f7c7a06 100644 --- a/syncapi/streams/stream_typing.go +++ b/syncapi/streams/stream_typing.go @@ -17,7 +17,7 @@ type TypingStreamProvider struct { func (p *TypingStreamProvider) CompleteSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) @@ -25,7 +25,7 @@ func (p *TypingStreamProvider) CompleteSync( func (p *TypingStreamProvider) IncrementalSync( ctx context.Context, - snapshot storage.DatabaseSnapshot, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { diff --git a/syncapi/streams/streamprovider.go b/syncapi/streams/streamprovider.go index cd21d9fb9..8b12e2eba 100644 --- a/syncapi/streams/streamprovider.go +++ b/syncapi/streams/streamprovider.go @@ -8,7 +8,7 @@ import ( ) type StreamProvider interface { - Setup(ctx context.Context, snapshot storage.DatabaseSnapshot) + Setup(ctx context.Context, snapshot storage.DatabaseTransaction) // Advance will update the latest position of the stream based on // an update and will wake callers waiting on StreamNotifyAfter. @@ -16,12 +16,12 @@ type StreamProvider interface { // CompleteSync will update the response to include all updates as needed // for a complete sync. It will always return immediately. - CompleteSync(ctx context.Context, snapshot storage.DatabaseSnapshot, req *types.SyncRequest) types.StreamPosition + CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest) types.StreamPosition // IncrementalSync will update the response to include all updates between // the from and to sync positions. It will always return immediately, // making no changes if the range contains no updates. - IncrementalSync(ctx context.Context, snapshot storage.DatabaseSnapshot, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition + IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition // LatestPosition returns the latest stream position for this stream. LatestPosition(ctx context.Context) types.StreamPosition diff --git a/syncapi/streams/template_stream.go b/syncapi/streams/template_stream.go index b778be53f..f208d84e4 100644 --- a/syncapi/streams/template_stream.go +++ b/syncapi/streams/template_stream.go @@ -15,7 +15,7 @@ type DefaultStreamProvider struct { } func (p *DefaultStreamProvider) Setup( - ctx context.Context, snapshot storage.DatabaseSnapshot, + ctx context.Context, snapshot storage.DatabaseTransaction, ) { }