From 4ca802c6bb0779c26038fbca3c933240b5e793a0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 19 Apr 2017 15:12:27 +0100 Subject: [PATCH] Implement incremental sync This has the following known shortcomings: - It doesn't handle missing events not in the from/to range. - It doesn't order events in the timeline correctly. - It doesn't handle invited/left rooms at all. - There is no intelligent wakeup logic: events for Bob will wake up Alice's stream. --- .../storage/output_room_events_table.go | 126 ++++++++++++++---- .../dendrite/syncserver/storage/syncserver.go | 37 ++++- .../dendrite/syncserver/sync/requestpool.go | 17 +-- .../dendrite/syncserver/types/types.go | 5 +- 4 files changed, 144 insertions(+), 41 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go index 5e9eb74d4..7c1e7e65b 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go @@ -4,7 +4,9 @@ import ( "database/sql" "fmt" + log "github.com/Sirupsen/logrus" "github.com/lib/pq" + "github.com/matrix-org/dendrite/syncserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -40,17 +42,21 @@ const selectEventsInRangeSQL = "" + "SELECT event_json FROM output_room_events WHERE id > $1 AND id <= $2" const selectRecentEventsSQL = "" + - "SELECT event_json FROM output_room_events WHERE room_id = $1 ORDER BY id DESC LIMIT $2" + "SELECT event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4" const selectMaxIDSQL = "" + "SELECT MAX(id) FROM output_room_events" +const selectStateInRangeSQL = "" + + "SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events WHERE id > $1 AND id < $2 AND add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL" + type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt selectMaxIDStmt *sql.Stmt selectEventsInRangeStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -73,9 +79,102 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { return } + if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { + return + } return } +// StateBetween returns the state events between the two given stream positions, exclusive of both. +// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the +// two positions, only the most recent state is returned. +func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) { + rows, err := txn.Stmt(s.selectStateInRangeStmt).Query(oldPos, newPos) + if err != nil { + return nil, err + } + // Fetch all the state change events for all rooms between the two positions then loop each event and: + // - Keep a cache of the event by ID (99% of state change events are for the event itself) + // - For each room ID, build up an array of event IDs which represents cumulative adds/removes + // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID + // if they aren't in the event ID cache. We don't handle state deletion yet. + eventIDToEvent := make(map[string]gomatrixserverlib.Event) + + // RoomID => A set (map[string]bool) of state event IDs which are between the two positions + stateNeeded := make(map[string]map[string]bool) + + for rows.Next() { + var ( + eventBytes []byte + addIDs pq.StringArray + delIDs pq.StringArray + ) + if err := rows.Scan(&eventBytes, &addIDs, &delIDs); err != nil { + return nil, err + } + // Sanity check for deleted state and whine if we see it. We don't need to do anything + // since it'll just mark the event as not being needed. + if len(addIDs) < len(delIDs) { + log.WithFields(log.Fields{ + "since": oldPos, + "current": newPos, + "adds": addIDs, + "dels": delIDs, + }).Warn("StateBetween: ignoring deleted state") + } + + // TODO: Handle redacted events + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, err + } + needSet := stateNeeded[ev.RoomID()] + if needSet == nil { // make set if required + needSet = make(map[string]bool) + } + for _, id := range delIDs { + needSet[id] = false + } + for _, id := range addIDs { + needSet[id] = true + } + stateNeeded[ev.RoomID()] = needSet + + eventIDToEvent[ev.EventID()] = ev + } + + stateBetween, missingEvents := mapEventIDsToEvents(eventIDToEvent, stateNeeded) + + if len(missingEvents) > 0 { + return nil, fmt.Errorf("error StateBetween: TODO missing events") + } + return stateBetween, nil +} + +// convert the set of event IDs into a set of events. Mark any which are missing. +func mapEventIDsToEvents(eventIDToEvent map[string]gomatrixserverlib.Event, stateNeeded map[string]map[string]bool) (map[string][]gomatrixserverlib.Event, map[string][]string) { + stateBetween := make(map[string][]gomatrixserverlib.Event) + missingEvents := make(map[string][]string) + for roomID, ids := range stateNeeded { + events := stateBetween[roomID] + for id, need := range ids { + if !need { + continue // deleted state + } + e, ok := eventIDToEvent[id] + if ok { + events = append(events, e) + } else { + m := missingEvents[roomID] + m = append(m, id) + missingEvents[roomID] = m + } + } + stateBetween[roomID] = events + } + return stateBetween, missingEvents +} + // MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, // then this function should only ever be used at startup, as it will race with inserting events if it is // done afterwards. If there are no inserted events, 0 is returned. @@ -92,27 +191,6 @@ func (s *outputRoomEventsStatements) MaxID(txn *sql.Tx) (id int64, err error) { return } -// InRange returns all the events in the range between oldPos exclusive and newPos inclusive. Returns an empty array if -// there are no events between the provided range. Returns an error if events are missing in the range. -func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) { - rows, err := s.selectEventsInRangeStmt.Query(oldPos, newPos) - if err != nil { - return nil, err - } - defer rows.Close() - - result, err := rowsToEvents(rows) - if err != nil { - return nil, err - } - // Expect one event per position, exclusive of old. eg old=3, new=5, expect 4,5 so 2 events. - wantNum := int(newPos - oldPos) - if len(result) != wantNum { - return nil, fmt.Errorf("failed to map all positions to events: (got %d, wanted, %d)", len(result), wantNum) - } - return result, nil -} - // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position // of the inserted event. func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) { @@ -123,8 +201,8 @@ func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixser } // RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. -func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, limit int) ([]gomatrixserverlib.Event, error) { - rows, err := s.selectRecentEventsStmt.Query(roomID, limit) +func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]gomatrixserverlib.Event, error) { + rows, err := s.selectRecentEventsStmt.Query(roomID, fromPos, toPos, limit) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go index fa2e6a42c..61bcbe110 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go @@ -96,6 +96,36 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) return types.StreamPosition(id), nil } +// IncrementalSync returns all the data needed in order to create an incremental sync response. +func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) { + data = make(map[string]types.RoomData) + returnErr = runTransaction(d.db, func(txn *sql.Tx) error { + roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") + if err != nil { + return err + } + + state, err := d.events.StateBetween(txn, fromPos, toPos) + if err != nil { + return err + } + + for _, roomID := range roomIDs { + recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom) + if err != nil { + return err + } + roomData := types.RoomData{ + State: state[roomID], + RecentEvents: recentEvents, + } + data[roomID] = roomData + } + return nil + }) + return +} + // CompleteSync returns all the data needed in order to create a complete sync response. func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) { data = make(map[string]types.RoomData) @@ -121,7 +151,7 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom } // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, numRecentEventsPerRoom) + recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom) if err != nil { return err } @@ -135,11 +165,6 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom return } -// EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos. -func (d *SyncServerDatabase) EventsInRange(oldPos, newPos types.StreamPosition) ([]gomatrixserverlib.Event, error) { - return d.events.InRange(int64(oldPos), int64(newPos)) -} - func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { txn, err := db.Begin() if err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go index 69dfaeaa2..2fee3be64 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go @@ -139,21 +139,18 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, err // TODO: handle ignored users - evs, err := rp.db.EventsInRange(req.since, currentPos) + data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) if err != nil { return nil, err } res := types.NewResponse(currentPos) - // for now, dump everything as join timeline events - for _, ev := range evs { - roomData := res.Rooms.Join[ev.RoomID()] - roomData.Timeline.Events = append(roomData.Timeline.Events, gomatrixserverlib.ToClientEvent(ev, gomatrixserverlib.FormatSync)) - res.Rooms.Join[ev.RoomID()] = roomData + for roomID, d := range data { + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr } - - // Make sure we send the next_batch as a string. We don't want to confuse clients by sending this - // as an integer even though (at the moment) it is. - res.NextBatch = currentPos.String() return res, nil } diff --git a/src/github.com/matrix-org/dendrite/syncserver/types/types.go b/src/github.com/matrix-org/dendrite/syncserver/types/types.go index 7d904c146..663976d54 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncserver/types/types.go @@ -1,8 +1,9 @@ package types import ( - "github.com/matrix-org/gomatrixserverlib" "strconv" + + "github.com/matrix-org/gomatrixserverlib" ) // StreamPosition represents the offset in the sync stream a client is at. @@ -38,6 +39,8 @@ type Response struct { // NewResponse creates an empty response with initialised maps. func NewResponse(pos StreamPosition) *Response { res := Response{} + // Make sure we send the next_batch as a string. We don't want to confuse clients by sending this + // as an integer even though (at the moment) it is. res.NextBatch = pos.String() // Pre-initalise the maps. Synapse will return {} even if there are no rooms under a specific section, // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.