Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/eventsize

This commit is contained in:
Till Faelligen 2023-07-07 20:01:46 +02:00
commit 629f10f014
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
3 changed files with 28 additions and 4 deletions

View file

@ -112,7 +112,13 @@ func (m *Migrator) Up(ctx context.Context) error {
func (m *Migrator) insertMigration(ctx context.Context, txn *sql.Tx, migrationName string) error { func (m *Migrator) insertMigration(ctx context.Context, txn *sql.Tx, migrationName string) error {
if m.insertStmt == nil { if m.insertStmt == nil {
stmt, err := m.db.Prepare(insertVersionSQL) var stmt *sql.Stmt
var err error
if txn == nil {
stmt, err = m.db.PrepareContext(ctx, insertVersionSQL)
} else {
stmt, err = txn.PrepareContext(ctx, insertVersionSQL)
}
if err != nil { if err != nil {
return fmt.Errorf("unable to prepare insert statement: %w", err) return fmt.Errorf("unable to prepare insert statement: %w", err)
} }

View file

@ -87,6 +87,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
return js, nc return js, nc
} }
// nolint:gocyclo
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil { if nc == nil {
var err error var err error
@ -126,16 +127,32 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
subjects = []string{name, name + ".>"} subjects = []string{name, name + ".>"}
} }
if info != nil { if info != nil {
// If the stream config doesn't match what we expect, try to update
// it. If that doesn't work then try to blow it away and we'll then
// recreate it in the next section.
// Each specific option that we set must be checked by hand, as if
// you DeepEqual the whole config struct, it will always show that
// there's a difference because the NATS Server will return defaults
// in the stream info.
switch { switch {
case !reflect.DeepEqual(info.Config.Subjects, subjects): case !reflect.DeepEqual(info.Config.Subjects, subjects):
fallthrough fallthrough
case info.Config.Retention != stream.Retention: case info.Config.Retention != stream.Retention:
fallthrough fallthrough
case info.Config.Storage != stream.Storage: case info.Config.Storage != stream.Storage:
if err = s.DeleteStream(name); err != nil { fallthrough
logrus.WithError(err).Fatal("Unable to delete stream") case info.Config.MaxAge != stream.MaxAge:
// Try updating the stream first, as many things can be updated
// non-destructively.
if info, err = s.UpdateStream(stream); err != nil {
logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name)
// We failed to update the stream, this is a last attempt to get
// things working but may result in data loss.
if err = s.DeleteStream(name); err != nil {
logrus.WithError(err).Fatalf("Unable to delete stream %q", name)
}
info = nil
} }
info = nil
} }
} }
if info == nil { if info == nil {

View file

@ -48,6 +48,7 @@ var streams = []*nats.StreamConfig{
Name: InputRoomEvent, Name: InputRoomEvent,
Retention: nats.InterestPolicy, Retention: nats.InterestPolicy,
Storage: nats.FileStorage, Storage: nats.FileStorage,
MaxAge: time.Hour * 24,
}, },
{ {
Name: InputDeviceListUpdate, Name: InputDeviceListUpdate,