This commit is contained in:
Erik Johnston 2017-10-12 10:00:14 +01:00
parent 59d99dcf3c
commit 119a8a2876
2 changed files with 6 additions and 1 deletions

View file

@ -100,7 +100,9 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
}
}
// WaitForEvents blocks until there are new events for this request.
// WaitForEvents blocks until there are events for this request after sincePos.
// In particular, it will return immediately if there are already events after
// sincePos for the request, but otherwise blocks waiting for new events.
func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) types.StreamPosition {
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID

View file

@ -122,6 +122,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
func (rp *RequestPool) makeNotifyChannel(syncReq syncRequest, sincePos types.StreamPosition) chan types.StreamPosition {
notified := make(chan types.StreamPosition)
// TODO(#303): We need to ensure that WaitForEvents gets properly cancelled
// when the request is finished, or use some other mechanism to ensure we
// don't leak goroutines here
go (func() {
currentPos := rp.notifier.WaitForEvents(syncReq, sincePos)
notified <- currentPos