mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 23:13:11 -06:00
Validate that the old jetstream consumers get deleted
This commit is contained in:
parent
099233770b
commit
c08df1603e
|
|
@ -23,6 +23,7 @@ import (
|
||||||
uapi "github.com/matrix-org/dendrite/userapi/api"
|
uapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
|
|
@ -520,12 +521,34 @@ func TestOutputAppserviceEvent(t *testing.T) {
|
||||||
// Create a dummy application service
|
// Create a dummy application service
|
||||||
cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as}
|
cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as}
|
||||||
|
|
||||||
|
// Prepare AS Streams on the old topic to validate that they get deleted
|
||||||
|
jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
||||||
|
|
||||||
|
token := jetstream.Tokenise(as.ID)
|
||||||
|
if err := jetstream.JetStreamConsumer(
|
||||||
|
processCtx.Context(), jsCtx, cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
|
||||||
|
cfg.Global.JetStream.Durable("Appservice_"+token),
|
||||||
|
50, // maximum number of events to send in a single transaction
|
||||||
|
func(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Start the syncAPI to have `/joined_members` available
|
// Start the syncAPI to have `/joined_members` available
|
||||||
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics)
|
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics)
|
||||||
|
|
||||||
// start the consumer
|
// start the consumer
|
||||||
appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)
|
appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)
|
||||||
|
|
||||||
|
// At this point, the old JetStream consumers should be deleted
|
||||||
|
for consumer := range jsCtx.Consumers(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) {
|
||||||
|
if consumer.Name == cfg.Global.JetStream.Durable("Appservice_"+token)+"Pull" {
|
||||||
|
t.Fatalf("Consumer still exists")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create the room, this triggers the AS to receive an invite for Bob.
|
// Create the room, this triggers the AS to receive an invite for Bob.
|
||||||
if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
|
if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
|
||||||
t.Fatalf("failed to send events: %v", err)
|
t.Fatalf("failed to send events: %v", err)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue