diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index d7e39704d..270b0ee95 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -210,15 +210,16 @@ func (r *messagesReq) retrieveEvents() ( } // Sort the events to ensure we send them in the right order. - events = gomatrixserverlib.HeaderedReverseTopologicalOrdering( - events, - gomatrixserverlib.TopologicalOrderByPrevEvents, - ) if r.backwardOrdering { // This reverses the array from old->new to new->old - sort.SliceStable(events, func(i, j int) bool { - return true - }) + reversed := func(in []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent { + out := make([]gomatrixserverlib.HeaderedEvent, len(in)) + for i := 0; i < len(in); i++ { + out[i] = in[len(in)-i-1] + } + return out + } + events = reversed(events) } // Convert all of the events into client events. @@ -259,6 +260,7 @@ func (r *messagesReq) retrieveEvents() ( // to them by the event on their left, therefore we need to decrement the // end position we send in the response if we're going backward. end.PDUPosition-- + end.EDUTypingPosition += 1000 } // The lowest token value is 1, therefore we need to manually set it to that @@ -345,10 +347,23 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent // Append the events ve previously retrieved locally. events = append(events, r.db.StreamEventsToEvents(nil, streamEvents)...) + sort.Sort(eventsByDepth(events)) return } +type eventsByDepth []gomatrixserverlib.HeaderedEvent + +func (e eventsByDepth) Len() int { + return len(e) +} +func (e eventsByDepth) Swap(i, j int) { + e[i], e[j] = e[j], e[i] +} +func (e eventsByDepth) Less(i, j int) bool { + return e[i].Depth() < e[j].Depth() +} + // backfill performs a backfill request over the federation on another // homeserver in the room. // See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid @@ -375,17 +390,24 @@ func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) // Currently, this can race with live events for the room and cause problems. It's also just a bit unclear // when you have multiple entry points to write events. + // we have to order these by depth, starting with the lowest because otherwise the topology tokens + // will skip over events that have the same depth but different stream positions due to the query which is: + // - anything less than the depth OR + // - anything with the same depth and a lower stream position. + sort.Sort(eventsByDepth(res.Events)) + // Store the events in the database, while marking them as unfit to show // up in responses to sync requests. for i := range res.Events { - if _, err = r.db.WriteEvent( + _, err = r.db.WriteEvent( r.ctx, &res.Events[i], []gomatrixserverlib.HeaderedEvent{}, []string{}, []string{}, nil, true, - ); err != nil { + ) + if err != nil { return nil, err } } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 425073a7b..314ea2aa3 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -434,6 +434,7 @@ func (d *SyncServerDatasource) syncPositionTx( } sp.PDUPosition = types.StreamPosition(maxEventID) sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition()) + sp.Type = types.PaginationTokenTypeStream return } @@ -658,6 +659,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( backwardTopologyPos = types.StreamPosition(1) } else { backwardTopologyPos-- + backwardTopologyStreamPos += 1000 // this has to be bigger than the number of events we backfill per request } // We don't include a device here as we don't need to send down @@ -817,11 +819,13 @@ func (d *SyncServerDatasource) getBackwardTopologyPos( if len(events) > 0 { pos, spos, _ = d.topology.selectPositionInTopology(ctx, txn, events[0].EventID()) } - // TODO: I have no idea what this is doing. + // go to the previous position so we don't pull out the same event twice + // FIXME: This could be done more nicely by being explicit with inclusive/exclusive rules if pos-1 <= 0 { pos = types.StreamPosition(1) } else { pos = pos - 1 + spos += 1000 // this has to be bigger than the number of events we backfill per request } return } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 378c1fe35..b951efa45 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -118,6 +118,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []gomatrixserverl if err != nil { t.Fatalf("WriteEvent failed: %s", err) } + fmt.Println("Event ID", ev.EventID(), " spos=", pos, "depth=", ev.Depth()) positions = append(positions, pos) } return @@ -407,6 +408,64 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) { } } +// The purpose of this test is to make sure that events are returned in the right *order* when they have been inserted in a manner similar to +// how any kind of backfill operation will insert the events. This test inserts the SimpleRoom events in a manner similar to how backfill over +// federation would: +// - First inserts join event of test user C +// - Inserts chunks of history in strata e.g (25-30, 20-25, 15-20, 10-15, 5-10, 0-5). +// The test then does a backfill to ensure that the response is ordered correctly according to depth. +func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) { + t.Parallel() + db := MustCreateDatabase(t) + events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB) + + // "federation" join + userC := fmt.Sprintf("@radiance:%s", testOrigin) + joinEvent := MustCreateEvent(t, testRoomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{ + Content: []byte(fmt.Sprintf(`{"membership":"join"}`)), + Type: "m.room.member", + StateKey: &userC, + Sender: userC, + Depth: int64(len(events) + 1), + }) + MustWriteEvents(t, db, []gomatrixserverlib.HeaderedEvent{joinEvent}) + + // Sync will return this for the prev_batch + from := topologyTokenBefore(t, db, joinEvent.EventID()) + + // inject events in batches as if they were from backfill + // e.g [1,2,3,4,5,6] => [4,5,6] , [1,2,3] + chunkSize := 5 + for i := len(events); i >= 0; i -= chunkSize { + start := i - chunkSize + if start < 0 { + start = 0 + } + backfill := events[start:i] + MustWriteEvents(t, db, backfill) + } + + // head towards the beginning of time + to := types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0) + + // starting at `from`, backpaginate to the beginning of time, asserting as we go. + chunkSize = 3 + events = reversed(events) + for i := 0; i < len(events); i += chunkSize { + paginatedEvents, err := db.GetEventsInRange(ctx, from, to, testRoomID, chunkSize, true) + if err != nil { + t.Fatalf("GetEventsInRange returned an error: %s", err) + } + gots := gomatrixserverlib.HeaderedToClientEvents(db.StreamEventsToEvents(&testUserDeviceA, paginatedEvents), gomatrixserverlib.FormatAll) + endi := i + chunkSize + if endi > len(events) { + endi = len(events) + } + assertEventsEqual(t, from.String(), true, gots, events[i:endi]) + from = topologyTokenBefore(t, db, paginatedEvents[len(paginatedEvents)-1].EventID()) + } +} + func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []gomatrixserverlib.HeaderedEvent) { if len(gots) != len(wants) { t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants)) @@ -447,6 +506,21 @@ func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatr } } +func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *types.PaginationToken { + pos, spos, err := db.EventPositionInTopology(ctx, eventID) + if err != nil { + t.Fatalf("failed to get EventPositionInTopology: %s", err) + } + + if pos-1 <= 0 { + pos = types.StreamPosition(1) + } else { + pos = pos - 1 + spos += 1000 // this has to be bigger than the chunk limit + } + return types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos, spos) +} + func reversed(in []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent { out := make([]gomatrixserverlib.HeaderedEvent, len(in)) for i := 0; i < len(in); i++ {