From d660bbb62d8689e59e602afe34a2776b3cef1f0c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 30 Sep 2022 15:47:14 +0100 Subject: [PATCH] More tweaks --- syncapi/consumers/roomserver.go | 5 ++++- syncapi/notifier/notifier.go | 9 +++++++-- syncapi/routing/context.go | 5 ++++- syncapi/routing/search.go | 5 ++++- syncapi/streams/streams.go | 5 ++++- syncapi/sync/requestpool.go | 5 ++++- 6 files changed, 27 insertions(+), 7 deletions(-) diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index e5e8fe293..c7a11dbb4 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -27,6 +27,7 @@ import ( "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/internal/fulltext" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" @@ -454,7 +455,8 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head if err != nil { return nil, err } - defer snapshot.Rollback() // nolint:errcheck + var succeeded bool + defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) prevEvent, err := snapshot.GetStateEvent( s.ctx, event.RoomID(), event.Type(), stateKey, @@ -474,6 +476,7 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head } event.Event, err = event.SetUnsigned(prev) + succeeded = true return event, err } diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index a8e5bf9ad..db18c6b77 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -323,7 +324,8 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error { if err != nil { return err } - defer snapshot.Rollback() // nolint:errcheck + var succeeded bool + defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) roomToUsers, err := snapshot.AllJoinedUsersInRooms(ctx) if err != nil { @@ -337,6 +339,7 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error { } n.setPeekingDevices(roomToPeekingDevices) + succeeded = true return nil } @@ -349,7 +352,8 @@ func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs [ if err != nil { return err } - defer snapshot.Rollback() // nolint:errcheck + var succeeded bool + defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) roomToUsers, err := snapshot.AllJoinedUsersInRoom(ctx, roomIDs) if err != nil { @@ -357,6 +361,7 @@ func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs [ } n.setUsersJoinedToRooms(roomToUsers) + succeeded = true return nil } diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 1ce34b85a..0ed164c7e 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/sqlutil" roomserver "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" @@ -55,7 +56,8 @@ func Context( if err != nil { return jsonerror.InternalServerError() } - defer snapshot.Rollback() // nolint:errcheck + var succeeded bool + defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) filter, err := parseRoomEventFilter(req) if err != nil { @@ -184,6 +186,7 @@ func Context( response.End = end.String() response.Start = start.String() } + succeeded = true return util.JSONResponse{ Code: http.StatusOK, JSON: response, diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go index bac534a2c..aef355def 100644 --- a/syncapi/routing/search.go +++ b/syncapi/routing/search.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/fulltext" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/userapi/api" ) @@ -65,7 +66,8 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts if err != nil { return jsonerror.InternalServerError() } - defer snapshot.Rollback() // nolint:errcheck + var succeeded bool + defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) // only search rooms the user is actually joined to joinedRooms, err := snapshot.RoomIDsWithMembership(ctx, device.UserID, "join") @@ -249,6 +251,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts logrus.Debugf("Full search request took %v", time.Since(start)) + succeeded = true return util.JSONResponse{ Code: http.StatusOK, JSON: res, diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index eccbb3a4f..dc8547621 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -4,6 +4,7 @@ import ( "context" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/sqlutil" keyapi "github.com/matrix-org/dendrite/keyserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/notifier" @@ -72,7 +73,8 @@ func NewSyncStreamProviders( if err != nil { panic(err) } - defer snapshot.Rollback() // nolint:errcheck + var succeeded bool + defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) streams.PDUStreamProvider.Setup(ctx, snapshot) streams.TypingStreamProvider.Setup(ctx, snapshot) @@ -84,6 +86,7 @@ func NewSyncStreamProviders( streams.DeviceListStreamProvider.Setup(ctx, snapshot) streams.PresenceStreamProvider.Setup(ctx, snapshot) + succeeded = true return streams } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index db88d80bd..3342d2b68 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -31,6 +31,7 @@ import ( "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/internal/sqlutil" keyapi "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" @@ -310,7 +311,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. logrus.WithError(err).Error("Failed to acquire database snapshot for sync request") return jsonerror.InternalServerError() } - defer snapshot.Commit() // nolint:errcheck + var succeeded bool + defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) if syncReq.Since.IsEmpty() { // Complete sync @@ -409,6 +411,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. } } + succeeded = true return util.JSONResponse{ Code: http.StatusOK, JSON: syncReq.Response,