diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 273b6aea1..4cb858b0e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -133,6 +133,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, + false, ) if err != nil { return err diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 45952b727..22eb5503b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -49,7 +49,12 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( add_state_ids TEXT[], remove_state_ids TEXT[], device_id TEXT, -- The local device that sent the event, if any - transaction_id TEXT -- The transaction id used to send the event, if any + transaction_id TEXT, -- The transaction id used to send the event, if any + -- Should the event be excluded from responses to /sync requests. Useful for + -- events retrieved through backfilling that have a position in the stream + -- that relates to the moment these were retrieved rather than the moment these + -- were emitted. + exclude_from_sync BOOL DEFAULT FALSE ); -- for event selection CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); @@ -57,19 +62,24 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + - ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id" + " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id, exclude_from_sync" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id" const selectEventsSQL = "" + - "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" + "SELECT id, event_json, exclude_from_sync FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectRecentEventsSQL = "" + - "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + + "SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" +const selectRecentEventsForSyncSQL = "" + + "SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events" + + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + + " ORDER BY id DESC LIMIT $4" + const selectEarlyEventsSQL = "" + - "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + + "SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id ASC LIMIT $4" @@ -78,18 +88,20 @@ const selectMaxEventIDSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT id, event_json, add_state_ids, remove_state_ids" + + "SELECT id, event_json, exclude_from_sync, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + + " AND exclude_from_sync = FALSE" + // So far, this request is only used for sync responses, so we can exclude unwanted events right away " ORDER BY id ASC" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectEarlyEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectRecentEventsForSyncStmt *sql.Stmt + selectEarlyEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -109,6 +121,9 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { return } + if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil { + return + } if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil { return } @@ -142,12 +157,13 @@ func (s *outputRoomEventsStatements) selectStateInRange( for rows.Next() { var ( - streamPos int64 - eventBytes []byte - addIDs pq.StringArray - delIDs pq.StringArray + streamPos int64 + eventBytes []byte + excludeFromSync bool + addIDs pq.StringArray + delIDs pq.StringArray ) - if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { return nil, nil, err } // Sanity check for deleted state and whine if we see it. We don't need to do anything @@ -179,8 +195,9 @@ func (s *outputRoomEventsStatements) selectStateInRange( stateNeeded[ev.RoomID()] = needSet eventIDToEvent[ev.EventID()] = StreamEvent{ - Event: ev, - StreamPosition: types.StreamPosition(streamPos), + Event: ev, + StreamPosition: types.StreamPosition(streamPos), + ExcludeFromSync: excludeFromSync, } } @@ -207,7 +224,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID( func (s *outputRoomEventsStatements) insertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string, - transactionID *api.TransactionID, + transactionID *api.TransactionID, excludeFromSync bool, ) (streamPos int64, err error) { var deviceID, txnID *string if transactionID != nil { @@ -225,17 +242,26 @@ func (s *outputRoomEventsStatements) insertEvent( pq.StringArray(removeState), deviceID, txnID, + excludeFromSync, ).Scan(&streamPos) return } -// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. +// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'. +// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude +// from sync. func (s *outputRoomEventsStatements) selectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, - chronologicalOrder bool, + chronologicalOrder bool, onlySyncEvents bool, ) ([]StreamEvent, error) { - stmt := common.TxStmt(txn, s.selectRecentEventsStmt) + var stmt *sql.Stmt + if onlySyncEvents { + stmt = common.TxStmt(txn, s.selectRecentEventsForSyncStmt) + } else { + stmt = common.TxStmt(txn, s.selectRecentEventsStmt) + } + rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) if err != nil { return nil, err @@ -293,13 +319,14 @@ func rowsToStreamEvents(rows *sql.Rows) ([]StreamEvent, error) { var result []StreamEvent for rows.Next() { var ( - streamPos int64 - eventBytes []byte - deviceID *string - txnID *string - transactionID *api.TransactionID + streamPos int64 + eventBytes []byte + excludeFromSync bool + deviceID *string + txnID *string + transactionID *api.TransactionID ) - if err := rows.Scan(&streamPos, &eventBytes, &deviceID, &txnID); err != nil { + if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &deviceID, &txnID); err != nil { return nil, err } // TODO: Handle redacted events @@ -316,9 +343,10 @@ func rowsToStreamEvents(rows *sql.Rows) ([]StreamEvent, error) { } result = append(result, StreamEvent{ - Event: ev, - StreamPosition: types.StreamPosition(streamPos), - TransactionID: transactionID, + Event: ev, + StreamPosition: types.StreamPosition(streamPos), + TransactionID: transactionID, + ExcludeFromSync: excludeFromSync, }) } return result, nil diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index a37000c0c..e2764fbdd 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -43,8 +43,9 @@ type stateDelta struct { // position for this event. type StreamEvent struct { gomatrixserverlib.Event - StreamPosition types.StreamPosition - TransactionID *api.TransactionID + StreamPosition types.StreamPosition + TransactionID *api.TransactionID + ExcludeFromSync bool } // SyncServerDatabase represents a sync server database @@ -111,11 +112,13 @@ func (d *SyncServerDatabase) WriteEvent( ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, - transactionID *api.TransactionID, + transactionID *api.TransactionID, excludeFromSync bool, ) (streamPos types.StreamPosition, returnErr error) { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID) + pos, err := d.events.insertEvent( + ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, + ) if err != nil { return err } @@ -198,7 +201,7 @@ func (d *SyncServerDatabase) GetEventsInRange( if backwardOrdering { // We need all events matching to < streamPos < from - return d.events.selectRecentEvents(ctx, nil, roomID, to, from, limit, false) + return d.events.selectRecentEvents(ctx, nil, roomID, to, from, limit, false, false) } // We need all events from < streamPos < to @@ -318,7 +321,7 @@ func (d *SyncServerDatabase) CompleteSync( var recentStreamEvents []StreamEvent recentStreamEvents, err = d.events.selectRecentEvents( ctx, txn, roomID, types.StreamPosition(0), pos, - numRecentEventsPerRoom, true, + numRecentEventsPerRoom, true, true, ) if err != nil { return nil, err @@ -447,7 +450,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( endPos = delta.membershipPos } recentStreamEvents, err := d.events.selectRecentEvents( - ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, true, + ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, true, true, ) if err != nil { return err