From 9599b3686e02356e48a537a820b075523252ac64 Mon Sep 17 00:00:00 2001 From: kegsay Date: Wed, 11 May 2022 13:44:32 +0100 Subject: [PATCH] More syncapi tests (#2451) * WIP tests for flakey create event * Uncomment all database test --- syncapi/syncapi_test.go | 148 +++++++++++++++++++++++++++++++++++----- test/event.go | 3 +- 2 files changed, 132 insertions(+), 19 deletions(-) diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 12b5178d8..7809cdaba 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -11,10 +11,12 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/test" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" ) @@ -39,6 +41,14 @@ func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req * return nil // TODO: return state } +func (s *syncRoomserverAPI) QuerySharedUsers(ctx context.Context, req *rsapi.QuerySharedUsersRequest, res *rsapi.QuerySharedUsersResponse) error { + res.UserIDsToCount = make(map[string]int) + return nil +} +func (s *syncRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *rsapi.QueryBulkStateContentRequest, res *rsapi.QueryBulkStateContentResponse) error { + return nil +} + type syncUserAPI struct { userapi.SyncUserAPI accounts []userapi.Device @@ -60,16 +70,22 @@ func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.Pe } type syncKeyAPI struct { - keyapi.KeyInternalAPI + keyapi.SyncKeyAPI } -func TestSyncAPI(t *testing.T) { +func (s *syncKeyAPI) QueryKeyChanges(ctx context.Context, req *keyapi.QueryKeyChangesRequest, res *keyapi.QueryKeyChangesResponse) { +} +func (s *syncKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneTimeKeysRequest, res *keyapi.QueryOneTimeKeysResponse) { + +} + +func TestSyncAPIAccessTokens(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - testSync(t, dbType) + testSyncAccessTokens(t, dbType) }) } -func testSync(t *testing.T, dbType test.DBType) { +func testSyncAccessTokens(t *testing.T, dbType test.DBType) { user := test.NewUser() room := test.NewRoom(t, user) alice := userapi.Device{ @@ -85,20 +101,7 @@ func testSync(t *testing.T, dbType test.DBType) { jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) - var msgs []*nats.Msg - for _, ev := range room.Events() { - var addsStateIDs []string - if ev.StateKey() != nil { - addsStateIDs = append(addsStateIDs, ev.EventID()) - } - msgs = append(msgs, test.NewOutputEventMsg(t, base, room.ID, api.OutputEvent{ - Type: rsapi.OutputTypeNewRoomEvent, - NewRoomEvent: &rsapi.OutputNewRoomEvent{ - Event: ev, - AddsStateEventIDs: addsStateIDs, - }, - })) - } + msgs := toNATSMsgs(t, base, room.Events()) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{}) test.MustPublishMsgs(t, jsctx, msgs...) @@ -160,3 +163,112 @@ func testSync(t *testing.T, dbType test.DBType) { } } } + +// Tests what happens when we create a room and then /sync before all events from /createRoom have +// been sent to the syncapi +func TestSyncAPICreateRoomSyncEarly(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + testSyncAPICreateRoomSyncEarly(t, dbType) + }) +} + +func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { + user := test.NewUser() + room := test.NewRoom(t, user) + alice := userapi.Device{ + ID: "ALICEID", + UserID: user.ID, + AccessToken: "ALICE_BEARER_TOKEN", + DisplayName: "Alice", + AccountType: userapi.AccountTypeUser, + } + + base, close := test.CreateBaseDendrite(t, dbType) + defer close() + + jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) + // order is: + // m.room.create + // m.room.member + // m.room.power_levels + // m.room.join_rules + // m.room.history_visibility + msgs := toNATSMsgs(t, base, room.Events()) + sinceTokens := make([]string, len(msgs)) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{}) + for i, msg := range msgs { + test.MustPublishMsgs(t, jsctx, msg) + time.Sleep(50 * time.Millisecond) + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": alice.AccessToken, + "timeout": "0", + }))) + if w.Code != 200 { + t.Errorf("got HTTP %d want 200", w.Code) + continue + } + var res types.Response + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Errorf("failed to decode response body: %s", err) + } + sinceTokens[i] = res.NextBatch.String() + if i == 0 { // create event does not produce a room section + if len(res.Rooms.Join) != 0 { + t.Fatalf("i=%v got %d joined rooms, want 0", i, len(res.Rooms.Join)) + } + } else { // we should have that room somewhere + if len(res.Rooms.Join) != 1 { + t.Fatalf("i=%v got %d joined rooms, want 1", i, len(res.Rooms.Join)) + } + } + } + + // sync with no token "" and with the penultimate token and this should neatly return room events in the timeline block + sinceTokens = append([]string{""}, sinceTokens[:len(sinceTokens)-1]...) + + t.Logf("waited for events to be consumed; syncing with %v", sinceTokens) + for i, since := range sinceTokens { + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": alice.AccessToken, + "timeout": "0", + "since": since, + }))) + if w.Code != 200 { + t.Errorf("since=%s got HTTP %d want 200", since, w.Code) + } + var res types.Response + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Errorf("failed to decode response body: %s", err) + } + if len(res.Rooms.Join) != 1 { + t.Fatalf("since=%s got %d joined rooms, want 1", since, len(res.Rooms.Join)) + } + t.Logf("since=%s res state:%+v res timeline:%+v", since, res.Rooms.Join[room.ID].State.Events, res.Rooms.Join[room.ID].Timeline.Events) + gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events)) + for j, ev := range res.Rooms.Join[room.ID].Timeline.Events { + gotEventIDs[j] = ev.EventID + } + test.AssertEventIDsEqual(t, gotEventIDs, room.Events()[i:]) + } +} + +func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverlib.HeaderedEvent) []*nats.Msg { + result := make([]*nats.Msg, len(input)) + for i, ev := range input { + var addsStateIDs []string + if ev.StateKey() != nil { + addsStateIDs = append(addsStateIDs, ev.EventID()) + } + result[i] = test.NewOutputEventMsg(t, base, ev.RoomID(), api.OutputEvent{ + Type: rsapi.OutputTypeNewRoomEvent, + NewRoomEvent: &rsapi.OutputNewRoomEvent{ + Event: ev, + AddsStateEventIDs: addsStateIDs, + }, + }) + } + return result +} diff --git a/test/event.go b/test/event.go index b2e2805ba..40cb8f0e1 100644 --- a/test/event.go +++ b/test/event.go @@ -64,7 +64,8 @@ func Reversed(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.Header func AssertEventIDsEqual(t *testing.T, gotEventIDs []string, wants []*gomatrixserverlib.HeaderedEvent) { t.Helper() if len(gotEventIDs) != len(wants) { - t.Fatalf("length mismatch: got %d events, want %d", len(gotEventIDs), len(wants)) + t.Errorf("length mismatch: got %d events, want %d", len(gotEventIDs), len(wants)) + return } for i := range wants { w := wants[i].EventID()