From 6fdc3b0e4c0dc441442eca0377cbaf7cafd83454 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 May 2022 18:11:37 +0100 Subject: [PATCH] Add a way to inject jetstream messages --- setup/jetstream/nats.go | 7 ++ syncapi/syncapi_test.go | 148 +++++++++++++++++++++++++--------------- test/base.go | 5 ++ test/jetstream.go | 35 ++++++++++ 4 files changed, 140 insertions(+), 55 deletions(-) create mode 100644 test/jetstream.go diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 8d5289697..90f162f82 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -29,6 +29,13 @@ func PrepareForTests() (*process.ProcessContext, nats.JetStreamContext, *nats.Co return pc, js, jc } +func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) { + for _, stream := range streams { // streams are defined in streams.go + name := cfg.Prefixed(stream.Name) + js.DeleteStream(name) + } +} + func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 003280aeb..7185bac94 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -5,16 +5,15 @@ import ( "net/http" "net/http/httptest" "testing" + "time" keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/test" userapi "github.com/matrix-org/dendrite/userapi/api" -) - -var ( - alice = "@alice:localhost" - aliceAccessToken = "ALICE_BEARER_TOKEN" + "github.com/nats-io/nats.go" ) type syncRoomserverAPI struct { @@ -22,19 +21,16 @@ type syncRoomserverAPI struct { } type syncUserAPI struct { - userapi.UserInternalAPI + userapi.SyncUserAPI + accounts []userapi.Device } func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error { - if req.AccessToken == aliceAccessToken { - res.Device = &userapi.Device{ - ID: "ID", - UserID: alice, - AccessToken: aliceAccessToken, - AccountType: userapi.AccountTypeUser, - DisplayName: "Alice", + for _, acc := range s.accounts { + if acc.AccessToken == req.AccessToken { + res.Device = &acc + return nil } - return nil } res.Err = "unknown user" return nil @@ -50,46 +46,88 @@ type syncKeyAPI struct { func TestSyncAPI(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - base, close := test.CreateBaseDendrite(t, dbType) - defer close() - AddPublicRoutes(base, &syncUserAPI{}, &syncRoomserverAPI{}, &syncKeyAPI{}) - - testCases := []struct { - name string - req *http.Request - wantCode int - }{ - { - name: "missing access token", - req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ - "timeout": "0", - })), - wantCode: 401, - }, - { - name: "unknown access token", - req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ - "access_token": "foo", - "timeout": "0", - })), - wantCode: 401, - }, - { - name: "valid access token", - req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ - "access_token": aliceAccessToken, - "timeout": "0", - })), - wantCode: 200, - }, - } - - for _, tc := range testCases { - w := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, tc.req) - if w.Code != tc.wantCode { - t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) - } - } + testSync(t, dbType) }) } + +func testSync(t *testing.T, dbType test.DBType) { + user := test.NewUser() + room := test.NewRoom(t, user) + alice := userapi.Device{ + ID: "ALICEID", + UserID: user.ID, + AccessToken: "ALICE_BEARER_TOKEN", + DisplayName: "Alice", + AccountType: userapi.AccountTypeUser, + } + + base, close := test.CreateBaseDendrite(t, dbType) + defer close() + + jsctx, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) + var msgs []*nats.Msg + for _, ev := range room.Events() { + msgs = append(msgs, test.NewOutputEventMsg(t, base, room.ID, api.OutputEvent{ + Type: rsapi.OutputTypeNewRoomEvent, + NewRoomEvent: &rsapi.OutputNewRoomEvent{ + Event: ev, + }, + })) + } + test.MustPublishMsgs(t, jsctx, msgs...) + + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, &syncKeyAPI{}) + + testCases := []struct { + name string + req *http.Request + wantCode int + wantJoinedRooms []string + }{ + { + name: "missing access token", + req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "timeout": "0", + })), + wantCode: 401, + }, + { + name: "unknown access token", + req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": "foo", + "timeout": "0", + })), + wantCode: 401, + }, + { + name: "valid access token", + req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": alice.AccessToken, + "timeout": "0", + })), + wantCode: 200, + wantJoinedRooms: []string{room.ID}, + }, + } + // TODO: find a better way + time.Sleep(100 * time.Millisecond) + + for _, tc := range testCases { + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, tc.req) + if w.Code != tc.wantCode { + t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) + } + /* + if tc.wantJoinedRooms != nil { + var res types.Response + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Fatalf("%s: failed to decode response body: %s", tc.name, err) + } + if len(res.Rooms.Join) != len(tc.wantJoinedRooms) { + t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res) + } + } */ + } +} diff --git a/test/base.go b/test/base.go index 972d86056..865e587bc 100644 --- a/test/base.go +++ b/test/base.go @@ -16,6 +16,7 @@ package test import ( "errors" + "fmt" "io/fs" "os" "strings" @@ -28,6 +29,10 @@ import ( func CreateBaseDendrite(t *testing.T, dbType DBType) (*base.BaseDendrite, func()) { var cfg config.Dendrite cfg.Defaults(false) + cfg.Global.JetStream.InMemory = true + // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use + // the file system event with InMemory=true :( + cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType) switch dbType { case DBTypePostgres: diff --git a/test/jetstream.go b/test/jetstream.go new file mode 100644 index 000000000..488c22beb --- /dev/null +++ b/test/jetstream.go @@ -0,0 +1,35 @@ +package test + +import ( + "encoding/json" + "testing" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/nats-io/nats.go" +) + +func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) { + t.Helper() + for _, msg := range msgs { + if _, err := jsctx.PublishMsg(msg); err != nil { + t.Fatalf("MustPublishMsgs: failed to publish message: %s", err) + } + } +} + +func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg { + t.Helper() + msg := &nats.Msg{ + Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), + Header: nats.Header{}, + } + msg.Header.Set(jetstream.RoomID, roomID) + var err error + msg.Data, err = json.Marshal(update) + if err != nil { + t.Fatalf("failed to marshal update: %s", err) + } + return msg +}