mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-09 15:13:12 -06:00
Return start/end from DB, only hit DB again if we backfilled
This commit is contained in:
parent
307effe4aa
commit
a378a3a78d
|
|
@ -16,14 +16,12 @@ package routing
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
"github.com/matrix-org/util"
|
||||
|
|
@ -55,6 +53,7 @@ type messagesReq struct {
|
|||
wasToProvided bool
|
||||
backwardOrdering bool
|
||||
filter *synctypes.RoomEventFilter
|
||||
didBackfill bool
|
||||
}
|
||||
|
||||
type messagesResp struct {
|
||||
|
|
@ -265,6 +264,7 @@ func OnIncomingMessagesRequest(
|
|||
"backwards": backwardOrdering,
|
||||
"response_start": start.String(),
|
||||
"response_end": end.String(),
|
||||
"backfilled": mReq.didBackfill,
|
||||
}).Info("Responding")
|
||||
|
||||
res := messagesResp{
|
||||
|
|
@ -325,25 +325,12 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv
|
|||
) {
|
||||
emptyToken := types.TopologyToken{}
|
||||
// Retrieve the events from the local database.
|
||||
streamEvents, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering)
|
||||
streamEvents, start, end, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering)
|
||||
if err != nil {
|
||||
// While there are events in the topological range, the provided filter
|
||||
// removed all of them. This means there are no events for this user
|
||||
// anymore. Let them know.
|
||||
if errors.Is(err, shared.ErrNoEventsForFilter) {
|
||||
if len(streamEvents) == 0 {
|
||||
return []synctypes.ClientEvent{}, *r.from, emptyToken, nil
|
||||
}
|
||||
// We possibly received events, try to get start/end from them
|
||||
start, end, err = r.getStartEnd(r.snapshot.StreamEventsToEvents(ctx, r.device, streamEvents, rsAPI))
|
||||
if err != nil {
|
||||
return []synctypes.ClientEvent{}, *r.from, *r.to, err
|
||||
}
|
||||
return []synctypes.ClientEvent{}, start, end, nil
|
||||
}
|
||||
err = fmt.Errorf("GetEventsInRange: %w", err)
|
||||
return []synctypes.ClientEvent{}, *r.from, emptyToken, err
|
||||
}
|
||||
end.Decrement()
|
||||
|
||||
var events []*rstypes.HeaderedEvent
|
||||
util.GetLogger(r.ctx).WithFields(logrus.Fields{
|
||||
|
|
@ -388,19 +375,23 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv
|
|||
return []synctypes.ClientEvent{}, *r.from, emptyToken, nil
|
||||
}
|
||||
|
||||
// Get the position of the first and the last event in the room's topology.
|
||||
// This position is currently determined by the event's depth, so we could
|
||||
// also use it instead of retrieving from the database. However, if we ever
|
||||
// change the way topological positions are defined (as depth isn't the most
|
||||
// reliable way to define it), it would be easier and less troublesome to
|
||||
// only have to change it in one place, i.e. the database.
|
||||
start, end, err = r.getStartEnd(filteredEvents)
|
||||
if err != nil {
|
||||
return []synctypes.ClientEvent{}, *r.from, *r.to, err
|
||||
// If we backfilled in the process of getting events, we need
|
||||
// to re-fetch the start/end positions
|
||||
if r.didBackfill {
|
||||
start, end, err = r.getStartEnd(filteredEvents)
|
||||
if err != nil {
|
||||
return []synctypes.ClientEvent{}, *r.from, *r.to, err
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the events to ensure we send them in the right order.
|
||||
if r.backwardOrdering {
|
||||
if events[len(events)-1].Type() == spec.MRoomCreate {
|
||||
// NOTSPEC: We've hit the beginning of the room so there's really nowhere
|
||||
// else to go. This seems to fix Element iOS from looping on /messages endlessly.
|
||||
end = types.TopologyToken{}
|
||||
}
|
||||
|
||||
// This reverses the array from old->new to new->old
|
||||
reversed := func(in []*rstypes.HeaderedEvent) []*rstypes.HeaderedEvent {
|
||||
out := make([]*rstypes.HeaderedEvent, len(in))
|
||||
|
|
@ -467,6 +458,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
r.didBackfill = true
|
||||
} else {
|
||||
// If not, it means the slice was empty because we reached the room's
|
||||
// creation, so return an empty slice.
|
||||
|
|
@ -516,7 +508,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.didBackfill = true
|
||||
// Append the PDUs to the list to send back to the client.
|
||||
events = append(events, pdus...)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ type DatabaseTransaction interface {
|
|||
// If backwardsOrdering is true, the most recent event must be first, else last.
|
||||
// Returns the filtered StreamEvents on success. Returns **unfiltered** StreamEvents and ErrNoEventsForFilter if
|
||||
// the provided filter removed all events, this can be used to still calculate the start/end position. (e.g for `/messages`)
|
||||
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, filter *synctypes.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, err error)
|
||||
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, filter *synctypes.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, start, end types.TopologyToken, err error)
|
||||
// EventPositionInTopology returns the depth and stream position of the given event.
|
||||
EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error)
|
||||
// BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events.
|
||||
|
|
|
|||
|
|
@ -48,14 +48,14 @@ const insertEventInTopologySQL = "" +
|
|||
" RETURNING topological_position"
|
||||
|
||||
const selectEventIDsInRangeASCSQL = "" +
|
||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
||||
"SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" +
|
||||
" WHERE room_id = $1 AND (" +
|
||||
"(topological_position > $2 AND topological_position < $3) OR" +
|
||||
"(topological_position = $4 AND stream_position >= $5)" +
|
||||
") ORDER BY topological_position ASC, stream_position ASC LIMIT $6"
|
||||
|
||||
const selectEventIDsInRangeDESCSQL = "" +
|
||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
||||
"SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" +
|
||||
" WHERE room_id = $1 AND (" +
|
||||
"(topological_position > $2 AND topological_position < $3) OR" +
|
||||
"(topological_position = $4 AND stream_position <= $5)" +
|
||||
|
|
@ -113,12 +113,13 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
|||
}
|
||||
|
||||
// SelectEventIDsInRange selects the IDs of events which positions are within a
|
||||
// given range in a given room's topological order.
|
||||
// given range in a given room's topological order. Returns the start/end topological tokens for
|
||||
// the returned eventIDs.
|
||||
// Returns an empty slice if no events match the given range.
|
||||
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
||||
ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition,
|
||||
limit int, chronologicalOrder bool,
|
||||
) (eventIDs []string, err error) {
|
||||
) (eventIDs []string, start, end types.TopologyToken, err error) {
|
||||
// Decide on the selection's order according to whether chronological order
|
||||
// is requested or not.
|
||||
var stmt *sql.Stmt
|
||||
|
|
@ -132,7 +133,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
|||
rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit)
|
||||
if err == sql.ErrNoRows {
|
||||
// If no event matched the request, return an empty slice.
|
||||
return []string{}, nil
|
||||
return []string{}, start, end, nil
|
||||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
@ -140,14 +141,23 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
|||
|
||||
// Return the IDs.
|
||||
var eventID string
|
||||
var token types.TopologyToken
|
||||
var tokens []types.TopologyToken
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(&eventID); err != nil {
|
||||
if err = rows.Scan(&eventID, &token.Depth, &token.PDUPosition); err != nil {
|
||||
return
|
||||
}
|
||||
eventIDs = append(eventIDs, eventID)
|
||||
tokens = append(tokens, token)
|
||||
}
|
||||
|
||||
return eventIDs, rows.Err()
|
||||
// The values are already ordered by SQL, so we can use them as is.
|
||||
if len(tokens) > 0 {
|
||||
start = tokens[0]
|
||||
end = tokens[len(tokens)-1]
|
||||
}
|
||||
|
||||
return eventIDs, start, end, rows.Err()
|
||||
}
|
||||
|
||||
// SelectPositionInTopology returns the position of a given event in the
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ package shared
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
|
|
@ -232,15 +231,13 @@ func (d *DatabaseTransaction) GetAccountDataInRange(
|
|||
return d.AccountData.SelectAccountDataInRange(ctx, d.txn, userID, r, accountDataFilterPart)
|
||||
}
|
||||
|
||||
var ErrNoEventsForFilter = errors.New("no events returned using the provided filter")
|
||||
|
||||
func (d *DatabaseTransaction) GetEventsInTopologicalRange(
|
||||
ctx context.Context,
|
||||
from, to *types.TopologyToken,
|
||||
roomID string,
|
||||
filter *synctypes.RoomEventFilter,
|
||||
backwardOrdering bool,
|
||||
) (events []types.StreamEvent, err error) {
|
||||
) (events []types.StreamEvent, start, end types.TopologyToken, err error) {
|
||||
var minDepth, maxDepth, maxStreamPosForMaxDepth types.StreamPosition
|
||||
if backwardOrdering {
|
||||
// Backward ordering means the 'from' token has a higher depth than the 'to' token
|
||||
|
|
@ -258,7 +255,7 @@ func (d *DatabaseTransaction) GetEventsInTopologicalRange(
|
|||
|
||||
// Select the event IDs from the defined range.
|
||||
var eIDs []string
|
||||
eIDs, err = d.Topology.SelectEventIDsInRange(
|
||||
eIDs, start, end, err = d.Topology.SelectEventIDsInRange(
|
||||
ctx, d.txn, roomID, minDepth, maxDepth, maxStreamPosForMaxDepth, filter.Limit, !backwardOrdering,
|
||||
)
|
||||
if err != nil {
|
||||
|
|
@ -270,17 +267,6 @@ func (d *DatabaseTransaction) GetEventsInTopologicalRange(
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Check if we should be able to return events.
|
||||
// If we received 0 events, this most likely means that the provided filter removed them.
|
||||
if len(eIDs) > 0 && len(events) == 0 {
|
||||
// We try to fetch the events without a filter, so we can tell the client if there
|
||||
// are more events earlier than the requested and filtered.
|
||||
events, err = d.OutputEvents.SelectEvents(ctx, d.txn, eIDs, nil, true)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return events, ErrNoEventsForFilter
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,14 +44,14 @@ const insertEventInTopologySQL = "" +
|
|||
" ON CONFLICT DO NOTHING"
|
||||
|
||||
const selectEventIDsInRangeASCSQL = "" +
|
||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
||||
"SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" +
|
||||
" WHERE room_id = $1 AND (" +
|
||||
"(topological_position > $2 AND topological_position < $3) OR" +
|
||||
"(topological_position = $4 AND stream_position >= $5)" +
|
||||
") ORDER BY topological_position ASC, stream_position ASC LIMIT $6"
|
||||
|
||||
const selectEventIDsInRangeDESCSQL = "" +
|
||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
||||
"SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" +
|
||||
" WHERE room_id = $1 AND (" +
|
||||
"(topological_position > $2 AND topological_position < $3) OR" +
|
||||
"(topological_position = $4 AND stream_position <= $5)" +
|
||||
|
|
@ -111,11 +111,15 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
|||
return types.StreamPosition(event.Depth()), err
|
||||
}
|
||||
|
||||
// SelectEventIDsInRange selects the IDs of events which positions are within a
|
||||
// given range in a given room's topological order. Returns the start/end topological tokens for
|
||||
// the returned eventIDs.
|
||||
// Returns an empty slice if no events match the given range.
|
||||
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
minDepth, maxDepth, maxStreamPos types.StreamPosition,
|
||||
limit int, chronologicalOrder bool,
|
||||
) (eventIDs []string, err error) {
|
||||
) (eventIDs []string, start, end types.TopologyToken, err error) {
|
||||
// Decide on the selection's order according to whether chronological order
|
||||
// is requested or not.
|
||||
var stmt *sql.Stmt
|
||||
|
|
@ -129,18 +133,27 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
|||
rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit)
|
||||
if err == sql.ErrNoRows {
|
||||
// If no event matched the request, return an empty slice.
|
||||
return []string{}, nil
|
||||
return []string{}, start, end, nil
|
||||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Return the IDs.
|
||||
var eventID string
|
||||
var token types.TopologyToken
|
||||
var tokens []types.TopologyToken
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(&eventID); err != nil {
|
||||
if err = rows.Scan(&eventID, &token.Depth, &token.PDUPosition); err != nil {
|
||||
return
|
||||
}
|
||||
eventIDs = append(eventIDs, eventID)
|
||||
tokens = append(tokens, token)
|
||||
}
|
||||
|
||||
// The values are already ordered by SQL, so we can use them as is.
|
||||
if len(tokens) > 0 {
|
||||
start = tokens[0]
|
||||
end = tokens[len(tokens)-1]
|
||||
}
|
||||
|
||||
return
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import (
|
|||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
|
|
@ -214,12 +213,14 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
|
|||
|
||||
// backpaginate 5 messages starting at the latest position.
|
||||
filter := &synctypes.RoomEventFilter{Limit: 5}
|
||||
paginatedEvents, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true)
|
||||
paginatedEvents, start, end, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEventsInTopologicalRange returned an error: %s", err)
|
||||
}
|
||||
gots := snapshot.StreamEventsToEvents(context.Background(), nil, paginatedEvents, nil)
|
||||
test.AssertEventsEqual(t, gots, test.Reversed(events[len(events)-5:]))
|
||||
assert.Equal(t, types.TopologyToken{Depth: 15, PDUPosition: 15}, start)
|
||||
assert.Equal(t, types.TopologyToken{Depth: 11, PDUPosition: 11}, end)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
@ -248,10 +249,12 @@ func TestGetEventsInRangeWithTopologyTokenNoEventsForFilter(t *testing.T) {
|
|||
notTypes := []string{spec.MRoomRedaction}
|
||||
senders := []string{alice.ID}
|
||||
filter := &synctypes.RoomEventFilter{Limit: 20, NotTypes: ¬Types, Senders: &senders}
|
||||
paginatedEvents, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true)
|
||||
assert.Equal(t, shared.ErrNoEventsForFilter, err)
|
||||
gots := snapshot.StreamEventsToEvents(context.Background(), nil, paginatedEvents, nil)
|
||||
test.AssertEventsEqual(t, gots, test.Reversed(events[len(events)-15:]))
|
||||
paginatedEvents, start, end, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, len(paginatedEvents))
|
||||
// Even if we didn't get anything back due to the filter, we should still have start/end
|
||||
assert.Equal(t, types.TopologyToken{Depth: 15, PDUPosition: 15}, start)
|
||||
assert.Equal(t, types.TopologyToken{Depth: 1, PDUPosition: 1}, end)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -89,11 +89,11 @@ type Topology interface {
|
|||
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
|
||||
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
|
||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
|
||||
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
|
||||
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
|
||||
// SelectEventIDsInRange selects the IDs and the topological position of events whose depths are within a given range in a given room's topological order.
|
||||
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`. Returns the eventIDs and start/end topological tokens.
|
||||
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
|
||||
// Returns an empty slice if no events match the given range.
|
||||
SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
|
||||
SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, start, end types.TopologyToken, err error)
|
||||
// SelectPositionInTopology returns the depth and stream position of a given event in the topology of the room it belongs to.
|
||||
SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (depth, spos types.StreamPosition, err error)
|
||||
// SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room.
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func newTopologyTable(t *testing.T, dbType test.DBType) (tables.Topology, *sql.DB, func()) {
|
||||
|
|
@ -60,28 +61,37 @@ func TestTopologyTable(t *testing.T) {
|
|||
highestPos = topoPos + 1
|
||||
}
|
||||
// check ordering works without limit
|
||||
eventIDs, err := tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 100, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to SelectEventIDsInRange: %s", err)
|
||||
}
|
||||
eventIDs, start, end, err := tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 100, true)
|
||||
assert.NoError(t, err, "failed to SelectEventIDsInRange")
|
||||
test.AssertEventIDsEqual(t, eventIDs, events[:])
|
||||
eventIDs, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 100, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to SelectEventIDsInRange: %s", err)
|
||||
}
|
||||
test.AssertEventIDsEqual(t, eventIDs, test.Reversed(events[:]))
|
||||
// check ordering works with limit
|
||||
eventIDs, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 3, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to SelectEventIDsInRange: %s", err)
|
||||
}
|
||||
test.AssertEventIDsEqual(t, eventIDs, events[:3])
|
||||
eventIDs, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 3, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to SelectEventIDsInRange: %s", err)
|
||||
}
|
||||
test.AssertEventIDsEqual(t, eventIDs, test.Reversed(events[len(events)-3:]))
|
||||
assert.Equal(t, types.TopologyToken{Depth: 1, PDUPosition: 0}, start)
|
||||
assert.Equal(t, types.TopologyToken{Depth: 5, PDUPosition: 4}, end)
|
||||
|
||||
eventIDs, start, end, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 100, false)
|
||||
assert.NoError(t, err, "failed to SelectEventIDsInRange")
|
||||
test.AssertEventIDsEqual(t, eventIDs, test.Reversed(events[:]))
|
||||
assert.Equal(t, types.TopologyToken{Depth: 5, PDUPosition: 4}, start)
|
||||
assert.Equal(t, types.TopologyToken{Depth: 1, PDUPosition: 0}, end)
|
||||
|
||||
// check ordering works with limit
|
||||
eventIDs, start, end, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 3, true)
|
||||
assert.NoError(t, err, "failed to SelectEventIDsInRange")
|
||||
test.AssertEventIDsEqual(t, eventIDs, events[:3])
|
||||
assert.Equal(t, types.TopologyToken{Depth: 1, PDUPosition: 0}, start)
|
||||
assert.Equal(t, types.TopologyToken{Depth: 3, PDUPosition: 2}, end)
|
||||
|
||||
eventIDs, start, end, err = tab.SelectEventIDsInRange(ctx, txn, room.ID, 0, highestPos, highestPos, 3, false)
|
||||
assert.NoError(t, err, "failed to SelectEventIDsInRange")
|
||||
test.AssertEventIDsEqual(t, eventIDs, test.Reversed(events[len(events)-3:]))
|
||||
assert.Equal(t, types.TopologyToken{Depth: 5, PDUPosition: 4}, start)
|
||||
assert.Equal(t, types.TopologyToken{Depth: 3, PDUPosition: 2}, end)
|
||||
|
||||
// Check that we return no values for invalid rooms
|
||||
eventIDs, start, end, err = tab.SelectEventIDsInRange(ctx, txn, "!doesnotexist:localhost", 0, highestPos, highestPos, 10, false)
|
||||
assert.NoError(t, err, "failed to SelectEventIDsInRange")
|
||||
assert.Equal(t, 0, len(eventIDs))
|
||||
assert.Equal(t, types.TopologyToken{}, start)
|
||||
assert.Equal(t, types.TopologyToken{}, end)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in a new issue