From f14c3a252627cc32c4e9b64dd869a9b4de4abf70 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 30 Sep 2022 15:44:30 +0100 Subject: [PATCH] Tweak transaction handling behaviour for `/sync` --- syncapi/storage/interface.go | 2 ++ syncapi/streams/stream_accountdata.go | 1 + syncapi/streams/stream_devicelist.go | 2 ++ syncapi/streams/stream_invite.go | 1 + syncapi/streams/stream_notificationdata.go | 1 + syncapi/streams/stream_pdu.go | 4 ++++ syncapi/streams/stream_presence.go | 2 ++ syncapi/streams/stream_receipt.go | 1 + syncapi/streams/stream_sendtodevice.go | 1 + syncapi/sync/requestpool.go | 2 +- 10 files changed, 16 insertions(+), 1 deletion(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 3732e43fb..4a03aca74 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/types" @@ -27,6 +28,7 @@ import ( ) type DatabaseTransaction interface { + sqlutil.Transaction SharedUsers MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 3f2f7d134..34135d65a 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -54,6 +54,7 @@ func (p *AccountDataStreamProvider) IncrementalSync( ) if err != nil { req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed") + _ = snapshot.Rollback() return from } diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index 7996c2038..10ede573a 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -34,11 +34,13 @@ func (p *DeviceListStreamProvider) IncrementalSync( to, _, err = internal.DeviceListCatchup(context.Background(), snapshot, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) if err != nil { req.Log.WithError(err).Error("internal.DeviceListCatchup failed") + _ = snapshot.Rollback() return from } err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response) if err != nil { req.Log.WithError(err).Error("internal.DeviceListCatchup failed") + _ = snapshot.Rollback() return from } diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 17b3b8434..b52eaaab1 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -56,6 +56,7 @@ func (p *InviteStreamProvider) IncrementalSync( ) if err != nil { req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed") + _ = snapshot.Rollback() return from } diff --git a/syncapi/streams/stream_notificationdata.go b/syncapi/streams/stream_notificationdata.go index 5a81fd09a..e1ee02b21 100644 --- a/syncapi/streams/stream_notificationdata.go +++ b/syncapi/streams/stream_notificationdata.go @@ -46,6 +46,7 @@ func (p *NotificationDataStreamProvider) IncrementalSync( countsByRoom, err := snapshot.GetUserUnreadNotificationCountsForRooms(ctx, req.Device.UserID, req.Rooms) if err != nil { req.Log.WithError(err).Error("GetUserUnreadNotificationCountsForRooms failed") + _ = snapshot.Rollback() return from } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 89c5ba35e..43e2ce8ba 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -75,6 +75,7 @@ func (p *PDUStreamProvider) CompleteSync( joinedRoomIDs, err := snapshot.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join) if err != nil { req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed") + _ = snapshot.Rollback() return from } @@ -101,6 +102,7 @@ func (p *PDUStreamProvider) CompleteSync( ) if jerr != nil { req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed") + _ = snapshot.Rollback() continue // return from } req.Response.Rooms.Join[roomID] = *jr @@ -111,6 +113,7 @@ func (p *PDUStreamProvider) CompleteSync( peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) if err != nil { req.Log.WithError(err).Error("p.DB.PeeksInRange failed") + _ = snapshot.Rollback() return from } for _, peek := range peeks { @@ -121,6 +124,7 @@ func (p *PDUStreamProvider) CompleteSync( ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") + _ = snapshot.Rollback() continue // return from } req.Response.Rooms.Peek[peek.RoomID] = *jr diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 81cea7d5e..d24c85620 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -67,6 +67,7 @@ func (p *PresenceStreamProvider) IncrementalSync( presences, err := snapshot.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000}) if err != nil { req.Log.WithError(err).Error("p.DB.PresenceAfter failed") + _ = snapshot.Rollback() return from } @@ -95,6 +96,7 @@ func (p *PresenceStreamProvider) IncrementalSync( presences[roomUsers[i]], err = snapshot.GetPresence(ctx, roomUsers[i]) if err != nil { req.Log.WithError(err).Error("unable to query presence for user") + _ = snapshot.Rollback() return from } if len(presences) > req.Filter.Presence.Limit { diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index 8818a5533..40e5bd01e 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -52,6 +52,7 @@ func (p *ReceiptStreamProvider) IncrementalSync( lastPos, receipts, err := snapshot.RoomReceiptsAfter(ctx, joinedRooms, from) if err != nil { req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed") + _ = snapshot.Rollback() return from } diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 00b67cc42..3262832a3 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -44,6 +44,7 @@ func (p *SendToDeviceStreamProvider) IncrementalSync( lastPos, events, err := snapshot.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to) if err != nil { req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed") + _ = snapshot.Rollback() return from } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 1d0ac1a40..db88d80bd 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -310,7 +310,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. logrus.WithError(err).Error("Failed to acquire database snapshot for sync request") return jsonerror.InternalServerError() } - defer snapshot.Rollback() // nolint:errcheck + defer snapshot.Commit() // nolint:errcheck if syncReq.Since.IsEmpty() { // Complete sync