mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-03 12:13:09 -06:00
Merge branch 'matrix-org:main' into main
This commit is contained in:
commit
35444f5c6d
29
CHANGES.md
29
CHANGES.md
|
|
@ -1,5 +1,34 @@
|
||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## Dendrite 0.8.2 (2022-04-27)
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* Lazy-loading has been added to the `/sync` endpoint, which should speed up syncs considerably
|
||||||
|
* Filtering has been added to the `/messages` endpoint
|
||||||
|
* The room summary now contains "heroes" (up to 5 users in the room) for clients to display when no room name is set
|
||||||
|
* The existing lazy-loading caches will now be used by `/messages` and `/context` so that member events will not be sent to clients more times than necessary
|
||||||
|
* The account data stream now uses the provided filters
|
||||||
|
* The built-in NATS Server has been updated to version 2.8.0
|
||||||
|
* The `/state` and `/state_ids` endpoints will now return `M_NOT_FOUND` for rejected events
|
||||||
|
* Repeated calls to the `/redact` endpoint will now be idempotent when a transaction ID is given
|
||||||
|
* Dendrite should now be able to run as a Windows service under Service Control Manager
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
* Fictitious presence updates will no longer be created for users which have not sent us presence updates, which should speed up complete syncs considerably
|
||||||
|
* Uploading cross-signing device signatures should now be more reliable, fixing a number of bugs with cross-signing
|
||||||
|
* All account data should now be sent properly on a complete sync, which should eliminate problems with client settings or key backups appearing to be missing
|
||||||
|
* Account data will now be limited correctly on incremental syncs, returning the stream position of the most recent update rather than the latest stream position
|
||||||
|
* Account data will not be sent for parted rooms, which should reduce the number of left/forgotten rooms reappearing in clients as empty rooms
|
||||||
|
* The TURN username hash has been fixed which should help to resolve some problems when using TURN for voice calls (contributed by [fcwoknhenuxdfiyv](https://github.com/fcwoknhenuxdfiyv))
|
||||||
|
* Push rules can no longer be modified using the account data endpoints
|
||||||
|
* Querying account availability should now work properly in polylith deployments
|
||||||
|
* A number of bugs with sync filters have been fixed
|
||||||
|
* A default sync filter will now be used if the request contains a filter ID that does not exist
|
||||||
|
* The `pushkey_ts` field is now using seconds instead of milliseconds
|
||||||
|
* A race condition when gracefully shutting down has been fixed, so JetStream should no longer cause the process to exit before other Dendrite components are finished shutting down
|
||||||
|
|
||||||
## Dendrite 0.8.1 (2022-04-07)
|
## Dendrite 0.8.1 (2022-04-07)
|
||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
|
||||||
// this destination queue. We'll then be able to retrieve the PDU
|
// this destination queue. We'll then be able to retrieve the PDU
|
||||||
// later.
|
// later.
|
||||||
if err := oq.db.AssociatePDUWithDestination(
|
if err := oq.db.AssociatePDUWithDestination(
|
||||||
context.TODO(),
|
oq.process.Context(),
|
||||||
"", // TODO: remove this, as we don't need to persist the transaction ID
|
"", // TODO: remove this, as we don't need to persist the transaction ID
|
||||||
oq.destination, // the destination server name
|
oq.destination, // the destination server name
|
||||||
receipt, // NIDs from federationapi_queue_json table
|
receipt, // NIDs from federationapi_queue_json table
|
||||||
|
|
@ -122,7 +122,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
|
||||||
// this destination queue. We'll then be able to retrieve the PDU
|
// this destination queue. We'll then be able to retrieve the PDU
|
||||||
// later.
|
// later.
|
||||||
if err := oq.db.AssociateEDUWithDestination(
|
if err := oq.db.AssociateEDUWithDestination(
|
||||||
context.TODO(),
|
oq.process.Context(),
|
||||||
oq.destination, // the destination server name
|
oq.destination, // the destination server name
|
||||||
receipt, // NIDs from federationapi_queue_json table
|
receipt, // NIDs from federationapi_queue_json table
|
||||||
event.Type,
|
event.Type,
|
||||||
|
|
@ -177,7 +177,7 @@ func (oq *destinationQueue) getPendingFromDatabase() {
|
||||||
// Check to see if there's anything to do for this server
|
// Check to see if there's anything to do for this server
|
||||||
// in the database.
|
// in the database.
|
||||||
retrieved := false
|
retrieved := false
|
||||||
ctx := context.Background()
|
ctx := oq.process.Context()
|
||||||
oq.pendingMutex.Lock()
|
oq.pendingMutex.Lock()
|
||||||
defer oq.pendingMutex.Unlock()
|
defer oq.pendingMutex.Unlock()
|
||||||
|
|
||||||
|
|
@ -271,6 +271,9 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
// restarted automatically the next time we have an event to
|
// restarted automatically the next time we have an event to
|
||||||
// send.
|
// send.
|
||||||
return
|
return
|
||||||
|
case <-oq.process.Context().Done():
|
||||||
|
// The parent process is shutting down, so stop.
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we are backing off this server then wait for the
|
// If we are backing off this server then wait for the
|
||||||
|
|
@ -420,13 +423,13 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
// Clean up the transaction in the database.
|
// Clean up the transaction in the database.
|
||||||
if pduReceipts != nil {
|
if pduReceipts != nil {
|
||||||
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
|
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
|
||||||
if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipts); err != nil {
|
if err = oq.db.CleanPDUs(oq.process.Context(), oq.destination, pduReceipts); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
|
logrus.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if eduReceipts != nil {
|
if eduReceipts != nil {
|
||||||
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
|
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
|
||||||
if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipts); err != nil {
|
if err = oq.db.CleanEDUs(oq.process.Context(), oq.destination, eduReceipts); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
|
logrus.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,6 @@
|
||||||
package queue
|
package queue
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/ed25519"
|
"crypto/ed25519"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -105,14 +104,14 @@ func NewOutgoingQueues(
|
||||||
// Look up which servers we have pending items for and then rehydrate those queues.
|
// Look up which servers we have pending items for and then rehydrate those queues.
|
||||||
if !disabled {
|
if !disabled {
|
||||||
serverNames := map[gomatrixserverlib.ServerName]struct{}{}
|
serverNames := map[gomatrixserverlib.ServerName]struct{}{}
|
||||||
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
|
if names, err := db.GetPendingPDUServerNames(process.Context()); err == nil {
|
||||||
for _, serverName := range names {
|
for _, serverName := range names {
|
||||||
serverNames[serverName] = struct{}{}
|
serverNames[serverName] = struct{}{}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
|
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
|
||||||
}
|
}
|
||||||
if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
|
if names, err := db.GetPendingEDUServerNames(process.Context()); err == nil {
|
||||||
for _, serverName := range names {
|
for _, serverName := range names {
|
||||||
serverNames[serverName] = struct{}{}
|
serverNames[serverName] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
@ -215,7 +214,7 @@ func (oqs *OutgoingQueues) SendEvent(
|
||||||
// Check if any of the destinations are prohibited by server ACLs.
|
// Check if any of the destinations are prohibited by server ACLs.
|
||||||
for destination := range destmap {
|
for destination := range destmap {
|
||||||
if api.IsServerBannedFromRoom(
|
if api.IsServerBannedFromRoom(
|
||||||
context.TODO(),
|
oqs.process.Context(),
|
||||||
oqs.rsAPI,
|
oqs.rsAPI,
|
||||||
ev.RoomID(),
|
ev.RoomID(),
|
||||||
destination,
|
destination,
|
||||||
|
|
@ -238,7 +237,7 @@ func (oqs *OutgoingQueues) SendEvent(
|
||||||
return fmt.Errorf("json.Marshal: %w", err)
|
return fmt.Errorf("json.Marshal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
nid, err := oqs.db.StoreJSON(context.TODO(), string(headeredJSON))
|
nid, err := oqs.db.StoreJSON(oqs.process.Context(), string(headeredJSON))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -286,7 +285,7 @@ func (oqs *OutgoingQueues) SendEDU(
|
||||||
if result := gjson.GetBytes(e.Content, "room_id"); result.Exists() {
|
if result := gjson.GetBytes(e.Content, "room_id"); result.Exists() {
|
||||||
for destination := range destmap {
|
for destination := range destmap {
|
||||||
if api.IsServerBannedFromRoom(
|
if api.IsServerBannedFromRoom(
|
||||||
context.TODO(),
|
oqs.process.Context(),
|
||||||
oqs.rsAPI,
|
oqs.rsAPI,
|
||||||
result.Str,
|
result.Str,
|
||||||
destination,
|
destination,
|
||||||
|
|
@ -310,7 +309,7 @@ func (oqs *OutgoingQueues) SendEDU(
|
||||||
return fmt.Errorf("json.Marshal: %w", err)
|
return fmt.Errorf("json.Marshal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
nid, err := oqs.db.StoreJSON(context.TODO(), string(ephemeralJSON))
|
nid, err := oqs.db.StoreJSON(oqs.process.Context(), string(ephemeralJSON))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ var build string
|
||||||
const (
|
const (
|
||||||
VersionMajor = 0
|
VersionMajor = 0
|
||||||
VersionMinor = 8
|
VersionMinor = 8
|
||||||
VersionPatch = 1
|
VersionPatch = 2
|
||||||
VersionTag = "" // example: "rc1"
|
VersionTag = "" // example: "rc1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -346,6 +346,9 @@ func (b *BaseDendrite) SetupAndServeHTTP(
|
||||||
Addr: string(externalAddr),
|
Addr: string(externalAddr),
|
||||||
WriteTimeout: HTTPServerTimeout,
|
WriteTimeout: HTTPServerTimeout,
|
||||||
Handler: externalRouter,
|
Handler: externalRouter,
|
||||||
|
BaseContext: func(_ net.Listener) context.Context {
|
||||||
|
return b.ProcessContext.Context()
|
||||||
|
},
|
||||||
}
|
}
|
||||||
internalServ := externalServ
|
internalServ := externalServ
|
||||||
|
|
||||||
|
|
@ -361,6 +364,9 @@ func (b *BaseDendrite) SetupAndServeHTTP(
|
||||||
internalServ = &http.Server{
|
internalServ = &http.Server{
|
||||||
Addr: string(internalAddr),
|
Addr: string(internalAddr),
|
||||||
Handler: h2c.NewHandler(internalRouter, internalH2S),
|
Handler: h2c.NewHandler(internalRouter, internalH2S),
|
||||||
|
BaseContext: func(_ net.Listener) context.Context {
|
||||||
|
return b.ProcessContext.Context()
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -463,13 +469,11 @@ func (b *BaseDendrite) SetupAndServeHTTP(
|
||||||
}
|
}
|
||||||
|
|
||||||
minwinsvc.SetOnExit(b.ProcessContext.ShutdownDendrite)
|
minwinsvc.SetOnExit(b.ProcessContext.ShutdownDendrite)
|
||||||
b.WaitForShutdown()
|
<-b.ProcessContext.WaitForShutdown()
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
logrus.Infof("Stopping HTTP listeners")
|
||||||
cancel()
|
_ = internalServ.Shutdown(context.Background())
|
||||||
|
_ = externalServ.Shutdown(context.Background())
|
||||||
_ = internalServ.Shutdown(ctx)
|
|
||||||
_ = externalServ.Shutdown(ctx)
|
|
||||||
logrus.Infof("Stopped HTTP listeners")
|
logrus.Infof("Stopped HTTP listeners")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,16 @@ func JetStreamConsumer(
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
|
// If the parent context has given up then there's no point in
|
||||||
|
// carrying on doing anything, so stop the listener.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
if err := sub.Unsubscribe(); err != nil {
|
||||||
|
logrus.WithContext(ctx).Warnf("Failed to unsubscribe %q", durable)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
// The context behaviour here is surprising — we supply a context
|
// The context behaviour here is surprising — we supply a context
|
||||||
// so that we can interrupt the fetch if we want, but NATS will still
|
// so that we can interrupt the fetch if we want, but NATS will still
|
||||||
// enforce its own deadline (roughly 5 seconds by default). Therefore
|
// enforce its own deadline (roughly 5 seconds by default). Therefore
|
||||||
|
|
@ -65,18 +75,18 @@ func JetStreamConsumer(
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
msg := msgs[0]
|
msg := msgs[0]
|
||||||
if err = msg.InProgress(); err != nil {
|
if err = msg.InProgress(nats.Context(ctx)); err != nil {
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if f(ctx, msg) {
|
if f(ctx, msg) {
|
||||||
if err = msg.AckSync(); err != nil {
|
if err = msg.AckSync(nats.Context(ctx)); err != nil {
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err = msg.Nak(); err != nil {
|
if err = msg.Nak(nats.Context(ctx)); err != nil {
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -105,7 +105,6 @@ func (s *accountDataStatements) SelectAccountDataInRange(
|
||||||
accountDataEventFilter *gomatrixserverlib.EventFilter,
|
accountDataEventFilter *gomatrixserverlib.EventFilter,
|
||||||
) (data map[string][]string, pos types.StreamPosition, err error) {
|
) (data map[string][]string, pos types.StreamPosition, err error) {
|
||||||
data = make(map[string][]string)
|
data = make(map[string][]string)
|
||||||
pos = r.Low()
|
|
||||||
|
|
||||||
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High(),
|
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High(),
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)),
|
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)),
|
||||||
|
|
@ -135,6 +134,9 @@ func (s *accountDataStatements) SelectAccountDataInRange(
|
||||||
pos = id
|
pos = id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if pos == 0 {
|
||||||
|
pos = r.High()
|
||||||
|
}
|
||||||
return data, pos, rows.Err()
|
return data, pos, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,6 @@ func (s *accountDataStatements) SelectAccountDataInRange(
|
||||||
r types.Range,
|
r types.Range,
|
||||||
filter *gomatrixserverlib.EventFilter,
|
filter *gomatrixserverlib.EventFilter,
|
||||||
) (data map[string][]string, pos types.StreamPosition, err error) {
|
) (data map[string][]string, pos types.StreamPosition, err error) {
|
||||||
pos = r.Low()
|
|
||||||
data = make(map[string][]string)
|
data = make(map[string][]string)
|
||||||
stmt, params, err := prepareWithFilters(
|
stmt, params, err := prepareWithFilters(
|
||||||
s.db, nil, selectAccountDataInRangeSQL,
|
s.db, nil, selectAccountDataInRangeSQL,
|
||||||
|
|
@ -131,7 +130,9 @@ func (s *accountDataStatements) SelectAccountDataInRange(
|
||||||
pos = id
|
pos = id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if pos == 0 {
|
||||||
|
pos = r.High()
|
||||||
|
}
|
||||||
return data, pos, nil
|
return data, pos, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue