From b353c039ff4fef9f240714bef9826c6302dde53b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Jan 2021 10:17:34 +0000 Subject: [PATCH] Wakeups largely working --- .../shared/{pdu_stream.go => stream_pdu.go} | 18 ++-- .../{typing_stream.go => stream_typing.go} | 12 +-- syncapi/storage/shared/syncserver.go | 3 +- .../{pdu_topology.go => topology_pdu.go} | 6 +- syncapi/sync/requestpool.go | 89 ++++++++++--------- syncapi/types/provider.go | 15 ++-- 6 files changed, 80 insertions(+), 63 deletions(-) rename syncapi/storage/shared/{pdu_stream.go => stream_pdu.go} (92%) rename syncapi/storage/shared/{typing_stream.go => stream_typing.go} (91%) rename syncapi/storage/shared/{pdu_topology.go => topology_pdu.go} (88%) diff --git a/syncapi/storage/shared/pdu_stream.go b/syncapi/storage/shared/stream_pdu.go similarity index 92% rename from syncapi/storage/shared/pdu_stream.go rename to syncapi/storage/shared/stream_pdu.go index 4aee8d5ee..664c85fae 100644 --- a/syncapi/storage/shared/pdu_stream.go +++ b/syncapi/storage/shared/stream_pdu.go @@ -46,13 +46,15 @@ func (p *PDUStreamProvider) StreamRange( device *userapi.Device, from, to types.StreamingToken, filter gomatrixserverlib.EventFilter, -) (newPos types.StreamPosition) { +) (newPos types.StreamingToken) { r := types.Range{ From: from.PDUPosition, To: to.PDUPosition, Backwards: from.IsAfter(to), } - newPos = from.PDUPosition + newPos = types.StreamingToken{ + PDUPosition: from.PDUPosition, + } var err error var events []types.StreamEvent @@ -96,8 +98,8 @@ func (p *PDUStreamProvider) StreamRange( ), ) - if event.StreamPosition > newPos { - newPos = event.StreamPosition + if event.StreamPosition > newPos.PDUPosition { + newPos.PDUPosition = event.StreamPosition } } @@ -123,7 +125,7 @@ func (p *PDUStreamProvider) StreamNotifyAfter( check := func() bool { p.latestMutex.RLock() defer p.latestMutex.RUnlock() - if from.PDUPosition > p.latest { + if p.latest > from.PDUPosition { close(ch) return true } @@ -167,9 +169,11 @@ func (p *PDUStreamProvider) StreamNotifyAfter( func (p *PDUStreamProvider) StreamLatestPosition( ctx context.Context, -) types.StreamPosition { +) types.StreamingToken { p.latestMutex.RLock() defer p.latestMutex.RUnlock() - return p.latest + return types.StreamingToken{ + PDUPosition: p.latest, + } } diff --git a/syncapi/storage/shared/typing_stream.go b/syncapi/storage/shared/stream_typing.go similarity index 91% rename from syncapi/storage/shared/typing_stream.go rename to syncapi/storage/shared/stream_typing.go index baac972a0..1e2f8e094 100644 --- a/syncapi/storage/shared/typing_stream.go +++ b/syncapi/storage/shared/stream_typing.go @@ -37,8 +37,8 @@ func (p *TypingStreamProvider) StreamRange( device *userapi.Device, from, to types.StreamingToken, filter gomatrixserverlib.EventFilter, -) types.StreamPosition { - return 0 +) types.StreamingToken { + return types.StreamingToken{} } func (p *TypingStreamProvider) StreamNotifyAfter( @@ -50,7 +50,7 @@ func (p *TypingStreamProvider) StreamNotifyAfter( check := func() bool { p.latestMutex.RLock() defer p.latestMutex.RUnlock() - if from.TypingPosition > p.latest { + if p.latest > from.TypingPosition { close(ch) return true } @@ -94,9 +94,11 @@ func (p *TypingStreamProvider) StreamNotifyAfter( func (p *TypingStreamProvider) StreamLatestPosition( ctx context.Context, -) types.StreamPosition { +) types.StreamingToken { p.latestMutex.RLock() defer p.latestMutex.RUnlock() - return p.latest + return types.StreamingToken{ + TypingPosition: p.latest, + } } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index a57f24720..c7d4035f3 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -354,7 +354,8 @@ func (d *Database) WriteEvent( return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err) } pduPosition = pos - d.PDUStreamProvider.StreamAdvance(pduPosition) + + d.PDUStreamProvider.StreamAdvance(pos) if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil { return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err) diff --git a/syncapi/storage/shared/pdu_topology.go b/syncapi/storage/shared/topology_pdu.go similarity index 88% rename from syncapi/storage/shared/pdu_topology.go rename to syncapi/storage/shared/topology_pdu.go index c2b024cde..80387f98d 100644 --- a/syncapi/storage/shared/pdu_topology.go +++ b/syncapi/storage/shared/topology_pdu.go @@ -22,10 +22,10 @@ func (p *PDUTopologyProvider) TopologyRange(ctx context.Context, res *types.Resp _ = events } -func (p *PDUTopologyProvider) TopologyLatestPosition(ctx context.Context, roomID string) types.StreamPosition { +func (p *PDUTopologyProvider) TopologyLatestPosition(ctx context.Context, roomID string) types.TopologyToken { token, err := p.DB.MaxTopologicalPosition(ctx, roomID) if err != nil { - return 0 + return types.TopologyToken{} } - return token.PDUPosition + return token } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 94f680ca6..7357cae33 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -41,15 +41,18 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db storage.Database - cfg *config.SyncAPI - userAPI userapi.UserInternalAPI - Notifier *Notifier - keyAPI keyapi.KeyInternalAPI - rsAPI roomserverAPI.RoomserverInternalAPI - lastseen sync.Map - pduStream types.StreamProvider - typingStream types.StreamProvider + db storage.Database + cfg *config.SyncAPI + userAPI userapi.UserInternalAPI + keyAPI keyapi.KeyInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI + lastseen sync.Map + pduStream types.StreamProvider + typingStream types.StreamProvider + receiptStream types.StreamProvider + sendToDeviceStream types.StreamProvider + inviteStream types.StreamProvider + deviceListStream types.StreamProvider } // NewRequestPool makes a new RequestPool @@ -59,15 +62,18 @@ func NewRequestPool( rsAPI roomserverAPI.RoomserverInternalAPI, ) *RequestPool { rp := &RequestPool{ - db: db, - cfg: cfg, - userAPI: userAPI, - Notifier: n, - keyAPI: keyAPI, - rsAPI: rsAPI, - lastseen: sync.Map{}, - pduStream: db.PDUStream(), - typingStream: db.TypingStream(), + db: db, + cfg: cfg, + userAPI: userAPI, + keyAPI: keyAPI, + rsAPI: rsAPI, + lastseen: sync.Map{}, + pduStream: db.PDUStream(), + typingStream: db.TypingStream(), + receiptStream: nil, // TODO + sendToDeviceStream: nil, // TODO + inviteStream: nil, // TODO + deviceListStream: nil, // TODO } go rp.cleanLastSeen() return rp @@ -168,10 +174,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. syncData = types.NewResponse() filter := gomatrixserverlib.DefaultEventFilter() - syncData.NextBatch = types.StreamingToken{ - PDUPosition: rp.pduStream.StreamLatestPosition(syncReq.ctx), - TypingPosition: rp.typingStream.StreamLatestPosition(syncReq.ctx), - } waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() @@ -181,31 +183,35 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. defer timer.Stop() select { - case <-syncReq.ctx.Done(): - // Caller gave up - logger.Println("Context expired") - return util.JSONResponse{ - Code: http.StatusOK, - JSON: syncData, - } + case <-syncReq.ctx.Done(): // Caller gave up + return util.JSONResponse{Code: http.StatusOK, JSON: syncData} - case <-timer.C: - // Timeout reached - logger.Println("Timed out") - return util.JSONResponse{ - Code: http.StatusOK, - JSON: syncData, - } + case <-timer.C: // Timeout reached + return util.JSONResponse{Code: http.StatusOK, JSON: syncData} case <-rp.pduStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): - logger.Println("PDU stream awake") case <-rp.typingStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): - logger.Println("Typing stream awake") + // case <-rp.receiptStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): + // case <-rp.sendToDeviceStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): + // case <-rp.inviteStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): + // case <-rp.deviceListStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): } } - syncData.NextBatch.PDUPosition = rp.pduStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, syncData.NextBatch, filter) - syncData.NextBatch.TypingPosition = rp.typingStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, syncData.NextBatch, filter) + var latest types.StreamingToken + latest.ApplyUpdates(rp.pduStream.StreamLatestPosition(syncReq.ctx)) + latest.ApplyUpdates(rp.typingStream.StreamLatestPosition(syncReq.ctx)) + // latest.ApplyUpdates(rp.receiptStream.StreamLatestPosition(syncReq.ctx)) + // latest.ApplyUpdates(rp.sendToDeviceStream.StreamLatestPosition(syncReq.ctx)) + // latest.ApplyUpdates(rp.inviteStream.StreamLatestPosition(syncReq.ctx)) + // latest.ApplyUpdates(rp.deviceListStream.StreamLatestPosition(syncReq.ctx)) + + syncData.NextBatch.ApplyUpdates(rp.pduStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) + syncData.NextBatch.ApplyUpdates(rp.typingStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) + // syncData.NextBatch.ApplyUpdates(rp.receiptStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) + // syncData.NextBatch.ApplyUpdates(rp.sendToDeviceStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) + // syncData.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) + // syncData.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) return util.JSONResponse{ Code: http.StatusOK, @@ -333,7 +339,7 @@ func (rp *RequestPool) appendDeviceLists( return data, nil } -// nolint:gocyclo +/* func (rp *RequestPool) appendAccountData( data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, accountDataFilter *gomatrixserverlib.EventFilter, @@ -443,6 +449,7 @@ func (rp *RequestPool) appendAccountData( return data, nil } +*/ // shouldReturnImmediately returns whether the /sync request is an initial sync, // or timeout=0, or full_state=true, in any of the cases the request should diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 3f81557fc..0c6c2ea65 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -9,19 +9,22 @@ import ( type StreamProvider interface { StreamSetup() + + // StreamAdvance will update the latest position of the stream based on + // an update and will wake callers waiting on StreamNotifyAfter. StreamAdvance(latest StreamPosition) - // Range will update the response to include all updates between + // StreamRange will update the response to include all updates between // the from and to sync positions. It will always return immediately, // making no changes if the range contains no updates. - StreamRange(ctx context.Context, res *Response, device *userapi.Device, from, to StreamingToken, filter gomatrixserverlib.EventFilter) StreamPosition + StreamRange(ctx context.Context, res *Response, device *userapi.Device, from, to StreamingToken, filter gomatrixserverlib.EventFilter) StreamingToken - // NotifyAfter returns a channel which will be closed once the + // StreamNotifyAfter returns a channel which will be closed once the // stream advances past the "from" position. StreamNotifyAfter(ctx context.Context, from StreamingToken) chan struct{} - // LatestPosition returns the latest stream position for this stream. - StreamLatestPosition(ctx context.Context) StreamPosition + // StreamLatestPosition returns the latest stream position for this stream. + StreamLatestPosition(ctx context.Context) StreamingToken } type TopologyProvider interface { @@ -33,5 +36,5 @@ type TopologyProvider interface { // LatestPosition returns the latest stream position for this stream // for the given room. - TopologyLatestPosition(ctx context.Context, roomID string) StreamPosition + TopologyLatestPosition(ctx context.Context, roomID string) TopologyToken }