From f831abac55c2b89815ee88a9cea2e584002cc2b7 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 9 Mar 2022 11:47:07 +0000 Subject: [PATCH] Hopefully it works now --- syncapi/routing/messages.go | 62 ++++++++++--------- syncapi/storage/interface.go | 4 +- .../output_room_events_topology_table.go | 41 +++++++----- syncapi/storage/shared/syncserver.go | 35 +---------- .../output_room_events_topology_table.go | 43 +++++++------ syncapi/storage/tables/interface.go | 4 +- 6 files changed, 88 insertions(+), 101 deletions(-) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 71bc5eb29..84d375d1a 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -48,11 +48,10 @@ type messagesReq struct { } type messagesResp struct { - Start string `json:"start"` - StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token - End string `json:"end"` - Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` - State []gomatrixserverlib.ClientEvent `json:"state"` + Start string `json:"start"` + End string `json:"end"` + Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` + State []gomatrixserverlib.ClientEvent `json:"state"` } // OnIncomingMessagesRequest implements the /messages endpoint from the @@ -90,8 +89,8 @@ func OnIncomingMessagesRequest( // Extract parameters from the request's URL. // Pagination tokens. - var fromStream *types.StreamingToken fromQuery := req.URL.Query().Get("from") + toQuery := req.URL.Query().Get("to") emptyFromSupplied := fromQuery == "" if emptyFromSupplied { // NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided. @@ -100,23 +99,6 @@ func OnIncomingMessagesRequest( fromQuery = currPos.String() } - from, err := types.NewTopologyTokenFromString(fromQuery) - if err != nil { - var streamToken types.StreamingToken - if streamToken, err = types.NewStreamTokenFromString(fromQuery); err != nil { - return util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err.Error()), - } - } else { - from, err = db.StreamToTopologicalPosition(req.Context(), streamToken.PDUPosition) - if err != nil { - logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken) - return jsonerror.InternalServerError() - } - } - } - // Direction to return events from. dir := req.URL.Query().Get("dir") if dir != "b" && dir != "f" { @@ -129,10 +111,27 @@ func OnIncomingMessagesRequest( // to have one of the two accepted values (so dir == "f" <=> !backwardOrdering). backwardOrdering := (dir == "b") + from, err := types.NewTopologyTokenFromString(fromQuery) + if err != nil { + var streamToken types.StreamingToken + if streamToken, err = types.NewStreamTokenFromString(fromQuery); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err.Error()), + } + } else { + from, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, backwardOrdering) + if err != nil { + logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken) + return jsonerror.InternalServerError() + } + logrus.Infof("XXX: 'from' mapping %v to %v", fromQuery, from) + } + } + // Pagination tokens. To is optional, and its default value depends on the // direction ("b" or "f"). var to types.TopologyToken - toQuery := req.URL.Query().Get("to") wasToProvided := true if len(toQuery) > 0 { to, err = types.NewTopologyTokenFromString(toQuery) @@ -144,11 +143,12 @@ func OnIncomingMessagesRequest( JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()), } } else { - to, err = db.StreamToTopologicalPosition(req.Context(), streamToken.PDUPosition) + to, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, !backwardOrdering) if err != nil { logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken) return jsonerror.InternalServerError() } + logrus.Infof("XXX: 'to' mapping %v to %v", toQuery, to) } } } else { @@ -188,6 +188,8 @@ func OnIncomingMessagesRequest( device: device, } + logrus.Infof("XXX: Retrieving events %v to %v", from, to) + clientEvents, start, end, err := mReq.retrieveEvents() if err != nil { util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed") @@ -228,9 +230,6 @@ func OnIncomingMessagesRequest( End: end.String(), State: state, } - if emptyFromSupplied { - res.StartStream = fromStream.String() - } // Respond with the events. return util.JSONResponse{ @@ -273,7 +272,12 @@ func (r *messagesReq) retrieveEvents() ( } var events []*gomatrixserverlib.HeaderedEvent - util.GetLogger(r.ctx).WithField("start", start).WithField("end", end).Infof("Fetched %d events locally", len(streamEvents)) + util.GetLogger(r.ctx).WithFields(logrus.Fields{ + "from": *r.from, + "to": *r.to, + "start": start, + "end": end, + }).Infof("Fetched %d events locally", len(streamEvents)) // There can be two reasons for streamEvents to be empty: either we've // reached the oldest event in the room (or the most recent one, depending diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 5184aec14..b6ac5be19 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -103,8 +103,6 @@ type Database interface { // DeletePeek deletes all peeks for a given room by a given user // Returns an error if there was a problem communicating with the database. DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error) - // GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit. - GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, err error) // GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit. GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) // EventPositionInTopology returns the depth and stream position of the given event. @@ -150,5 +148,5 @@ type Database interface { SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) - StreamToTopologicalPosition(ctx context.Context, streamPos types.StreamPosition) (types.TopologyToken, error) + StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error) } diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index a1ebe20a6..fbeb90e64 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -50,14 +50,14 @@ const insertEventInTopologySQL = "" + const selectEventIDsInRangeASCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + - "(topological_position > $2 AND topological_position < $3) OR" + + "(topological_position > $2 AND topological_position <= $3) OR" + "(topological_position = $4 AND stream_position <= $5)" + ") ORDER BY topological_position ASC, stream_position ASC LIMIT $6" const selectEventIDsInRangeDESCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + - "(topological_position > $2 AND topological_position < $3) OR" + + "(topological_position >= $2 AND topological_position < $3) OR" + "(topological_position = $4 AND stream_position <= $5)" + ") ORDER BY topological_position DESC, stream_position DESC LIMIT $6" @@ -76,17 +76,21 @@ const selectMaxPositionInTopologySQL = "" + const deleteTopologyForRoomSQL = "" + "DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1" -const selectStreamToTopologicalPositionSQL = "" + - "SELECT topological_position FROM syncapi_output_room_events_topology WHERE stream_position = $1" +const selectStreamToTopologicalPositionAscSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;" + +const selectStreamToTopologicalPositionDescSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;" type outputRoomEventsTopologyStatements struct { - insertEventInTopologyStmt *sql.Stmt - selectEventIDsInRangeASCStmt *sql.Stmt - selectEventIDsInRangeDESCStmt *sql.Stmt - selectPositionInTopologyStmt *sql.Stmt - selectMaxPositionInTopologyStmt *sql.Stmt - deleteTopologyForRoomStmt *sql.Stmt - selectStreamToTopologicalPositionStmt *sql.Stmt + insertEventInTopologyStmt *sql.Stmt + selectEventIDsInRangeASCStmt *sql.Stmt + selectEventIDsInRangeDESCStmt *sql.Stmt + selectPositionInTopologyStmt *sql.Stmt + selectMaxPositionInTopologyStmt *sql.Stmt + deleteTopologyForRoomStmt *sql.Stmt + selectStreamToTopologicalPositionAscStmt *sql.Stmt + selectStreamToTopologicalPositionDescStmt *sql.Stmt } func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -113,7 +117,10 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { if s.deleteTopologyForRoomStmt, err = db.Prepare(deleteTopologyForRoomSQL); err != nil { return nil, err } - if s.selectStreamToTopologicalPositionStmt, err = db.Prepare(selectStreamToTopologicalPositionSQL); err != nil { + if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil { + return nil, err + } + if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil { return nil, err } return s, nil @@ -177,12 +184,16 @@ func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology( return } -// SelectStreamToTopologicalPosition returns the position of a given event +// SelectStreamToTopologicalPosition returns the closest position of a given event // in the topology of the room it belongs to from the given stream position. func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition( - ctx context.Context, txn *sql.Tx, streamPos types.StreamPosition, + ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, backwardOrdering bool, ) (topoPos types.StreamPosition, err error) { - err = s.selectStreamToTopologicalPositionStmt.QueryRowContext(ctx, streamPos).Scan(&topoPos) + if backwardOrdering { + err = s.selectStreamToTopologicalPositionDescStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos) + } else { + err = s.selectStreamToTopologicalPositionAscStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos) + } return } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 9cae540b2..2f4fa0390 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -155,37 +155,6 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse return d.StreamEventsToEvents(nil, streamEvents), nil } -// GetEventsInStreamingRange retrieves all of the events on a given ordering using the -// given extremities and limit. -func (d *Database) GetEventsInStreamingRange( - ctx context.Context, - from, to *types.StreamingToken, - roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, - backwardOrdering bool, -) (events []types.StreamEvent, err error) { - r := types.Range{ - From: from.PDUPosition, - To: to.PDUPosition, - Backwards: backwardOrdering, - } - if backwardOrdering { - // When using backward ordering, we want the most recent events first. - if events, _, err = d.OutputEvents.SelectRecentEvents( - ctx, nil, roomID, r, eventFilter, false, false, - ); err != nil { - return - } - } else { - // When using forward ordering, we want the least recent events first. - if events, err = d.OutputEvents.SelectEarlyEvents( - ctx, nil, roomID, r, eventFilter, - ); err != nil { - return - } - } - return events, err -} - func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { return d.CurrentRoomState.SelectJoinedUsers(ctx) } @@ -514,9 +483,9 @@ func (d *Database) EventPositionInTopology( } func (d *Database) StreamToTopologicalPosition( - ctx context.Context, streamPos types.StreamPosition, + ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool, ) (types.TopologyToken, error) { - topoPos, err := d.Topology.SelectStreamToTopologicalPosition(ctx, nil, streamPos) + topoPos, err := d.Topology.SelectStreamToTopologicalPosition(ctx, nil, roomID, streamPos, backwardOrdering) if err != nil { return types.TopologyToken{}, err } diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index a04fdfb59..c4673786d 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -46,14 +46,14 @@ const insertEventInTopologySQL = "" + const selectEventIDsInRangeASCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + - "(topological_position > $2 AND topological_position < $3) OR" + + "(topological_position > $2 AND topological_position <= $3) OR" + "(topological_position = $4 AND stream_position <= $5)" + ") ORDER BY topological_position ASC, stream_position ASC LIMIT $6" const selectEventIDsInRangeDESCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + - "(topological_position > $2 AND topological_position < $3) OR" + + "(topological_position >= $2 AND topological_position < $3) OR" + "(topological_position = $4 AND stream_position <= $5)" + ") ORDER BY topological_position DESC, stream_position DESC LIMIT $6" @@ -65,21 +65,22 @@ const selectMaxPositionInTopologySQL = "" + "SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 ORDER BY stream_position DESC" -const deleteTopologyForRoomSQL = "" + - "DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1" +const selectStreamToTopologicalPositionAscSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;" -const selectStreamToTopologicalPositionSQL = "" + - "SELECT topological_position FROM syncapi_output_room_events_topology WHERE stream_position = $1" +const selectStreamToTopologicalPositionDescSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;" type outputRoomEventsTopologyStatements struct { - db *sql.DB - insertEventInTopologyStmt *sql.Stmt - selectEventIDsInRangeASCStmt *sql.Stmt - selectEventIDsInRangeDESCStmt *sql.Stmt - selectPositionInTopologyStmt *sql.Stmt - selectMaxPositionInTopologyStmt *sql.Stmt - deleteTopologyForRoomStmt *sql.Stmt - selectStreamToTopologicalPositionStmt *sql.Stmt + db *sql.DB + insertEventInTopologyStmt *sql.Stmt + selectEventIDsInRangeASCStmt *sql.Stmt + selectEventIDsInRangeDESCStmt *sql.Stmt + selectPositionInTopologyStmt *sql.Stmt + selectMaxPositionInTopologyStmt *sql.Stmt + deleteTopologyForRoomStmt *sql.Stmt + selectStreamToTopologicalPositionAscStmt *sql.Stmt + selectStreamToTopologicalPositionDescStmt *sql.Stmt } func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -105,10 +106,10 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { return nil, err } - if s.deleteTopologyForRoomStmt, err = db.Prepare(deleteTopologyForRoomSQL); err != nil { + if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil { return nil, err } - if s.selectStreamToTopologicalPositionStmt, err = db.Prepare(selectStreamToTopologicalPositionSQL); err != nil { + if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil { return nil, err } return s, nil @@ -170,12 +171,16 @@ func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology( return } -// SelectStreamToTopologicalPosition returns the position of a given event +// SelectStreamToTopologicalPosition returns the closest position of a given event // in the topology of the room it belongs to from the given stream position. func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition( - ctx context.Context, txn *sql.Tx, streamPos types.StreamPosition, + ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, backwardOrdering bool, ) (topoPos types.StreamPosition, err error) { - err = s.selectStreamToTopologicalPositionStmt.QueryRowContext(ctx, streamPos).Scan(&topoPos) + if backwardOrdering { + err = s.selectStreamToTopologicalPositionDescStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos) + } else { + err = s.selectStreamToTopologicalPositionAscStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos) + } return } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 615b49104..e6bc1addf 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -87,8 +87,8 @@ type Topology interface { SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error) // DeleteTopologyForRoom removes all topological information for a room. This should only be done when removing the room entirely. DeleteTopologyForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error) - // SelectStreamToTopologicalPosition converts a stream position to a topological position. - SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, streamPos types.StreamPosition) (topoPos types.StreamPosition, err error) + // SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room. + SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error) } type CurrentRoomState interface {