diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 555245def..8929e7bf1 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -99,8 +99,10 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosit } } -// WaitForEvents blocks until there are new events for this request. -func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { +// WaitForEvents blocks until there are new events for this request. If forceBlock is true, the request +// will be forcibly waited until a new event wakes it up. This is typically only useful for testing +// blocking code. +func (n *Notifier) WaitForEvents(req syncRequest, forceBlock bool) types.StreamPosition { // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID // - Incoming events wake requests for a matching room ID @@ -109,6 +111,7 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, // but given we don't do /events, let's pretend it doesn't exist. + var hasBlocked bool for { // In a guard, check if the /sync request should block, and block it until we get woken up n.currPosMutex.RLock() @@ -119,12 +122,15 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { // with a pos which contains no new events for this user. We should probably re-wait for events // automatically in this case. if req.since != currentPos { - return currentPos + if !forceBlock || (forceBlock && hasBlocked) { + return currentPos + } } // wait to be woken up, and then re-check the stream position req.log.WithField("user_id", req.userID).Info("Waiting for event") n.blockUser(req.userID) + hasBlocked = true } } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index eee117e76..c6189c8b7 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -99,7 +99,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { - currentPos := rp.notifier.WaitForEvents(req) + currentPos := rp.notifier.WaitForEvents(req, false) if req.since == types.StreamPosition(0) { pos, data, err := rp.db.CompleteSync(req.userID, req.limit)