Fix ordering when backfilling (#1000)
* Fix ordering when backfilling The problem was that we weren't sorting the returned events by depth when sending them back to the caller, instead we were sorting by prev_events which is not the same thing. * Fixup tests
This commit is contained in:
parent
f7cfa75886
commit
36bbb25561
|
@ -210,15 +210,16 @@ func (r *messagesReq) retrieveEvents() (
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sort the events to ensure we send them in the right order.
|
// Sort the events to ensure we send them in the right order.
|
||||||
events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(
|
|
||||||
events,
|
|
||||||
gomatrixserverlib.TopologicalOrderByPrevEvents,
|
|
||||||
)
|
|
||||||
if r.backwardOrdering {
|
if r.backwardOrdering {
|
||||||
// This reverses the array from old->new to new->old
|
// This reverses the array from old->new to new->old
|
||||||
sort.SliceStable(events, func(i, j int) bool {
|
reversed := func(in []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
|
||||||
return true
|
out := make([]gomatrixserverlib.HeaderedEvent, len(in))
|
||||||
})
|
for i := 0; i < len(in); i++ {
|
||||||
|
out[i] = in[len(in)-i-1]
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
events = reversed(events)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert all of the events into client events.
|
// Convert all of the events into client events.
|
||||||
|
@ -259,6 +260,7 @@ func (r *messagesReq) retrieveEvents() (
|
||||||
// to them by the event on their left, therefore we need to decrement the
|
// to them by the event on their left, therefore we need to decrement the
|
||||||
// end position we send in the response if we're going backward.
|
// end position we send in the response if we're going backward.
|
||||||
end.PDUPosition--
|
end.PDUPosition--
|
||||||
|
end.EDUTypingPosition += 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
// The lowest token value is 1, therefore we need to manually set it to that
|
// The lowest token value is 1, therefore we need to manually set it to that
|
||||||
|
@ -345,10 +347,23 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
||||||
|
|
||||||
// Append the events ve previously retrieved locally.
|
// Append the events ve previously retrieved locally.
|
||||||
events = append(events, r.db.StreamEventsToEvents(nil, streamEvents)...)
|
events = append(events, r.db.StreamEventsToEvents(nil, streamEvents)...)
|
||||||
|
sort.Sort(eventsByDepth(events))
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type eventsByDepth []gomatrixserverlib.HeaderedEvent
|
||||||
|
|
||||||
|
func (e eventsByDepth) Len() int {
|
||||||
|
return len(e)
|
||||||
|
}
|
||||||
|
func (e eventsByDepth) Swap(i, j int) {
|
||||||
|
e[i], e[j] = e[j], e[i]
|
||||||
|
}
|
||||||
|
func (e eventsByDepth) Less(i, j int) bool {
|
||||||
|
return e[i].Depth() < e[j].Depth()
|
||||||
|
}
|
||||||
|
|
||||||
// backfill performs a backfill request over the federation on another
|
// backfill performs a backfill request over the federation on another
|
||||||
// homeserver in the room.
|
// homeserver in the room.
|
||||||
// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
|
// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
|
||||||
|
@ -375,17 +390,24 @@ func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int)
|
||||||
// Currently, this can race with live events for the room and cause problems. It's also just a bit unclear
|
// Currently, this can race with live events for the room and cause problems. It's also just a bit unclear
|
||||||
// when you have multiple entry points to write events.
|
// when you have multiple entry points to write events.
|
||||||
|
|
||||||
|
// we have to order these by depth, starting with the lowest because otherwise the topology tokens
|
||||||
|
// will skip over events that have the same depth but different stream positions due to the query which is:
|
||||||
|
// - anything less than the depth OR
|
||||||
|
// - anything with the same depth and a lower stream position.
|
||||||
|
sort.Sort(eventsByDepth(res.Events))
|
||||||
|
|
||||||
// Store the events in the database, while marking them as unfit to show
|
// Store the events in the database, while marking them as unfit to show
|
||||||
// up in responses to sync requests.
|
// up in responses to sync requests.
|
||||||
for i := range res.Events {
|
for i := range res.Events {
|
||||||
if _, err = r.db.WriteEvent(
|
_, err = r.db.WriteEvent(
|
||||||
r.ctx,
|
r.ctx,
|
||||||
&res.Events[i],
|
&res.Events[i],
|
||||||
[]gomatrixserverlib.HeaderedEvent{},
|
[]gomatrixserverlib.HeaderedEvent{},
|
||||||
[]string{},
|
[]string{},
|
||||||
[]string{},
|
[]string{},
|
||||||
nil, true,
|
nil, true,
|
||||||
); err != nil {
|
)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -434,6 +434,7 @@ func (d *SyncServerDatasource) syncPositionTx(
|
||||||
}
|
}
|
||||||
sp.PDUPosition = types.StreamPosition(maxEventID)
|
sp.PDUPosition = types.StreamPosition(maxEventID)
|
||||||
sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition())
|
sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition())
|
||||||
|
sp.Type = types.PaginationTokenTypeStream
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -658,6 +659,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
|
||||||
backwardTopologyPos = types.StreamPosition(1)
|
backwardTopologyPos = types.StreamPosition(1)
|
||||||
} else {
|
} else {
|
||||||
backwardTopologyPos--
|
backwardTopologyPos--
|
||||||
|
backwardTopologyStreamPos += 1000 // this has to be bigger than the number of events we backfill per request
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't include a device here as we don't need to send down
|
// We don't include a device here as we don't need to send down
|
||||||
|
@ -817,11 +819,13 @@ func (d *SyncServerDatasource) getBackwardTopologyPos(
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
pos, spos, _ = d.topology.selectPositionInTopology(ctx, txn, events[0].EventID())
|
pos, spos, _ = d.topology.selectPositionInTopology(ctx, txn, events[0].EventID())
|
||||||
}
|
}
|
||||||
// TODO: I have no idea what this is doing.
|
// go to the previous position so we don't pull out the same event twice
|
||||||
|
// FIXME: This could be done more nicely by being explicit with inclusive/exclusive rules
|
||||||
if pos-1 <= 0 {
|
if pos-1 <= 0 {
|
||||||
pos = types.StreamPosition(1)
|
pos = types.StreamPosition(1)
|
||||||
} else {
|
} else {
|
||||||
pos = pos - 1
|
pos = pos - 1
|
||||||
|
spos += 1000 // this has to be bigger than the number of events we backfill per request
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,6 +118,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []gomatrixserverl
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("WriteEvent failed: %s", err)
|
t.Fatalf("WriteEvent failed: %s", err)
|
||||||
}
|
}
|
||||||
|
fmt.Println("Event ID", ev.EventID(), " spos=", pos, "depth=", ev.Depth())
|
||||||
positions = append(positions, pos)
|
positions = append(positions, pos)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -407,6 +408,64 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The purpose of this test is to make sure that events are returned in the right *order* when they have been inserted in a manner similar to
|
||||||
|
// how any kind of backfill operation will insert the events. This test inserts the SimpleRoom events in a manner similar to how backfill over
|
||||||
|
// federation would:
|
||||||
|
// - First inserts join event of test user C
|
||||||
|
// - Inserts chunks of history in strata e.g (25-30, 20-25, 15-20, 10-15, 5-10, 0-5).
|
||||||
|
// The test then does a backfill to ensure that the response is ordered correctly according to depth.
|
||||||
|
func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
db := MustCreateDatabase(t)
|
||||||
|
events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB)
|
||||||
|
|
||||||
|
// "federation" join
|
||||||
|
userC := fmt.Sprintf("@radiance:%s", testOrigin)
|
||||||
|
joinEvent := MustCreateEvent(t, testRoomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
|
||||||
|
Content: []byte(fmt.Sprintf(`{"membership":"join"}`)),
|
||||||
|
Type: "m.room.member",
|
||||||
|
StateKey: &userC,
|
||||||
|
Sender: userC,
|
||||||
|
Depth: int64(len(events) + 1),
|
||||||
|
})
|
||||||
|
MustWriteEvents(t, db, []gomatrixserverlib.HeaderedEvent{joinEvent})
|
||||||
|
|
||||||
|
// Sync will return this for the prev_batch
|
||||||
|
from := topologyTokenBefore(t, db, joinEvent.EventID())
|
||||||
|
|
||||||
|
// inject events in batches as if they were from backfill
|
||||||
|
// e.g [1,2,3,4,5,6] => [4,5,6] , [1,2,3]
|
||||||
|
chunkSize := 5
|
||||||
|
for i := len(events); i >= 0; i -= chunkSize {
|
||||||
|
start := i - chunkSize
|
||||||
|
if start < 0 {
|
||||||
|
start = 0
|
||||||
|
}
|
||||||
|
backfill := events[start:i]
|
||||||
|
MustWriteEvents(t, db, backfill)
|
||||||
|
}
|
||||||
|
|
||||||
|
// head towards the beginning of time
|
||||||
|
to := types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0)
|
||||||
|
|
||||||
|
// starting at `from`, backpaginate to the beginning of time, asserting as we go.
|
||||||
|
chunkSize = 3
|
||||||
|
events = reversed(events)
|
||||||
|
for i := 0; i < len(events); i += chunkSize {
|
||||||
|
paginatedEvents, err := db.GetEventsInRange(ctx, from, to, testRoomID, chunkSize, true)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEventsInRange returned an error: %s", err)
|
||||||
|
}
|
||||||
|
gots := gomatrixserverlib.HeaderedToClientEvents(db.StreamEventsToEvents(&testUserDeviceA, paginatedEvents), gomatrixserverlib.FormatAll)
|
||||||
|
endi := i + chunkSize
|
||||||
|
if endi > len(events) {
|
||||||
|
endi = len(events)
|
||||||
|
}
|
||||||
|
assertEventsEqual(t, from.String(), true, gots, events[i:endi])
|
||||||
|
from = topologyTokenBefore(t, db, paginatedEvents[len(paginatedEvents)-1].EventID())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []gomatrixserverlib.HeaderedEvent) {
|
func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []gomatrixserverlib.HeaderedEvent) {
|
||||||
if len(gots) != len(wants) {
|
if len(gots) != len(wants) {
|
||||||
t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants))
|
t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants))
|
||||||
|
@ -447,6 +506,21 @@ func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *types.PaginationToken {
|
||||||
|
pos, spos, err := db.EventPositionInTopology(ctx, eventID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to get EventPositionInTopology: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pos-1 <= 0 {
|
||||||
|
pos = types.StreamPosition(1)
|
||||||
|
} else {
|
||||||
|
pos = pos - 1
|
||||||
|
spos += 1000 // this has to be bigger than the chunk limit
|
||||||
|
}
|
||||||
|
return types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos, spos)
|
||||||
|
}
|
||||||
|
|
||||||
func reversed(in []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
|
func reversed(in []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
|
||||||
out := make([]gomatrixserverlib.HeaderedEvent, len(in))
|
out := make([]gomatrixserverlib.HeaderedEvent, len(in))
|
||||||
for i := 0; i < len(in); i++ {
|
for i := 0; i < len(in); i++ {
|
||||||
|
|
Loading…
Reference in a new issue