From 3c43e514fb968ec83213a6086265270dfddd0837 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 7 Apr 2017 09:19:58 +0100 Subject: [PATCH] Return a chunk of events based on the since token Missing waiting aspect --- .../storage/output_room_events_table.go | 41 +++++++++++++++++-- .../dendrite/clientapi/storage/syncserver.go | 5 +++ .../dendrite/clientapi/sync/requestpool.go | 12 ++++-- 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go index bf799f609..1a6e6bf92 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go @@ -36,13 +36,17 @@ const insertEventSQL = "" + const selectEventsSQL = "" + "SELECT event_json FROM output_room_events WHERE event_id = ANY($1)" +const selectEventsInRangeSQL = "" + + "SELECT event_json FROM output_room_events WHERE id >= $1 AND id <= $2" + const selectMaxIDSQL = "" + "SELECT MAX(id) FROM output_room_events" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxIDStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxIDStmt *sql.Stmt + selectEventsInRangeStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -59,6 +63,9 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil { return } + if s.selectEventsInRangeStmt, err = db.Prepare(selectEventsInRangeSQL); err != nil { + return + } return } @@ -69,6 +76,34 @@ func (s *outputRoomEventsStatements) MaxID() (id int64, err error) { return } +func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) { + rows, err := s.selectEventsInRangeStmt.Query(oldPos, newPos) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []gomatrixserverlib.Event + var i int64 + for ; rows.Next(); i++ { + var eventBytes []byte + if err := rows.Scan(&eventBytes); err != nil { + return nil, err + } + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, err + } + result = append(result, ev) + } + // Expect one event per position, inclusive eg old=3, new=5, expect 3,4,5 so 3 events. + wantNum := (1 + newPos - oldPos) + if i != wantNum { + return nil, fmt.Errorf("failed to map all positions to events: (got %d, wanted, %d)", i, wantNum) + } + return result, nil +} + // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) { err = txn.Stmt(s.insertEventStmt).QueryRow( diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go index c9bff724b..380c6934e 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -90,6 +90,11 @@ func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) { return d.events.MaxID() } +// EventsInRange returns all events in the given range, inclusive. +func (d *SyncServerDatabase) EventsInRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) { + return d.events.InRange(oldPos, newPos) +} + func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { txn, err := db.Begin() if err != nil { diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go index 7b5abd3e2..2465be0c5 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -81,12 +81,18 @@ func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPos } func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) { + if req.since == rp.currPos { + // wait for new event + } + + return rp.db.EventsInRange(int64(req.since), int64(rp.currPos)) + // https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L179 // Check if we are going to return immediately and if so, calculate the current // sync for this user and return. - if req.since == 0 || req.timeout == time.Duration(0) || req.wantFullState { - return []gomatrixserverlib.Event{}, nil - } + // if req.since == 0 || req.timeout == time.Duration(0) || req.wantFullState { + // return []gomatrixserverlib.Event{}, nil + // } // Steps: (no token) // - get all rooms the user is joined to.