From a378a3a78dcab5c99b2fff6d14170f3458dea3df Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Wed, 12 Jul 2023 10:56:18 +0200 Subject: [PATCH] Return start/end from DB, only hit DB again if we backfilled --- syncapi/routing/messages.go | 46 +++++++---------- syncapi/storage/interface.go | 2 +- .../output_room_events_topology_table.go | 24 ++++++--- syncapi/storage/shared/storage_sync.go | 18 +------ .../output_room_events_topology_table.go | 23 +++++++-- syncapi/storage/storage_test.go | 15 +++--- syncapi/storage/tables/interface.go | 6 +-- syncapi/storage/tables/topology_test.go | 50 +++++++++++-------- 8 files changed, 99 insertions(+), 85 deletions(-) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 6c232c7fc..674f5105e 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -16,14 +16,12 @@ package routing import ( "context" - "errors" "fmt" "math" "net/http" "sort" "time" - "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" @@ -55,6 +53,7 @@ type messagesReq struct { wasToProvided bool backwardOrdering bool filter *synctypes.RoomEventFilter + didBackfill bool } type messagesResp struct { @@ -265,6 +264,7 @@ func OnIncomingMessagesRequest( "backwards": backwardOrdering, "response_start": start.String(), "response_end": end.String(), + "backfilled": mReq.didBackfill, }).Info("Responding") res := messagesResp{ @@ -325,25 +325,12 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv ) { emptyToken := types.TopologyToken{} // Retrieve the events from the local database. - streamEvents, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering) + streamEvents, start, end, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering) if err != nil { - // While there are events in the topological range, the provided filter - // removed all of them. This means there are no events for this user - // anymore. Let them know. - if errors.Is(err, shared.ErrNoEventsForFilter) { - if len(streamEvents) == 0 { - return []synctypes.ClientEvent{}, *r.from, emptyToken, nil - } - // We possibly received events, try to get start/end from them - start, end, err = r.getStartEnd(r.snapshot.StreamEventsToEvents(ctx, r.device, streamEvents, rsAPI)) - if err != nil { - return []synctypes.ClientEvent{}, *r.from, *r.to, err - } - return []synctypes.ClientEvent{}, start, end, nil - } err = fmt.Errorf("GetEventsInRange: %w", err) return []synctypes.ClientEvent{}, *r.from, emptyToken, err } + end.Decrement() var events []*rstypes.HeaderedEvent util.GetLogger(r.ctx).WithFields(logrus.Fields{ @@ -388,19 +375,23 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv return []synctypes.ClientEvent{}, *r.from, emptyToken, nil } - // Get the position of the first and the last event in the room's topology. - // This position is currently determined by the event's depth, so we could - // also use it instead of retrieving from the database. However, if we ever - // change the way topological positions are defined (as depth isn't the most - // reliable way to define it), it would be easier and less troublesome to - // only have to change it in one place, i.e. the database. - start, end, err = r.getStartEnd(filteredEvents) - if err != nil { - return []synctypes.ClientEvent{}, *r.from, *r.to, err + // If we backfilled in the process of getting events, we need + // to re-fetch the start/end positions + if r.didBackfill { + start, end, err = r.getStartEnd(filteredEvents) + if err != nil { + return []synctypes.ClientEvent{}, *r.from, *r.to, err + } } // Sort the events to ensure we send them in the right order. if r.backwardOrdering { + if events[len(events)-1].Type() == spec.MRoomCreate { + // NOTSPEC: We've hit the beginning of the room so there's really nowhere + // else to go. This seems to fix Element iOS from looping on /messages endlessly. + end = types.TopologyToken{} + } + // This reverses the array from old->new to new->old reversed := func(in []*rstypes.HeaderedEvent) []*rstypes.HeaderedEvent { out := make([]*rstypes.HeaderedEvent, len(in)) @@ -467,6 +458,7 @@ func (r *messagesReq) handleEmptyEventsSlice() ( if err != nil { return } + r.didBackfill = true } else { // If not, it means the slice was empty because we reached the room's // creation, so return an empty slice. @@ -516,7 +508,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent if err != nil { return } - + r.didBackfill = true // Append the PDUs to the list to send back to the client. events = append(events, pdus...) } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 913049e83..dca5d1a14 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -85,7 +85,7 @@ type DatabaseTransaction interface { // If backwardsOrdering is true, the most recent event must be first, else last. // Returns the filtered StreamEvents on success. Returns **unfiltered** StreamEvents and ErrNoEventsForFilter if // the provided filter removed all events, this can be used to still calculate the start/end position. (e.g for `/messages`) - GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, filter *synctypes.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, err error) + GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, filter *synctypes.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, start, end types.TopologyToken, err error) // EventPositionInTopology returns the depth and stream position of the given event. EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error) // BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events. diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 7140a92fc..b281f3300 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -48,14 +48,14 @@ const insertEventInTopologySQL = "" + " RETURNING topological_position" const selectEventIDsInRangeASCSQL = "" + - "SELECT event_id FROM syncapi_output_room_events_topology" + + "SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + "(topological_position > $2 AND topological_position < $3) OR" + "(topological_position = $4 AND stream_position >= $5)" + ") ORDER BY topological_position ASC, stream_position ASC LIMIT $6" const selectEventIDsInRangeDESCSQL = "" + - "SELECT event_id FROM syncapi_output_room_events_topology" + + "SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + "(topological_position > $2 AND topological_position < $3) OR" + "(topological_position = $4 AND stream_position <= $5)" + @@ -113,12 +113,13 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( } // SelectEventIDsInRange selects the IDs of events which positions are within a -// given range in a given room's topological order. +// given range in a given room's topological order. Returns the start/end topological tokens for +// the returned eventIDs. // Returns an empty slice if no events match the given range. func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool, -) (eventIDs []string, err error) { +) (eventIDs []string, start, end types.TopologyToken, err error) { // Decide on the selection's order according to whether chronological order // is requested or not. var stmt *sql.Stmt @@ -132,7 +133,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit) if err == sql.ErrNoRows { // If no event matched the request, return an empty slice. - return []string{}, nil + return []string{}, start, end, nil } else if err != nil { return } @@ -140,14 +141,23 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( // Return the IDs. var eventID string + var token types.TopologyToken + var tokens []types.TopologyToken for rows.Next() { - if err = rows.Scan(&eventID); err != nil { + if err = rows.Scan(&eventID, &token.Depth, &token.PDUPosition); err != nil { return } eventIDs = append(eventIDs, eventID) + tokens = append(tokens, token) } - return eventIDs, rows.Err() + // The values are already ordered by SQL, so we can use them as is. + if len(tokens) > 0 { + start = tokens[0] + end = tokens[len(tokens)-1] + } + + return eventIDs, start, end, rows.Err() } // SelectPositionInTopology returns the position of a given event in the diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go index ddfd2465c..cd17fdc69 100644 --- a/syncapi/storage/shared/storage_sync.go +++ b/syncapi/storage/shared/storage_sync.go @@ -3,7 +3,6 @@ package shared import ( "context" "database/sql" - "errors" "fmt" "math" @@ -232,15 +231,13 @@ func (d *DatabaseTransaction) GetAccountDataInRange( return d.AccountData.SelectAccountDataInRange(ctx, d.txn, userID, r, accountDataFilterPart) } -var ErrNoEventsForFilter = errors.New("no events returned using the provided filter") - func (d *DatabaseTransaction) GetEventsInTopologicalRange( ctx context.Context, from, to *types.TopologyToken, roomID string, filter *synctypes.RoomEventFilter, backwardOrdering bool, -) (events []types.StreamEvent, err error) { +) (events []types.StreamEvent, start, end types.TopologyToken, err error) { var minDepth, maxDepth, maxStreamPosForMaxDepth types.StreamPosition if backwardOrdering { // Backward ordering means the 'from' token has a higher depth than the 'to' token @@ -258,7 +255,7 @@ func (d *DatabaseTransaction) GetEventsInTopologicalRange( // Select the event IDs from the defined range. var eIDs []string - eIDs, err = d.Topology.SelectEventIDsInRange( + eIDs, start, end, err = d.Topology.SelectEventIDsInRange( ctx, d.txn, roomID, minDepth, maxDepth, maxStreamPosForMaxDepth, filter.Limit, !backwardOrdering, ) if err != nil { @@ -270,17 +267,6 @@ func (d *DatabaseTransaction) GetEventsInTopologicalRange( if err != nil { return } - // Check if we should be able to return events. - // If we received 0 events, this most likely means that the provided filter removed them. - if len(eIDs) > 0 && len(events) == 0 { - // We try to fetch the events without a filter, so we can tell the client if there - // are more events earlier than the requested and filtered. - events, err = d.OutputEvents.SelectEvents(ctx, d.txn, eIDs, nil, true) - if err != nil { - return - } - return events, ErrNoEventsForFilter - } return } diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index 68b75f5b1..614e1df9e 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -44,14 +44,14 @@ const insertEventInTopologySQL = "" + " ON CONFLICT DO NOTHING" const selectEventIDsInRangeASCSQL = "" + - "SELECT event_id FROM syncapi_output_room_events_topology" + + "SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + "(topological_position > $2 AND topological_position < $3) OR" + "(topological_position = $4 AND stream_position >= $5)" + ") ORDER BY topological_position ASC, stream_position ASC LIMIT $6" const selectEventIDsInRangeDESCSQL = "" + - "SELECT event_id FROM syncapi_output_room_events_topology" + + "SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + "(topological_position > $2 AND topological_position < $3) OR" + "(topological_position = $4 AND stream_position <= $5)" + @@ -111,11 +111,15 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( return types.StreamPosition(event.Depth()), err } +// SelectEventIDsInRange selects the IDs of events which positions are within a +// given range in a given room's topological order. Returns the start/end topological tokens for +// the returned eventIDs. +// Returns an empty slice if no events match the given range. func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool, -) (eventIDs []string, err error) { +) (eventIDs []string, start, end types.TopologyToken, err error) { // Decide on the selection's order according to whether chronological order // is requested or not. var stmt *sql.Stmt @@ -129,18 +133,27 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit) if err == sql.ErrNoRows { // If no event matched the request, return an empty slice. - return []string{}, nil + return []string{}, start, end, nil } else if err != nil { return } // Return the IDs. var eventID string + var token types.TopologyToken + var tokens []types.TopologyToken for rows.Next() { - if err = rows.Scan(&eventID); err != nil { + if err = rows.Scan(&eventID, &token.Depth, &token.PDUPosition); err != nil { return } eventIDs = append(eventIDs, eventID) + tokens = append(tokens, token) + } + + // The values are already ordered by SQL, so we can use them as is. + if len(tokens) > 0 { + start = tokens[0] + end = tokens[len(tokens)-1] } return diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 26f661aff..ce7ca3fc7 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -14,7 +14,6 @@ import ( rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/synctypes" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/test" @@ -214,12 +213,14 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) { // backpaginate 5 messages starting at the latest position. filter := &synctypes.RoomEventFilter{Limit: 5} - paginatedEvents, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true) + paginatedEvents, start, end, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true) if err != nil { t.Fatalf("GetEventsInTopologicalRange returned an error: %s", err) } gots := snapshot.StreamEventsToEvents(context.Background(), nil, paginatedEvents, nil) test.AssertEventsEqual(t, gots, test.Reversed(events[len(events)-5:])) + assert.Equal(t, types.TopologyToken{Depth: 15, PDUPosition: 15}, start) + assert.Equal(t, types.TopologyToken{Depth: 11, PDUPosition: 11}, end) }) }) } @@ -248,10 +249,12 @@ func TestGetEventsInRangeWithTopologyTokenNoEventsForFilter(t *testing.T) { notTypes := []string{spec.MRoomRedaction} senders := []string{alice.ID} filter := &synctypes.RoomEventFilter{Limit: 20, NotTypes: ¬Types, Senders: &senders} - paginatedEvents, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true) - assert.Equal(t, shared.ErrNoEventsForFilter, err) - gots := snapshot.StreamEventsToEvents(context.Background(), nil, paginatedEvents, nil) - test.AssertEventsEqual(t, gots, test.Reversed(events[len(events)-15:])) + paginatedEvents, start, end, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true) + assert.NoError(t, err) + assert.Equal(t, 0, len(paginatedEvents)) + // Even if we didn't get anything back due to the filter, we should still have start/end + assert.Equal(t, types.TopologyToken{Depth: 15, PDUPosition: 15}, start) + assert.Equal(t, types.TopologyToken{Depth: 1, PDUPosition: 1}, end) }) }) } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 854292bd2..f5c66c42d 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -89,11 +89,11 @@ type Topology interface { // InsertEventInTopology inserts the given event in the room's topology, based on the event's depth. // `pos` is the stream position of this event in the events table, and is used to order events which have the same depth. InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error) - // SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order. - // Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`. + // SelectEventIDsInRange selects the IDs and the topological position of events whose depths are within a given range in a given room's topological order. + // Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`. Returns the eventIDs and start/end topological tokens. // `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned. // Returns an empty slice if no events match the given range. - SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error) + SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, start, end types.TopologyToken, err error) // SelectPositionInTopology returns the depth and stream position of a given event in the topology of the room it belongs to. SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (depth, spos types.StreamPosition, err error) // SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room. diff --git a/syncapi/storage/tables/topology_test.go b/syncapi/storage/tables/topology_test.go index f4f75bdf3..7691cc5f8 100644 --- a/syncapi/storage/tables/topology_test.go +++ b/syncapi/storage/tables/topology_test.go @@ -13,6 +13,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/test" + "github.com/stretchr/testify/assert" ) func newTopologyTable(t *testing.T, dbType test.DBType) (tables.Topology, *sql.DB, func()) { @@ -60,28 +61,37 @@ func TestTopologyTable(t *testing.T) { highestPos = topoPos + 1 } // check ordering works without limit - eventIDs, err := tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 100, true) - if err != nil { - return fmt.Errorf("failed to SelectEventIDsInRange: %s", err) - } + eventIDs, start, end, err := tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 100, true) + assert.NoError(t, err, "failed to SelectEventIDsInRange") test.AssertEventIDsEqual(t, eventIDs, events[:]) - eventIDs, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 100, false) - if err != nil { - return fmt.Errorf("failed to SelectEventIDsInRange: %s", err) - } - test.AssertEventIDsEqual(t, eventIDs, test.Reversed(events[:])) - // check ordering works with limit - eventIDs, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 3, true) - if err != nil { - return fmt.Errorf("failed to SelectEventIDsInRange: %s", err) - } - test.AssertEventIDsEqual(t, eventIDs, events[:3]) - eventIDs, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 3, false) - if err != nil { - return fmt.Errorf("failed to SelectEventIDsInRange: %s", err) - } - test.AssertEventIDsEqual(t, eventIDs, test.Reversed(events[len(events)-3:])) + assert.Equal(t, types.TopologyToken{Depth: 1, PDUPosition: 0}, start) + assert.Equal(t, types.TopologyToken{Depth: 5, PDUPosition: 4}, end) + eventIDs, start, end, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 100, false) + assert.NoError(t, err, "failed to SelectEventIDsInRange") + test.AssertEventIDsEqual(t, eventIDs, test.Reversed(events[:])) + assert.Equal(t, types.TopologyToken{Depth: 5, PDUPosition: 4}, start) + assert.Equal(t, types.TopologyToken{Depth: 1, PDUPosition: 0}, end) + + // check ordering works with limit + eventIDs, start, end, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 3, true) + assert.NoError(t, err, "failed to SelectEventIDsInRange") + test.AssertEventIDsEqual(t, eventIDs, events[:3]) + assert.Equal(t, types.TopologyToken{Depth: 1, PDUPosition: 0}, start) + assert.Equal(t, types.TopologyToken{Depth: 3, PDUPosition: 2}, end) + + eventIDs, start, end, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 3, false) + assert.NoError(t, err, "failed to SelectEventIDsInRange") + test.AssertEventIDsEqual(t, eventIDs, test.Reversed(events[len(events)-3:])) + assert.Equal(t, types.TopologyToken{Depth: 5, PDUPosition: 4}, start) + assert.Equal(t, types.TopologyToken{Depth: 3, PDUPosition: 2}, end) + + // Check that we return no values for invalid rooms + eventIDs, start, end, err = tab.SelectEventIDsInRange(ctx, txn, "!doesnotexist:localhost", 0, highestPos, highestPos, 10, false) + assert.NoError(t, err, "failed to SelectEventIDsInRange") + assert.Equal(t, 0, len(eventIDs)) + assert.Equal(t, types.TopologyToken{}, start) + assert.Equal(t, types.TopologyToken{}, end) return nil }) if err != nil {