mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 14:33:10 -06:00
Return a chunk of events based on the since token
Missing waiting aspect
This commit is contained in:
parent
fa0b055da2
commit
3c43e514fb
|
|
@ -36,13 +36,17 @@ const insertEventSQL = "" +
|
|||
const selectEventsSQL = "" +
|
||||
"SELECT event_json FROM output_room_events WHERE event_id = ANY($1)"
|
||||
|
||||
const selectEventsInRangeSQL = "" +
|
||||
"SELECT event_json FROM output_room_events WHERE id >= $1 AND id <= $2"
|
||||
|
||||
const selectMaxIDSQL = "" +
|
||||
"SELECT MAX(id) FROM output_room_events"
|
||||
|
||||
type outputRoomEventsStatements struct {
|
||||
insertEventStmt *sql.Stmt
|
||||
selectEventsStmt *sql.Stmt
|
||||
selectMaxIDStmt *sql.Stmt
|
||||
insertEventStmt *sql.Stmt
|
||||
selectEventsStmt *sql.Stmt
|
||||
selectMaxIDStmt *sql.Stmt
|
||||
selectEventsInRangeStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||
|
|
@ -59,6 +63,9 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectEventsInRangeStmt, err = db.Prepare(selectEventsInRangeSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -69,6 +76,34 @@ func (s *outputRoomEventsStatements) MaxID() (id int64, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) {
|
||||
rows, err := s.selectEventsInRangeStmt.Query(oldPos, newPos)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var result []gomatrixserverlib.Event
|
||||
var i int64
|
||||
for ; rows.Next(); i++ {
|
||||
var eventBytes []byte
|
||||
if err := rows.Scan(&eventBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, ev)
|
||||
}
|
||||
// Expect one event per position, inclusive eg old=3, new=5, expect 3,4,5 so 3 events.
|
||||
wantNum := (1 + newPos - oldPos)
|
||||
if i != wantNum {
|
||||
return nil, fmt.Errorf("failed to map all positions to events: (got %d, wanted, %d)", i, wantNum)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs.
|
||||
func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) {
|
||||
err = txn.Stmt(s.insertEventStmt).QueryRow(
|
||||
|
|
|
|||
|
|
@ -90,6 +90,11 @@ func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) {
|
|||
return d.events.MaxID()
|
||||
}
|
||||
|
||||
// EventsInRange returns all events in the given range, inclusive.
|
||||
func (d *SyncServerDatabase) EventsInRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) {
|
||||
return d.events.InRange(oldPos, newPos)
|
||||
}
|
||||
|
||||
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
||||
txn, err := db.Begin()
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -81,12 +81,18 @@ func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPos
|
|||
}
|
||||
|
||||
func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) {
|
||||
if req.since == rp.currPos {
|
||||
// wait for new event
|
||||
}
|
||||
|
||||
return rp.db.EventsInRange(int64(req.since), int64(rp.currPos))
|
||||
|
||||
// https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L179
|
||||
// Check if we are going to return immediately and if so, calculate the current
|
||||
// sync for this user and return.
|
||||
if req.since == 0 || req.timeout == time.Duration(0) || req.wantFullState {
|
||||
return []gomatrixserverlib.Event{}, nil
|
||||
}
|
||||
// if req.since == 0 || req.timeout == time.Duration(0) || req.wantFullState {
|
||||
// return []gomatrixserverlib.Event{}, nil
|
||||
// }
|
||||
|
||||
// Steps: (no token)
|
||||
// - get all rooms the user is joined to.
|
||||
|
|
|
|||
Loading…
Reference in a new issue