From c08df1603e09a719e7462d0f7cae67415366ddb4 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Tue, 12 Dec 2023 11:55:01 +0100 Subject: [PATCH] Validate that the old jetstream consumers get deleted --- appservice/appservice_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go index 8b5c1a32e..aa2af9f24 100644 --- a/appservice/appservice_test.go +++ b/appservice/appservice_test.go @@ -23,6 +23,7 @@ import ( uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" @@ -520,12 +521,34 @@ func TestOutputAppserviceEvent(t *testing.T) { // Create a dummy application service cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as} + // Prepare AS Streams on the old topic to validate that they get deleted + jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + + token := jetstream.Tokenise(as.ID) + if err := jetstream.JetStreamConsumer( + processCtx.Context(), jsCtx, cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), + cfg.Global.JetStream.Durable("Appservice_"+token), + 50, // maximum number of events to send in a single transaction + func(ctx context.Context, msgs []*nats.Msg) bool { + return true + }, + ); err != nil { + t.Fatal(err) + } + // Start the syncAPI to have `/joined_members` available syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics) // start the consumer appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI) + // At this point, the old JetStream consumers should be deleted + for consumer := range jsCtx.Consumers(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) { + if consumer.Name == cfg.Global.JetStream.Durable("Appservice_"+token)+"Pull" { + t.Fatalf("Consumer still exists") + } + } + // Create the room, this triggers the AS to receive an invite for Bob. if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err)