Implement incremental sync

This has the following known shortcomings:
 - It doesn't handle missing events not in the from/to range.
 - It doesn't order events in the timeline correctly.
 - It doesn't handle invited/left rooms at all.
 - There is no intelligent wakeup logic: events for Bob will wake up Alice's stream.
This commit is contained in:
Kegan Dougal 2017-04-19 15:12:27 +01:00
parent 22f923ba4f
commit 4ca802c6bb
4 changed files with 144 additions and 41 deletions

View file

@ -4,7 +4,9 @@ import (
"database/sql"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/syncserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@ -40,17 +42,21 @@ const selectEventsInRangeSQL = "" +
"SELECT event_json FROM output_room_events WHERE id > $1 AND id <= $2"
const selectRecentEventsSQL = "" +
"SELECT event_json FROM output_room_events WHERE room_id = $1 ORDER BY id DESC LIMIT $2"
"SELECT event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4"
const selectMaxIDSQL = "" +
"SELECT MAX(id) FROM output_room_events"
const selectStateInRangeSQL = "" +
"SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events WHERE id > $1 AND id < $2 AND add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxIDStmt *sql.Stmt
selectEventsInRangeStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
}
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
@ -73,8 +79,101 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
return
}
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return
}
return
}
// StateBetween returns the state events between the two given stream positions, exclusive of both.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) {
rows, err := txn.Stmt(s.selectStateInRangeStmt).Query(oldPos, newPos)
if err != nil {
return nil, err
}
// Fetch all the state change events for all rooms between the two positions then loop each event and:
// - Keep a cache of the event by ID (99% of state change events are for the event itself)
// - For each room ID, build up an array of event IDs which represents cumulative adds/removes
// For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
// if they aren't in the event ID cache. We don't handle state deletion yet.
eventIDToEvent := make(map[string]gomatrixserverlib.Event)
// RoomID => A set (map[string]bool) of state event IDs which are between the two positions
stateNeeded := make(map[string]map[string]bool)
for rows.Next() {
var (
eventBytes []byte
addIDs pq.StringArray
delIDs pq.StringArray
)
if err := rows.Scan(&eventBytes, &addIDs, &delIDs); err != nil {
return nil, err
}
// Sanity check for deleted state and whine if we see it. We don't need to do anything
// since it'll just mark the event as not being needed.
if len(addIDs) < len(delIDs) {
log.WithFields(log.Fields{
"since": oldPos,
"current": newPos,
"adds": addIDs,
"dels": delIDs,
}).Warn("StateBetween: ignoring deleted state")
}
// TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
if err != nil {
return nil, err
}
needSet := stateNeeded[ev.RoomID()]
if needSet == nil { // make set if required
needSet = make(map[string]bool)
}
for _, id := range delIDs {
needSet[id] = false
}
for _, id := range addIDs {
needSet[id] = true
}
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = ev
}
stateBetween, missingEvents := mapEventIDsToEvents(eventIDToEvent, stateNeeded)
if len(missingEvents) > 0 {
return nil, fmt.Errorf("error StateBetween: TODO missing events")
}
return stateBetween, nil
}
// convert the set of event IDs into a set of events. Mark any which are missing.
func mapEventIDsToEvents(eventIDToEvent map[string]gomatrixserverlib.Event, stateNeeded map[string]map[string]bool) (map[string][]gomatrixserverlib.Event, map[string][]string) {
stateBetween := make(map[string][]gomatrixserverlib.Event)
missingEvents := make(map[string][]string)
for roomID, ids := range stateNeeded {
events := stateBetween[roomID]
for id, need := range ids {
if !need {
continue // deleted state
}
e, ok := eventIDToEvent[id]
if ok {
events = append(events, e)
} else {
m := missingEvents[roomID]
m = append(m, id)
missingEvents[roomID] = m
}
}
stateBetween[roomID] = events
}
return stateBetween, missingEvents
}
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
// then this function should only ever be used at startup, as it will race with inserting events if it is
@ -92,27 +191,6 @@ func (s *outputRoomEventsStatements) MaxID(txn *sql.Tx) (id int64, err error) {
return
}
// InRange returns all the events in the range between oldPos exclusive and newPos inclusive. Returns an empty array if
// there are no events between the provided range. Returns an error if events are missing in the range.
func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) {
rows, err := s.selectEventsInRangeStmt.Query(oldPos, newPos)
if err != nil {
return nil, err
}
defer rows.Close()
result, err := rowsToEvents(rows)
if err != nil {
return nil, err
}
// Expect one event per position, exclusive of old. eg old=3, new=5, expect 4,5 so 2 events.
wantNum := int(newPos - oldPos)
if len(result) != wantNum {
return nil, fmt.Errorf("failed to map all positions to events: (got %d, wanted, %d)", len(result), wantNum)
}
return result, nil
}
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position
// of the inserted event.
func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) {
@ -123,8 +201,8 @@ func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixser
}
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, limit int) ([]gomatrixserverlib.Event, error) {
rows, err := s.selectRecentEventsStmt.Query(roomID, limit)
func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]gomatrixserverlib.Event, error) {
rows, err := s.selectRecentEventsStmt.Query(roomID, fromPos, toPos, limit)
if err != nil {
return nil, err
}

View file

@ -96,6 +96,36 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error)
return types.StreamPosition(id), nil
}
// IncrementalSync returns all the data needed in order to create an incremental sync response.
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) {
data = make(map[string]types.RoomData)
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
if err != nil {
return err
}
state, err := d.events.StateBetween(txn, fromPos, toPos)
if err != nil {
return err
}
for _, roomID := range roomIDs {
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom)
if err != nil {
return err
}
roomData := types.RoomData{
State: state[roomID],
RecentEvents: recentEvents,
}
data[roomID] = roomData
}
return nil
})
return
}
// CompleteSync returns all the data needed in order to create a complete sync response.
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) {
data = make(map[string]types.RoomData)
@ -121,7 +151,7 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, numRecentEventsPerRoom)
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom)
if err != nil {
return err
}
@ -135,11 +165,6 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
return
}
// EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos.
func (d *SyncServerDatabase) EventsInRange(oldPos, newPos types.StreamPosition) ([]gomatrixserverlib.Event, error) {
return d.events.InRange(int64(oldPos), int64(newPos))
}
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
txn, err := db.Begin()
if err != nil {

View file

@ -139,21 +139,18 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, err
// TODO: handle ignored users
evs, err := rp.db.EventsInRange(req.since, currentPos)
data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
if err != nil {
return nil, err
}
res := types.NewResponse(currentPos)
// for now, dump everything as join timeline events
for _, ev := range evs {
roomData := res.Rooms.Join[ev.RoomID()]
roomData.Timeline.Events = append(roomData.Timeline.Events, gomatrixserverlib.ToClientEvent(ev, gomatrixserverlib.FormatSync))
res.Rooms.Join[ev.RoomID()] = roomData
for roomID, d := range data {
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
// Make sure we send the next_batch as a string. We don't want to confuse clients by sending this
// as an integer even though (at the moment) it is.
res.NextBatch = currentPos.String()
return res, nil
}

View file

@ -1,8 +1,9 @@
package types
import (
"github.com/matrix-org/gomatrixserverlib"
"strconv"
"github.com/matrix-org/gomatrixserverlib"
)
// StreamPosition represents the offset in the sync stream a client is at.
@ -38,6 +39,8 @@ type Response struct {
// NewResponse creates an empty response with initialised maps.
func NewResponse(pos StreamPosition) *Response {
res := Response{}
// Make sure we send the next_batch as a string. We don't want to confuse clients by sending this
// as an integer even though (at the moment) it is.
res.NextBatch = pos.String()
// Pre-initalise the maps. Synapse will return {} even if there are no rooms under a specific section,
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.