mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 06:23:10 -06:00
Add stream recreation test
This commit is contained in:
parent
e067630887
commit
d8f31484a4
|
|
@ -12,7 +12,9 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
|
|
@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) {
|
|||
assert.Equal(t, false, banned)
|
||||
})
|
||||
}
|
||||
|
||||
// Validate that changing the AckPolicy/AckWait of room consumers
|
||||
// results in their recreation
|
||||
func TestRoomConsumerRecreation(t *testing.T) {
|
||||
|
||||
alice := test.NewUser(t)
|
||||
room := test.NewRoom(t, alice)
|
||||
|
||||
// As this is DB unrelated, just use SQLite
|
||||
cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite)
|
||||
defer closeDB()
|
||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||
natsInstance := &jetstream.NATSInstance{}
|
||||
|
||||
// Prepare a stream and consumer using the old configuration
|
||||
jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
||||
|
||||
streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent)
|
||||
consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID))
|
||||
subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID))
|
||||
|
||||
consumerConfig := &nats.ConsumerConfig{
|
||||
Durable: consumer,
|
||||
AckPolicy: nats.AckAllPolicy,
|
||||
DeliverPolicy: nats.DeliverAllPolicy,
|
||||
FilterSubject: subject,
|
||||
AckWait: (time.Minute * 2) + (time.Second * 10),
|
||||
InactiveThreshold: time.Hour * 24,
|
||||
}
|
||||
|
||||
// Create the consumer with the old config
|
||||
_, err := jsCtx.AddConsumer(streamName, consumerConfig)
|
||||
assert.NoError(t, err)
|
||||
|
||||
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||
// start JetStream listeners
|
||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
|
||||
rsAPI.SetFederationAPI(nil, nil)
|
||||
|
||||
// let the RS create the events, this also recreates the Consumers
|
||||
err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Validate that AckPolicy and AckWait has changed
|
||||
info, err := jsCtx.ConsumerInfo(streamName, consumer)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy)
|
||||
|
||||
wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10)
|
||||
assert.Equal(t, wantAckWait, info.Config.AckWait)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue