From fa0b055da207a8c4872019fe41908d511088f0eb Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 6 Apr 2017 17:07:33 +0100 Subject: [PATCH] Add some notes --- .../dendrite/clientapi/sync/requestpool.go | 22 ++++++++++++++++++- .../dendrite/clientapi/sync/syncserver.go | 3 +++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go index 155840af6..7b5abd3e2 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -73,7 +73,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } } -// OnNewEvent is called when a new event is received from the room server +// OnNewEvent is called when a new event is received from the room server. Must only be +// called from a single goroutine. func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) { fmt.Println("OnNewEvent =>", ev.EventID(), " pos=", pos, " old_pos=", rp.currPos) rp.currPos = pos @@ -87,6 +88,25 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib. return []gomatrixserverlib.Event{}, nil } + // Steps: (no token) + // - get all rooms the user is joined to. + // - f.e room, get room state. + // - f.e room, get latest N messages. + // - rollback state by N messages. + + // Steps: (up-to-date token) + // - Wait for new event. + // - Check if event should notify user. + // - Notify user or continue waiting, eventually timing out. + + // Steps: (partial token, less than threshold) + // - Get rooms the user is joined to. + // - Get all events between token and now for those rooms. + // - Work out state and message delta and return. + + // Steps: (partial token, more than threshold (too expensive to do the above)) + // - Ignore for now, meaning this code path will be horrendously slow. + // TODO: wait for an event which affects this user or one of their rooms, then recheck for new // sync data. time.Sleep(req.timeout) diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 9095a9d67..892c163dd 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -49,6 +49,9 @@ func (s *Server) Start() error { return s.roomServerConsumer.Start() } +// onMessage is called when the sync server receives a new event from the room server output log. +// It is not safe for this function to be called from multiple goroutines, or else the +// sync stream position may race and be incorrectly calculated. func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputRoomEvent