mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 23:13:11 -06:00
Remove forceBlock flag
This commit is contained in:
parent
5fce7e23b0
commit
1c7f9f929b
|
|
@ -99,10 +99,8 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosit
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForEvents blocks until there are new events for this request. If forceBlock is true, the request
|
// WaitForEvents blocks until there are new events for this request.
|
||||||
// will be forcibly waited until a new event wakes it up. This is typically only useful for testing
|
func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
|
||||||
// blocking code.
|
|
||||||
func (n *Notifier) WaitForEvents(req syncRequest, forceBlock bool) types.StreamPosition {
|
|
||||||
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
|
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
|
||||||
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
|
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
|
||||||
// - Incoming events wake requests for a matching room ID
|
// - Incoming events wake requests for a matching room ID
|
||||||
|
|
@ -110,8 +108,6 @@ func (n *Notifier) WaitForEvents(req syncRequest, forceBlock bool) types.StreamP
|
||||||
|
|
||||||
// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
|
// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
|
||||||
// but given we don't do /events, let's pretend it doesn't exist.
|
// but given we don't do /events, let's pretend it doesn't exist.
|
||||||
|
|
||||||
var hasBlocked bool
|
|
||||||
for {
|
for {
|
||||||
// In a guard, check if the /sync request should block, and block it until we get woken up
|
// In a guard, check if the /sync request should block, and block it until we get woken up
|
||||||
n.currPosMutex.RLock()
|
n.currPosMutex.RLock()
|
||||||
|
|
@ -122,15 +118,12 @@ func (n *Notifier) WaitForEvents(req syncRequest, forceBlock bool) types.StreamP
|
||||||
// with a pos which contains no new events for this user. We should probably re-wait for events
|
// with a pos which contains no new events for this user. We should probably re-wait for events
|
||||||
// automatically in this case.
|
// automatically in this case.
|
||||||
if req.since != currentPos {
|
if req.since != currentPos {
|
||||||
if !forceBlock || (forceBlock && hasBlocked) {
|
|
||||||
return currentPos
|
return currentPos
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// wait to be woken up, and then re-check the stream position
|
// wait to be woken up, and then re-check the stream position
|
||||||
req.log.WithField("user_id", req.userID).Info("Waiting for event")
|
req.log.WithField("user_id", req.userID).Info("Waiting for event")
|
||||||
n.blockUser(req.userID)
|
n.blockUser(req.userID)
|
||||||
hasBlocked = true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -99,7 +99,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
|
||||||
currentPos := rp.notifier.WaitForEvents(req, false)
|
currentPos := rp.notifier.WaitForEvents(req)
|
||||||
|
|
||||||
if req.since == types.StreamPosition(0) {
|
if req.since == types.StreamPosition(0) {
|
||||||
pos, data, err := rp.db.CompleteSync(req.userID, req.limit)
|
pos, data, err := rp.db.CompleteSync(req.userID, req.limit)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue