From df2e332ecb5ab0748874e775a97ac8e74f2ef224 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 9 Mar 2022 10:58:51 +0000 Subject: [PATCH] Convert stream positions into topological positions for both `from` and `to` in `/messages` --- syncapi/routing/messages.go | 49 ++++++++++--------- syncapi/storage/interface.go | 2 + .../output_room_events_topology_table.go | 28 ++++++++--- syncapi/storage/shared/syncserver.go | 10 ++++ .../output_room_events_topology_table.go | 30 +++++++++--- syncapi/storage/tables/interface.go | 2 + 6 files changed, 86 insertions(+), 35 deletions(-) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 9aef5db14..71bc5eb29 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -41,7 +41,6 @@ type messagesReq struct { roomID string from *types.TopologyToken to *types.TopologyToken - fromStream *types.StreamingToken device *userapi.Device wasToProvided bool backwardOrdering bool @@ -103,12 +102,17 @@ func OnIncomingMessagesRequest( from, err := types.NewTopologyTokenFromString(fromQuery) if err != nil { - fs, err2 := types.NewStreamTokenFromString(fromQuery) - fromStream = &fs - if err2 != nil { + var streamToken types.StreamingToken + if streamToken, err = types.NewStreamTokenFromString(fromQuery); err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err2.Error()), + JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err.Error()), + } + } else { + from, err = db.StreamToTopologicalPosition(req.Context(), streamToken.PDUPosition) + if err != nil { + logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken) + return jsonerror.InternalServerError() } } } @@ -128,13 +132,23 @@ func OnIncomingMessagesRequest( // Pagination tokens. To is optional, and its default value depends on the // direction ("b" or "f"). var to types.TopologyToken + toQuery := req.URL.Query().Get("to") wasToProvided := true - if s := req.URL.Query().Get("to"); len(s) > 0 { - to, err = types.NewTopologyTokenFromString(s) + if len(toQuery) > 0 { + to, err = types.NewTopologyTokenFromString(toQuery) if err != nil { - return util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()), + var streamToken types.StreamingToken + if streamToken, err = types.NewStreamTokenFromString(toQuery); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()), + } + } else { + to, err = db.StreamToTopologicalPosition(req.Context(), streamToken.PDUPosition) + if err != nil { + logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken) + return jsonerror.InternalServerError() + } } } } else { @@ -168,7 +182,6 @@ func OnIncomingMessagesRequest( roomID: roomID, from: &from, to: &to, - fromStream: fromStream, wasToProvided: wasToProvided, filter: filter, backwardOrdering: backwardOrdering, @@ -251,17 +264,9 @@ func (r *messagesReq) retrieveEvents() ( eventFilter := r.filter // Retrieve the events from the local database. - var streamEvents []types.StreamEvent - if r.fromStream != nil { - toStream := r.to.StreamToken() - streamEvents, err = r.db.GetEventsInStreamingRange( - r.ctx, r.fromStream, &toStream, r.roomID, eventFilter, r.backwardOrdering, - ) - } else { - streamEvents, err = r.db.GetEventsInTopologicalRange( - r.ctx, r.from, r.to, r.roomID, eventFilter.Limit, r.backwardOrdering, - ) - } + streamEvents, err := r.db.GetEventsInTopologicalRange( + r.ctx, r.from, r.to, r.roomID, eventFilter.Limit, r.backwardOrdering, + ) if err != nil { err = fmt.Errorf("GetEventsInRange: %w", err) return diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index e44766338..5184aec14 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -149,4 +149,6 @@ type Database interface { SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) + + StreamToTopologicalPosition(ctx context.Context, streamPos types.StreamPosition) (types.TopologyToken, error) } diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 57774453c..a1ebe20a6 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -76,13 +76,17 @@ const selectMaxPositionInTopologySQL = "" + const deleteTopologyForRoomSQL = "" + "DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1" +const selectStreamToTopologicalPositionSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE stream_position = $1" + type outputRoomEventsTopologyStatements struct { - insertEventInTopologyStmt *sql.Stmt - selectEventIDsInRangeASCStmt *sql.Stmt - selectEventIDsInRangeDESCStmt *sql.Stmt - selectPositionInTopologyStmt *sql.Stmt - selectMaxPositionInTopologyStmt *sql.Stmt - deleteTopologyForRoomStmt *sql.Stmt + insertEventInTopologyStmt *sql.Stmt + selectEventIDsInRangeASCStmt *sql.Stmt + selectEventIDsInRangeDESCStmt *sql.Stmt + selectPositionInTopologyStmt *sql.Stmt + selectMaxPositionInTopologyStmt *sql.Stmt + deleteTopologyForRoomStmt *sql.Stmt + selectStreamToTopologicalPositionStmt *sql.Stmt } func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -109,6 +113,9 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { if s.deleteTopologyForRoomStmt, err = db.Prepare(deleteTopologyForRoomSQL); err != nil { return nil, err } + if s.selectStreamToTopologicalPositionStmt, err = db.Prepare(selectStreamToTopologicalPositionSQL); err != nil { + return nil, err + } return s, nil } @@ -170,6 +177,15 @@ func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology( return } +// SelectStreamToTopologicalPosition returns the position of a given event +// in the topology of the room it belongs to from the given stream position. +func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition( + ctx context.Context, txn *sql.Tx, streamPos types.StreamPosition, +) (topoPos types.StreamPosition, err error) { + err = s.selectStreamToTopologicalPositionStmt.QueryRowContext(ctx, streamPos).Scan(&topoPos) + return +} + func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( ctx context.Context, txn *sql.Tx, roomID string, ) (pos types.StreamPosition, spos types.StreamPosition, err error) { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 87d7c6df7..9cae540b2 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -513,6 +513,16 @@ func (d *Database) EventPositionInTopology( return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil } +func (d *Database) StreamToTopologicalPosition( + ctx context.Context, streamPos types.StreamPosition, +) (types.TopologyToken, error) { + topoPos, err := d.Topology.SelectStreamToTopologicalPosition(ctx, nil, streamPos) + if err != nil { + return types.TopologyToken{}, err + } + return types.TopologyToken{Depth: topoPos, PDUPosition: streamPos}, nil +} + func (d *Database) GetFilter( ctx context.Context, localpart string, filterID string, ) (*gomatrixserverlib.Filter, error) { diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index d34b90500..a04fdfb59 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -68,14 +68,18 @@ const selectMaxPositionInTopologySQL = "" + const deleteTopologyForRoomSQL = "" + "DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1" +const selectStreamToTopologicalPositionSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE stream_position = $1" + type outputRoomEventsTopologyStatements struct { - db *sql.DB - insertEventInTopologyStmt *sql.Stmt - selectEventIDsInRangeASCStmt *sql.Stmt - selectEventIDsInRangeDESCStmt *sql.Stmt - selectPositionInTopologyStmt *sql.Stmt - selectMaxPositionInTopologyStmt *sql.Stmt - deleteTopologyForRoomStmt *sql.Stmt + db *sql.DB + insertEventInTopologyStmt *sql.Stmt + selectEventIDsInRangeASCStmt *sql.Stmt + selectEventIDsInRangeDESCStmt *sql.Stmt + selectPositionInTopologyStmt *sql.Stmt + selectMaxPositionInTopologyStmt *sql.Stmt + deleteTopologyForRoomStmt *sql.Stmt + selectStreamToTopologicalPositionStmt *sql.Stmt } func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -104,6 +108,9 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { if s.deleteTopologyForRoomStmt, err = db.Prepare(deleteTopologyForRoomSQL); err != nil { return nil, err } + if s.selectStreamToTopologicalPositionStmt, err = db.Prepare(selectStreamToTopologicalPositionSQL); err != nil { + return nil, err + } return s, nil } @@ -163,6 +170,15 @@ func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology( return } +// SelectStreamToTopologicalPosition returns the position of a given event +// in the topology of the room it belongs to from the given stream position. +func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition( + ctx context.Context, txn *sql.Tx, streamPos types.StreamPosition, +) (topoPos types.StreamPosition, err error) { + err = s.selectStreamToTopologicalPositionStmt.QueryRowContext(ctx, streamPos).Scan(&topoPos) + return +} + func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( ctx context.Context, txn *sql.Tx, roomID string, ) (pos types.StreamPosition, spos types.StreamPosition, err error) { diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 1ebb42651..615b49104 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -87,6 +87,8 @@ type Topology interface { SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error) // DeleteTopologyForRoom removes all topological information for a room. This should only be done when removing the room entirely. DeleteTopologyForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error) + // SelectStreamToTopologicalPosition converts a stream position to a topological position. + SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, streamPos types.StreamPosition) (topoPos types.StreamPosition, err error) } type CurrentRoomState interface {