diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 437c4540c..f31271fe6 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -9,6 +9,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/syncapi" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" @@ -164,6 +165,9 @@ func TestPurgeRoom(t *testing.T) { base, db, close := mustCreateDatabase(t, dbType) defer close() + jsCtx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsCtx, &base.Cfg.Global.JetStream) + fedClient := base.CreateFederationClient() rsAPI := roomserver.NewInternalAPI(base) keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fedClient, rsAPI) @@ -248,9 +252,17 @@ func TestPurgeRoom(t *testing.T) { if err = rsAPI.PerformAdminPurgeRoom(ctx, &api.PerformAdminPurgeRoomRequest{RoomID: room.ID}, purgeResp); err != nil { t.Fatal(err) } - // TODO: Find a better solution, e.g. "hook" into the JetStream stream. - // Gives the SyncAPI and FederationAPI some time to delete their entries - time.Sleep(time.Millisecond * 100) + + // wait for all consumers to process the purge event + var sum = 1 + for sum > 0 { + sum = 0 + consumerCh := jsCtx.Consumers(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) + for x := range consumerCh { + sum += x.NumAckPending + } + time.Sleep(time.Millisecond) + } roomInfo, err = db.RoomInfo(ctx, room.ID) if err != nil {