From 9d6c9e4cc30670761b861d256887836e0ad8855f Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Fri, 28 Oct 2022 09:44:27 +0200 Subject: [PATCH] Calculate initial sync async --- syncapi/streams/streams.go | 27 +++++++ syncapi/sync/requestpool.go | 137 +++++++++++++++--------------------- 2 files changed, 85 insertions(+), 79 deletions(-) diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index dc8547621..780104405 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -2,6 +2,7 @@ package streams import ( "context" + "fmt" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -103,3 +104,29 @@ func (s *Streams) Latest(ctx context.Context) types.StreamingToken { PresencePosition: s.PresenceStreamProvider.LatestPosition(ctx), } } + +func ToToken(provider StreamProvider, position types.StreamPosition) types.StreamingToken { + switch t := provider.(type) { + case *PDUStreamProvider: + return types.StreamingToken{PDUPosition: position} + case *TypingStreamProvider: + return types.StreamingToken{TypingPosition: position} + case *ReceiptStreamProvider: + return types.StreamingToken{ReceiptPosition: position} + case *SendToDeviceStreamProvider: + return types.StreamingToken{SendToDevicePosition: position} + case *InviteStreamProvider: + return types.StreamingToken{InvitePosition: position} + case *AccountDataStreamProvider: + return types.StreamingToken{AccountDataPosition: position} + case *DeviceListStreamProvider: + return types.StreamingToken{DeviceListPosition: position} + case *NotificationDataStreamProvider: + return types.StreamingToken{NotificationDataPosition: position} + case *PresenceStreamProvider: + return types.StreamingToken{PresencePosition: position} + default: + panic(fmt.Sprintf("unknown stream provider: %T", t)) + } + return types.StreamingToken{} +} diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 29d92b293..899793b31 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -225,6 +225,12 @@ var waitingSyncRequests = prometheus.NewGauge( }, ) +// streamPosResponse is the response from a goroutine +type streamPosResponse struct { + provider streams.StreamProvider + pos types.StreamPosition +} + // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be // called in a dedicated goroutine for this request. This function will block the goroutine // until a response is ready, or it times out. @@ -307,10 +313,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. } withTransaction := func(from types.StreamPosition, f func(snapshot storage.DatabaseTransaction) types.StreamPosition) types.StreamPosition { + if err := req.Context().Err(); err != nil { + return from + } var succeeded bool snapshot, err := rp.db.NewDatabaseSnapshot(req.Context()) if err != nil { - logrus.WithError(err).Error("Failed to acquire database snapshot for sync request") + syncReq.Log.WithError(err).Error("Failed to acquire database snapshot for sync request") return from } defer func() { @@ -322,82 +331,52 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. if syncReq.Since.IsEmpty() { // Complete sync - syncReq.Response.NextBatch = types.StreamingToken{ - // Get the current DeviceListPosition first, as the currentPosition - // might advance while processing other streams, resulting in flakey - // tests. - DeviceListPosition: withTransaction( - syncReq.Since.DeviceListPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.DeviceListStreamProvider.CompleteSync( - syncReq.Context, txn, syncReq, - ) - }, - ), - PDUPosition: withTransaction( - syncReq.Since.PDUPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.PDUStreamProvider.CompleteSync( - syncReq.Context, txn, syncReq, - ) - }, - ), - TypingPosition: withTransaction( - syncReq.Since.TypingPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.TypingStreamProvider.CompleteSync( - syncReq.Context, txn, syncReq, - ) - }, - ), - ReceiptPosition: withTransaction( - syncReq.Since.ReceiptPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.ReceiptStreamProvider.CompleteSync( - syncReq.Context, txn, syncReq, - ) - }, - ), - InvitePosition: withTransaction( - syncReq.Since.InvitePosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.InviteStreamProvider.CompleteSync( - syncReq.Context, txn, syncReq, - ) - }, - ), - SendToDevicePosition: withTransaction( - syncReq.Since.SendToDevicePosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.SendToDeviceStreamProvider.CompleteSync( - syncReq.Context, txn, syncReq, - ) - }, - ), - AccountDataPosition: withTransaction( - syncReq.Since.AccountDataPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.AccountDataStreamProvider.CompleteSync( - syncReq.Context, txn, syncReq, - ) - }, - ), - NotificationDataPosition: withTransaction( - syncReq.Since.NotificationDataPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.NotificationDataStreamProvider.CompleteSync( - syncReq.Context, txn, syncReq, - ) - }, - ), - PresencePosition: withTransaction( - syncReq.Since.PresencePosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.PresenceStreamProvider.CompleteSync( - syncReq.Context, txn, syncReq, - ) - }, - ), + // The PDU stream needs to be the very first stream to get the data, + // as it sets values the other streams need + pduPos := withTransaction( + 0, + func(txn storage.DatabaseTransaction) types.StreamPosition { + return rp.streams.PDUStreamProvider.CompleteSync( + syncReq.Context, txn, syncReq, + ) + }, + ) + syncReq.Response.NextBatch.PDUPosition = pduPos + allStreams := []streams.StreamProvider{ + rp.streams.DeviceListStreamProvider, + rp.streams.TypingStreamProvider, + rp.streams.ReceiptStreamProvider, + rp.streams.InviteStreamProvider, + rp.streams.SendToDeviceStreamProvider, + rp.streams.AccountDataStreamProvider, + rp.streams.NotificationDataStreamProvider, + rp.streams.PresenceStreamProvider, + } + + streamPosCh := make(chan streamPosResponse, len(allStreams)) + wg := sync.WaitGroup{} + wg.Add(len(allStreams)) + + // fan out stream calculations + for _, s := range allStreams { + go func(stream streams.StreamProvider) { + streamPos := withTransaction( + 0, + func(txn storage.DatabaseTransaction) types.StreamPosition { + return stream.CompleteSync( + syncReq.Context, txn, syncReq, + ) + }, + ) + streamPosCh <- streamPosResponse{provider: stream, pos: streamPos} + wg.Done() + }(s) + } + // Wait for all streams to finish their work + wg.Wait() + close(streamPosCh) + for resp := range streamPosCh { + syncReq.Response.NextBatch.ApplyUpdates(streams.ToToken(resp.provider, resp.pos)) } } else { // Incremental sync @@ -544,7 +523,7 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use } snapshot, err := rp.db.NewDatabaseSnapshot(req.Context()) if err != nil { - logrus.WithError(err).Error("Failed to acquire database snapshot for key change") + syncReq.Log.WithError(err).Error("Failed to acquire database snapshot for key change") return jsonerror.InternalServerError() } var succeeded bool @@ -555,7 +534,7 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition, ) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("Failed to DeviceListCatchup info") + syncReq.Log.WithError(err).Error("Failed to DeviceListCatchup info") return jsonerror.InternalServerError() } succeeded = true