diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index 780104405..caa428827 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -130,3 +130,29 @@ func ToToken(provider StreamProvider, position types.StreamPosition) types.Strea } return types.StreamingToken{} } + +func IncrementalPositions(provider StreamProvider, current, since types.StreamingToken) (types.StreamPosition, types.StreamPosition) { + switch t := provider.(type) { + case *PDUStreamProvider: + return current.PDUPosition, since.PDUPosition + case *TypingStreamProvider: + return current.TypingPosition, since.TypingPosition + case *ReceiptStreamProvider: + return current.ReceiptPosition, since.ReceiptPosition + case *SendToDeviceStreamProvider: + return current.SendToDevicePosition, since.SendToDevicePosition + case *InviteStreamProvider: + return current.InvitePosition, since.InvitePosition + case *AccountDataStreamProvider: + return current.AccountDataPosition, since.AccountDataPosition + case *DeviceListStreamProvider: + return current.DeviceListPosition, since.DeviceListPosition + case *NotificationDataStreamProvider: + return current.NotificationDataPosition, since.NotificationDataPosition + case *PresenceStreamProvider: + return current.PresencePosition, since.PresencePosition + default: + panic(fmt.Sprintf("unknown stream provider: %T", t)) + } + return 0, 0 +} diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 899793b31..766c58737 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -329,6 +329,17 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. return f(snapshot) } + allStreams := []streams.StreamProvider{ + rp.streams.DeviceListStreamProvider, + rp.streams.TypingStreamProvider, + rp.streams.ReceiptStreamProvider, + rp.streams.InviteStreamProvider, + rp.streams.SendToDeviceStreamProvider, + rp.streams.AccountDataStreamProvider, + rp.streams.NotificationDataStreamProvider, + rp.streams.PresenceStreamProvider, + } + if syncReq.Since.IsEmpty() { // Complete sync // The PDU stream needs to be the very first stream to get the data, @@ -342,16 +353,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. }, ) syncReq.Response.NextBatch.PDUPosition = pduPos - allStreams := []streams.StreamProvider{ - rp.streams.DeviceListStreamProvider, - rp.streams.TypingStreamProvider, - rp.streams.ReceiptStreamProvider, - rp.streams.InviteStreamProvider, - rp.streams.SendToDeviceStreamProvider, - rp.streams.AccountDataStreamProvider, - rp.streams.NotificationDataStreamProvider, - rp.streams.PresenceStreamProvider, - } streamPosCh := make(chan streamPosResponse, len(allStreams)) wg := sync.WaitGroup{} @@ -361,7 +362,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. for _, s := range allStreams { go func(stream streams.StreamProvider) { streamPos := withTransaction( - 0, + 0, // we're doing an initial sync func(txn storage.DatabaseTransaction) types.StreamPosition { return stream.CompleteSync( syncReq.Context, txn, syncReq, @@ -380,88 +381,46 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. } } else { // Incremental sync - syncReq.Response.NextBatch = types.StreamingToken{ - PDUPosition: withTransaction( - syncReq.Since.PDUPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.PDUStreamProvider.IncrementalSync( - syncReq.Context, txn, syncReq, - syncReq.Since.PDUPosition, rp.Notifier.CurrentPosition().PDUPosition, - ) - }, - ), - TypingPosition: withTransaction( - syncReq.Since.TypingPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.TypingStreamProvider.IncrementalSync( - syncReq.Context, txn, syncReq, - syncReq.Since.TypingPosition, rp.Notifier.CurrentPosition().TypingPosition, - ) - }, - ), - ReceiptPosition: withTransaction( - syncReq.Since.ReceiptPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.ReceiptStreamProvider.IncrementalSync( - syncReq.Context, txn, syncReq, - syncReq.Since.ReceiptPosition, rp.Notifier.CurrentPosition().ReceiptPosition, - ) - }, - ), - InvitePosition: withTransaction( - syncReq.Since.InvitePosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.InviteStreamProvider.IncrementalSync( - syncReq.Context, txn, syncReq, - syncReq.Since.InvitePosition, rp.Notifier.CurrentPosition().InvitePosition, - ) - }, - ), - SendToDevicePosition: withTransaction( - syncReq.Since.SendToDevicePosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.SendToDeviceStreamProvider.IncrementalSync( - syncReq.Context, txn, syncReq, - syncReq.Since.SendToDevicePosition, rp.Notifier.CurrentPosition().SendToDevicePosition, - ) - }, - ), - AccountDataPosition: withTransaction( - syncReq.Since.AccountDataPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.AccountDataStreamProvider.IncrementalSync( - syncReq.Context, txn, syncReq, - syncReq.Since.AccountDataPosition, rp.Notifier.CurrentPosition().AccountDataPosition, - ) - }, - ), - NotificationDataPosition: withTransaction( - syncReq.Since.NotificationDataPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.NotificationDataStreamProvider.IncrementalSync( - syncReq.Context, txn, syncReq, - syncReq.Since.NotificationDataPosition, rp.Notifier.CurrentPosition().NotificationDataPosition, - ) - }, - ), - DeviceListPosition: withTransaction( - syncReq.Since.DeviceListPosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.DeviceListStreamProvider.IncrementalSync( - syncReq.Context, txn, syncReq, - syncReq.Since.DeviceListPosition, rp.Notifier.CurrentPosition().DeviceListPosition, - ) - }, - ), - PresencePosition: withTransaction( - syncReq.Since.PresencePosition, - func(txn storage.DatabaseTransaction) types.StreamPosition { - return rp.streams.PresenceStreamProvider.IncrementalSync( - syncReq.Context, txn, syncReq, - syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition, - ) - }, - ), + // The PDU stream needs to be the very first stream to get the data, + // as it sets values the other streams need + current, since := streams.IncrementalPositions(rp.streams.PDUStreamProvider, rp.Notifier.CurrentPosition(), syncReq.Since) + pduPos := withTransaction( + since, + func(txn storage.DatabaseTransaction) types.StreamPosition { + return rp.streams.PDUStreamProvider.IncrementalSync( + syncReq.Context, txn, syncReq, + since, current, + ) + }, + ) + syncReq.Response.NextBatch.PDUPosition = pduPos + + streamPosCh := make(chan streamPosResponse, len(allStreams)) + wg := sync.WaitGroup{} + wg.Add(len(allStreams)) + + // fan out stream calculations + for _, s := range allStreams { + go func(stream streams.StreamProvider) { + current, since := streams.IncrementalPositions(stream, rp.Notifier.CurrentPosition(), syncReq.Since) + streamPos := withTransaction( + since, + func(txn storage.DatabaseTransaction) types.StreamPosition { + return stream.IncrementalSync( + syncReq.Context, txn, syncReq, + since, current, + ) + }, + ) + streamPosCh <- streamPosResponse{provider: stream, pos: streamPos} + wg.Done() + }(s) + } + // Wait for all streams to finish their work + wg.Wait() + close(streamPosCh) + for resp := range streamPosCh { + syncReq.Response.NextBatch.ApplyUpdates(streams.ToToken(resp.provider, resp.pos)) } // it's possible for there to be no updates for this user even though since < current pos, // e.g busy servers with a quiet user. In this scenario, we don't want to return a no-op