diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go index eca63371d..a5130e342 100644 --- a/appservice/appservice_test.go +++ b/appservice/appservice_test.go @@ -14,8 +14,17 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/federationapi/statistics" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/dendrite/syncapi" + uapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" "github.com/stretchr/testify/assert" + "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice/api" @@ -407,3 +416,167 @@ func TestRoomserverConsumerOneInvite(t *testing.T) { close(evChan) }) } + +// Note: If this test panics, it is because we timed out waiting for the +// join event to come through to the appservice and we close the DB/shutdown Dendrite. This makes the +// syncAPI unhappy, as it is unable to write to the database. +func TestOutputAppserviceEvent(t *testing.T) { + alice := test.NewUser(t) + bob := test.NewUser(t) + room := test.NewRoom(t, alice) + + // Invite Bob + room.CreateAndInsert(t, alice, spec.MRoomMember, map[string]interface{}{ + "membership": "invite", + }, test.WithStateKey(bob.ID)) + + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType) + defer closeDB() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + + evChan := make(chan struct{}) + + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + // Create required internal APIs + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + // Create the router, so we can hit `/joined_members` + routers := httputil.NewRouters() + + accessTokens := map[*test.User]userDevice{ + bob: {}, + } + + usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) + clientapi.AddPublicRoutes(processCtx, routers, cfg, natsInstance, nil, rsAPI, nil, nil, nil, usrAPI, nil, nil, caching.DisableMetrics) + createAccessTokens(t, accessTokens, usrAPI, processCtx.Context(), routers) + + // create a dummy AS url, handling the events + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var txn consumers.ApplicationServiceTransaction + err := json.NewDecoder(r.Body).Decode(&txn) + if err != nil { + t.Fatal(err) + } + for _, ev := range txn.Events { + if ev.Type != spec.MRoomMember { + continue + } + if ev.StateKey != nil && *ev.StateKey == bob.ID { + membership := gjson.GetBytes(ev.Content, "membership").Str + t.Logf("Processing membership: %s", membership) + switch membership { + case spec.Invite: + // Accept the invite + joinEv := room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{ + "membership": "join", + }, test.WithStateKey(bob.ID)) + + if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, []*types.HeaderedEvent{joinEv}, "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + case spec.Join: // the AS has received the join event, now hit `/joined_members` to validate that + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/_matrix/client/v3/rooms/"+room.ID+"/joined_members", nil) + req.Header.Set("Authorization", "Bearer "+accessTokens[bob].accessToken) + routers.Client.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected HTTP 200, got %d: %s", rec.Code, rec.Body.String()) + } + + // Both Alice and Bob should be joined. If not, we have a race condition + if !gjson.GetBytes(rec.Body.Bytes(), "joined."+alice.ID).Exists() { + t.Errorf("Alice is not joined to the room") // in theory should not happen + } + if !gjson.GetBytes(rec.Body.Bytes(), "joined."+bob.ID).Exists() { + t.Errorf("Bob is not joined to the room") + } + evChan <- struct{}{} + default: + t.Fatalf("Unexpected membership: %s", membership) + } + } + } + })) + defer srv.Close() + + as := &config.ApplicationService{ + ID: "someID", + URL: srv.URL, + ASToken: "", + HSToken: "", + SenderLocalpart: "senderLocalPart", + NamespaceMap: map[string][]config.ApplicationServiceNamespace{ + "users": {{RegexpObject: regexp.MustCompile(bob.ID)}}, + "aliases": {{RegexpObject: regexp.MustCompile(room.ID)}}, + }, + } + as.CreateHTTPClient(cfg.AppServiceAPI.DisableTLSValidation) + + // Create a dummy application service + cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as} + + // Start the syncAPI to have `/joined_members` available + syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics) + + // start the consumer + appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI) + + // Create the room, this triggers the AS to receive an invite for Bob. + if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + select { + // Pretty generous timeout duration... + case <-time.After(time.Millisecond * 1000): // wait for the AS to process the events + t.Errorf("Timed out waiting for join event") + case <-evChan: + } + close(evChan) + }) +} + +type userDevice struct { + accessToken string + deviceID string + password string +} + +func createAccessTokens(t *testing.T, accessTokens map[*test.User]userDevice, userAPI uapi.UserInternalAPI, ctx context.Context, routers httputil.Routers) { + t.Helper() + for u := range accessTokens { + localpart, serverName, _ := gomatrixserverlib.SplitID('@', u.ID) + userRes := &uapi.PerformAccountCreationResponse{} + password := util.RandomString(8) + if err := userAPI.PerformAccountCreation(ctx, &uapi.PerformAccountCreationRequest{ + AccountType: u.AccountType, + Localpart: localpart, + ServerName: serverName, + Password: password, + }, userRes); err != nil { + t.Errorf("failed to create account: %s", err) + } + req := test.NewRequest(t, http.MethodPost, "/_matrix/client/v3/login", test.WithJSONBody(t, map[string]interface{}{ + "type": authtypes.LoginTypePassword, + "identifier": map[string]interface{}{ + "type": "m.id.user", + "user": u.ID, + }, + "password": password, + })) + rec := httptest.NewRecorder() + routers.Client.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("failed to login: %s", rec.Body.String()) + } + accessTokens[u] = userDevice{ + accessToken: gjson.GetBytes(rec.Body.Bytes(), "access_token").String(), + deviceID: gjson.GetBytes(rec.Body.Bytes(), "device_id").String(), + password: password, + } + } +}