From d343b8fb2c1aca4aeebd7051718244ba237e5f61 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 31 Aug 2020 15:28:24 +0300 Subject: [PATCH] blind stab at adding a `peek` section to /sync --- docs/peeking.md | 2 +- syncapi/storage/shared/syncserver.go | 75 +++++++++++++++++++++++++- syncapi/storage/sqlite3/peeks_table.go | 26 ++++++--- syncapi/types/types.go | 8 ++- 4 files changed, 101 insertions(+), 10 deletions(-) diff --git a/docs/peeking.md b/docs/peeking.md index 35f1d9d83..78bd6f797 100644 --- a/docs/peeking.md +++ b/docs/peeking.md @@ -6,7 +6,7 @@ Implementationwise, this means: * Users call `/peek` and `/unpeek` on the clientapi from a given device. * The clientapi delegates these via HTTP to the roomserver, which coordinates peeking in general for a given room * The roomserver writes an NewPeek event into the kafka log headed to the syncserver - * The syncserver tracks the existence of the local peek in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peeking` section of the /sync response. + * The syncserver tracks the existence of the local peek in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peek` section of the /sync response. Questions (given this is [my](https://github.com/ara4n) first time hacking on Dendrite): * The whole clientapi -> roomserver -> syncapi flow to initiate a peek seems very indirect. Is there a reason not to just let syncapi itself host the implementation of `/peek`? diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 257abf080..cf8dd604c 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -606,6 +606,8 @@ func (d *Database) IncrementalSync( } } + // TODO: handle EDUs in peeked rooms + err = d.addEDUDeltaToResponse( fromPos, toPos, joinedRoomIDs, res, ) @@ -742,6 +744,8 @@ func (d *Database) CompleteSync( return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err) } + // TODO: handle EDUs in peeked rooms + // Use a zero value SyncPosition for fromPos so all EDU states are added. err = d.addEDUDeltaToResponse( types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res, @@ -847,6 +851,14 @@ func (d *Database) addRoomDeltaToResponse( jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr + case gomatrixserverlib.Peek: + jr := types.NewJoinResponse() + + jr.Timeline.PrevBatch = prevBatch.String() + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Peek[delta.roomID] = *jr case gomatrixserverlib.Leave: fallthrough // transitions to leave are the same as ban case gomatrixserverlib.Ban: @@ -968,7 +980,7 @@ func (d *Database) getStateDeltas( // - Get all CURRENTLY joined rooms, and add them to 'joined' block. var deltas []stateDelta - // get all the state events ever between these two positions + // get all the state events ever (i.e. for all available rooms) between these two positions stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) if err != nil { return nil, nil, err @@ -978,6 +990,40 @@ func (d *Database) getStateDeltas( return nil, nil, err } + // find out which rooms this user is peeking, if any. + // We do this before joins so joins overwrite peeks + peeks, err := d.Peeks.SelectPeeks(ctx, txn, userID, device.DeviceID) + if err != nil { + return nil, nil, err + } + + // add peek blocks + for _, peek := range peeks { + if peek.New { + // send full room state down instead of a delta + var s []types.StreamEvent + s, err = d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter) + if err != nil { + return nil, nil, err + } + state[roomID] = s + } + + deltas = append(deltas, stateDelta{ + membership: gomatrixserverlib.Peek, + stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]), + roomID: peek.RoomID, + }) + } + + if len(peeks) > 0 { + err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.DeviceID) + if err != nil { + return nil, nil, err + } + } + + // handle newly joined rooms and non-joined rooms for roomID, stateStreamEvents := range state { for _, ev := range stateStreamEvents { // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. @@ -1038,8 +1084,13 @@ func (d *Database) getStateDeltasForFullStateSync( return nil, nil, err } + peeks, err = d.Peeks.SelectPeeks(ctx, txn, userID, device,ID) + if err != nil { + return nil, nil, err + } + // Use a reasonable initial capacity - deltas := make([]stateDelta, 0, len(joinedRoomIDs)) + deltas := make([]stateDelta, 0, len(joinedRoomIDs) + len(peeks)) // Add full states for all joined rooms for _, joinedRoomID := range joinedRoomIDs { @@ -1054,6 +1105,26 @@ func (d *Database) getStateDeltasForFullStateSync( }) } + // Add full states for all peeking rooms + for _, peek := range peeks { + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter) + if stateErr != nil { + return nil, nil, stateErr + } + deltas = append(deltas, stateDelta{ + membership: gomatrixserverlib.Peek, + stateEvents: d.StreamEventsToEvents(device, s), + roomID: peek.RoomID, + }) + } + + if len(peeks) > 0 { + err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.DeviceID) + if err != nil { + return nil, nil, err + } + } + // Get all the state events ever between these two positions stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) if err != nil { diff --git a/syncapi/storage/sqlite3/peeks_table.go b/syncapi/storage/sqlite3/peeks_table.go index 76df6dee3..7ac329f66 100644 --- a/syncapi/storage/sqlite3/peeks_table.go +++ b/syncapi/storage/sqlite3/peeks_table.go @@ -32,6 +32,7 @@ CREATE TABLE IF NOT EXISTS syncapi_peeks ( room_id TEXT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL, + new BOOL NOT NULL DEFAULT true, -- When the peek was created in UNIX epoch ms. creation_ts INTEGER NOT NULL, ); @@ -49,11 +50,14 @@ const deletePeekSQL = "" + "DELETE FROM syncapi_peeks WHERE room_id = $1 AND user_id = $2 and device_id = $3" const selectPeeksSQL == "" + - "SELECT room_id FROM syncapi_peeks WHERE user_id = $1 and device_id = $2" + "SELECT room_id, new FROM syncapi_peeks WHERE user_id = $1 and device_id = $2" const selectPeekingDevicesSQL == "" + "SELECT room_id, user_id, device_id FROM syncapi_peeks" +const markPeeksAsOldSQL == "" + + "UPDATE syncapi_peeks SET new=false WHERE user_id = $1 and device_id = $2" + type peekStatements struct { db *sql.DB insertPeekStmt *sql.Stmt @@ -82,6 +86,9 @@ func NewSqlitePeeksTable(db *sql.DB) (tables.Peeks, error) { if s.selectPeekingDevicesStmt, err = db.Prepare(selectPeekingDevicesSQL); err != nil { return nil, err } + if s.markPeeksAsOldStmt, err = db.Prepare(markPeeksAsOldSQL); err != nil { + return nil, err + } return s, nil } @@ -110,7 +117,7 @@ func (s *peekStatements) DeletePeek( func (s *peekStatements) SelectPeeks( ctx context.Context, txn *sql.Tx, userID, deviceID string, -) (roomIDs []string, err error) { +) (peeks []Peek, err error) { rows, err := sqlutil.TxStmt(txn, s.selectPeeksStmt).QueryContext(ctx, userID, deviceID) if err != nil { return @@ -118,14 +125,21 @@ func (s *peekStatements) SelectPeeks( defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeks: rows.close() failed") for rows.Next() { - var roomID string - if err = rows.Scan(&roomId); err != nil { + peek = Peek{} + if err = rows.Scan(&peek.roomId, &peek.new); err != nil { return } - roomIDs = append(roomIDs, roomID) + peeks = append(peeks, peek) } - return roomIDs, rows.Err() + return peeks, rows.Err() +} + +func (s *peekStatements) MarkPeeksAsOld ( + ctx context.Context, txn *sql.Tx, userID, deviceID string, +) (err error) { + _, err := sqlutil.TxStmt(txn, s.markPeeksAsOldStmt).ExecContext(ctx, userID, deviceID) + return } func (s *peekStatements) SelectPeekingDevices( diff --git a/syncapi/types/types.go b/syncapi/types/types.go index fd28892f0..80a010904 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -388,6 +388,7 @@ type Response struct { } `json:"presence,omitempty"` Rooms struct { Join map[string]JoinResponse `json:"join"` + Peek map[string]JoinResponse `json:"peek"` Invite map[string]InviteResponse `json:"invite"` Leave map[string]LeaveResponse `json:"leave"` } `json:"rooms"` @@ -407,6 +408,7 @@ func NewResponse() *Response { // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section, // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. res.Rooms.Join = make(map[string]JoinResponse) + res.Rooms.Peek = make(map[string]JoinResponse) res.Rooms.Invite = make(map[string]InviteResponse) res.Rooms.Leave = make(map[string]LeaveResponse) @@ -433,7 +435,7 @@ func (r *Response) IsEmpty() bool { len(r.ToDevice.Events) == 0 } -// JoinResponse represents a /sync response for a room which is under the 'join' key. +// JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key. type JoinResponse struct { State struct { Events []gomatrixserverlib.ClientEvent `json:"events"` @@ -513,3 +515,7 @@ type PeekingDevice struct { UserID string } +type Peek struct { + RoomID string + New boolean +} \ No newline at end of file