mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 10:33:11 -06:00
Merge branch 'main' into s7evink/userprofile
This commit is contained in:
commit
72c3924b3e
|
|
@ -101,6 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage(
|
||||||
log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs))
|
log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs))
|
||||||
events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs))
|
events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs))
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
|
// Only handle events we care about
|
||||||
|
receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType))
|
||||||
|
if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInviteEvent {
|
||||||
|
continue
|
||||||
|
}
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -19,11 +19,12 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type uploadKeysRequest struct {
|
type uploadKeysRequest struct {
|
||||||
|
|
@ -77,7 +78,6 @@ func UploadKeys(req *http.Request, keyAPI api.ClientKeyAPI, device *userapi.Devi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
keyCount := make(map[string]int)
|
keyCount := make(map[string]int)
|
||||||
// we only return key counts when the client uploads OTKs
|
|
||||||
if len(uploadRes.OneTimeKeyCounts) > 0 {
|
if len(uploadRes.OneTimeKeyCounts) > 0 {
|
||||||
keyCount = uploadRes.OneTimeKeyCounts[0].KeyCount
|
keyCount = uploadRes.OneTimeKeyCounts[0].KeyCount
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,12 +18,17 @@ global:
|
||||||
private_key: matrix_key.pem
|
private_key: matrix_key.pem
|
||||||
|
|
||||||
# The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
|
# The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
|
||||||
# to old signing private keys that were formerly in use on this domain. These
|
# to old signing keys that were formerly in use on this domain name. These
|
||||||
# keys will not be used for federation request or event signing, but will be
|
# keys will not be used for federation request or event signing, but will be
|
||||||
# provided to any other homeserver that asks when trying to verify old events.
|
# provided to any other homeserver that asks when trying to verify old events.
|
||||||
old_private_keys:
|
old_private_keys:
|
||||||
|
# If the old private key file is available:
|
||||||
# - private_key: old_matrix_key.pem
|
# - private_key: old_matrix_key.pem
|
||||||
# expired_at: 1601024554498
|
# expired_at: 1601024554498
|
||||||
|
# If only the public key (in base64 format) and key ID are known:
|
||||||
|
# - public_key: mn59Kxfdq9VziYHSBzI7+EDPDcBS2Xl7jeUdiiQcOnM=
|
||||||
|
# key_id: ed25519:mykeyid
|
||||||
|
# expired_at: 1601024554498
|
||||||
|
|
||||||
# How long a remote server can cache our server signing key before requesting it
|
# How long a remote server can cache our server signing key before requesting it
|
||||||
# again. Increasing this number will reduce the number of requests made by other
|
# again. Increasing this number will reduce the number of requests made by other
|
||||||
|
|
|
||||||
|
|
@ -18,12 +18,17 @@ global:
|
||||||
private_key: matrix_key.pem
|
private_key: matrix_key.pem
|
||||||
|
|
||||||
# The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
|
# The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
|
||||||
# to old signing private keys that were formerly in use on this domain. These
|
# to old signing keys that were formerly in use on this domain name. These
|
||||||
# keys will not be used for federation request or event signing, but will be
|
# keys will not be used for federation request or event signing, but will be
|
||||||
# provided to any other homeserver that asks when trying to verify old events.
|
# provided to any other homeserver that asks when trying to verify old events.
|
||||||
old_private_keys:
|
old_private_keys:
|
||||||
|
# If the old private key file is available:
|
||||||
# - private_key: old_matrix_key.pem
|
# - private_key: old_matrix_key.pem
|
||||||
# expired_at: 1601024554498
|
# expired_at: 1601024554498
|
||||||
|
# If only the public key (in base64 format) and key ID are known:
|
||||||
|
# - public_key: mn59Kxfdq9VziYHSBzI7+EDPDcBS2Xl7jeUdiiQcOnM=
|
||||||
|
# key_id: ed25519:mykeyid
|
||||||
|
# expired_at: 1601024554498
|
||||||
|
|
||||||
# How long a remote server can cache our server signing key before requesting it
|
# How long a remote server can cache our server signing key before requesting it
|
||||||
# again. Increasing this number will reduce the number of requests made by other
|
# again. Increasing this number will reduce the number of requests made by other
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,41 @@ permalink: /installation/start/optimisation
|
||||||
Now that you have Dendrite running, the following tweaks will improve the reliability
|
Now that you have Dendrite running, the following tweaks will improve the reliability
|
||||||
and performance of your installation.
|
and performance of your installation.
|
||||||
|
|
||||||
|
## PostgreSQL connection limit
|
||||||
|
|
||||||
|
A PostgreSQL database engine is configured to allow only a certain number of connections.
|
||||||
|
This is typically controlled by the `max_connections` and `superuser_reserved_connections`
|
||||||
|
configuration items in `postgresql.conf`. Once these limits are violated, **PostgreSQL will
|
||||||
|
immediately stop accepting new connections** until some of the existing connections are closed.
|
||||||
|
This is a common source of misconfiguration and requires particular care.
|
||||||
|
|
||||||
|
If your PostgreSQL `max_connections` is set to `100` and `superuser_reserved_connections` is
|
||||||
|
set to `3` then you have an effective connection limit of 97 database connections. It is
|
||||||
|
therefore important to ensure that Dendrite doesn't violate that limit, otherwise database
|
||||||
|
queries will unexpectedly fail and this will cause problems both within Dendrite and for users.
|
||||||
|
|
||||||
|
If you are also running other software that uses the same PostgreSQL database engine, then you
|
||||||
|
must also take into account that some connections will be already used by your other software
|
||||||
|
and therefore will not be available to Dendrite. Check the configuration of any other software
|
||||||
|
using the same database engine for their configured connection limits and adjust your calculations
|
||||||
|
accordingly.
|
||||||
|
|
||||||
|
Dendrite has a `max_open_conns` configuration item in each `database` block to control how many
|
||||||
|
connections it will open to the database.
|
||||||
|
|
||||||
|
**If you are using the `global` database pool** then you only need to configure the
|
||||||
|
`max_open_conns` setting once in the `global` section.
|
||||||
|
|
||||||
|
**If you are defining a `database` config per component** then you will need to ensure that
|
||||||
|
the **sum total** of all configured `max_open_conns` to a given database server do not exceed
|
||||||
|
the connection limit. If you configure a total that adds up to more connections than are available
|
||||||
|
then this will cause database queries to fail.
|
||||||
|
|
||||||
|
You may wish to raise the `max_connections` limit on your PostgreSQL server to accommodate
|
||||||
|
additional connections, in which case you should also update the `max_open_conns` in your
|
||||||
|
Dendrite configuration accordingly. However be aware that this is only advisable on particularly
|
||||||
|
powerful servers that can handle the concurrent load of additional queries running at one time.
|
||||||
|
|
||||||
## File descriptor limit
|
## File descriptor limit
|
||||||
|
|
||||||
Most platforms have a limit on how many file descriptors a single process can open. All
|
Most platforms have a limit on how many file descriptors a single process can open. All
|
||||||
|
|
|
||||||
|
|
@ -79,6 +79,13 @@ func (s *OutputRoomEventConsumer) Start() error {
|
||||||
// realises that it cannot update the room state using the deltas.
|
// realises that it cannot update the room state using the deltas.
|
||||||
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
|
receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType))
|
||||||
|
|
||||||
|
// Only handle events we care about
|
||||||
|
if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInboundPeek {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -259,7 +259,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
|
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
|
||||||
}
|
}
|
||||||
logrus.WithField("hosts", joinedHosts).WithField("room", roomID).Info("Joined federated room with hosts")
|
logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts))
|
||||||
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
|
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
|
||||||
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
|
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -160,7 +160,7 @@ func localKeys(cfg *config.FederationAPI, validUntil time.Time) (*gomatrixserver
|
||||||
for _, oldVerifyKey := range cfg.Matrix.OldVerifyKeys {
|
for _, oldVerifyKey := range cfg.Matrix.OldVerifyKeys {
|
||||||
keys.OldVerifyKeys[oldVerifyKey.KeyID] = gomatrixserverlib.OldVerifyKey{
|
keys.OldVerifyKeys[oldVerifyKey.KeyID] = gomatrixserverlib.OldVerifyKey{
|
||||||
VerifyKey: gomatrixserverlib.VerifyKey{
|
VerifyKey: gomatrixserverlib.VerifyKey{
|
||||||
Key: gomatrixserverlib.Base64Bytes(oldVerifyKey.PrivateKey.Public().(ed25519.PublicKey)),
|
Key: oldVerifyKey.PublicKey,
|
||||||
},
|
},
|
||||||
ExpiredTS: oldVerifyKey.ExpiredAt,
|
ExpiredTS: oldVerifyKey.ExpiredAt,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,29 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/blevesearch/bleve/v2"
|
"github.com/blevesearch/bleve/v2"
|
||||||
|
|
||||||
|
// side effect imports to allow all possible languages
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/ar"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/cjk"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/ckb"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/da"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/de"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/en"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/es"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/fa"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/fi"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/fr"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/hi"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/hr"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/hu"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/it"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/nl"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/no"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/pt"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/ro"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/ru"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/sv"
|
||||||
|
_ "github.com/blevesearch/bleve/v2/analysis/lang/tr"
|
||||||
"github.com/blevesearch/bleve/v2/mapping"
|
"github.com/blevesearch/bleve/v2/mapping"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package sqlutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
|
||||||
|
|
@ -9,6 +10,8 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var skipSanityChecks = flag.Bool("skip-db-sanity", false, "Ignore sanity checks on the database connections (NOT RECOMMENDED!)")
|
||||||
|
|
||||||
// Open opens a database specified by its database driver name and a driver-specific data source name,
|
// Open opens a database specified by its database driver name and a driver-specific data source name,
|
||||||
// usually consisting of at least a database name and connection information. Includes tracing driver
|
// usually consisting of at least a database name and connection information. Includes tracing driver
|
||||||
// if DENDRITE_TRACE_SQL=1
|
// if DENDRITE_TRACE_SQL=1
|
||||||
|
|
@ -37,15 +40,39 @@ func Open(dbProperties *config.DatabaseOptions, writer Writer) (*sql.DB, error)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if driverName != "sqlite3" {
|
if driverName != "sqlite3" {
|
||||||
logrus.WithFields(logrus.Fields{
|
logger := logrus.WithFields(logrus.Fields{
|
||||||
"MaxOpenConns": dbProperties.MaxOpenConns(),
|
"max_open_conns": dbProperties.MaxOpenConns(),
|
||||||
"MaxIdleConns": dbProperties.MaxIdleConns(),
|
"max_idle_conns": dbProperties.MaxIdleConns(),
|
||||||
"ConnMaxLifetime": dbProperties.ConnMaxLifetime(),
|
"conn_max_lifetime": dbProperties.ConnMaxLifetime(),
|
||||||
"dataSourceName": regexp.MustCompile(`://[^@]*@`).ReplaceAllLiteralString(dsn, "://"),
|
"data_source_name": regexp.MustCompile(`://[^@]*@`).ReplaceAllLiteralString(dsn, "://"),
|
||||||
}).Debug("Setting DB connection limits")
|
})
|
||||||
|
logger.Debug("Setting DB connection limits")
|
||||||
db.SetMaxOpenConns(dbProperties.MaxOpenConns())
|
db.SetMaxOpenConns(dbProperties.MaxOpenConns())
|
||||||
db.SetMaxIdleConns(dbProperties.MaxIdleConns())
|
db.SetMaxIdleConns(dbProperties.MaxIdleConns())
|
||||||
db.SetConnMaxLifetime(dbProperties.ConnMaxLifetime())
|
db.SetConnMaxLifetime(dbProperties.ConnMaxLifetime())
|
||||||
|
|
||||||
|
if !*skipSanityChecks {
|
||||||
|
if dbProperties.MaxOpenConns() == 0 {
|
||||||
|
logrus.Warnf("WARNING: Configuring 'max_open_conns' to be unlimited is not recommended. This can result in bad performance or deadlocks.")
|
||||||
|
}
|
||||||
|
|
||||||
|
switch driverName {
|
||||||
|
case "postgres":
|
||||||
|
// Perform a quick sanity check if possible that we aren't trying to use more database
|
||||||
|
// connections than PostgreSQL is willing to give us.
|
||||||
|
var max, reserved int
|
||||||
|
if err := db.QueryRow("SELECT setting::integer FROM pg_settings WHERE name='max_connections';").Scan(&max); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to find maximum connections: %w", err)
|
||||||
|
}
|
||||||
|
if err := db.QueryRow("SELECT setting::integer FROM pg_settings WHERE name='superuser_reserved_connections';").Scan(&reserved); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to find reserved connections: %w", err)
|
||||||
|
}
|
||||||
|
if configured, allowed := dbProperties.MaxOpenConns(), max-reserved; configured > allowed {
|
||||||
|
logrus.Errorf("ERROR: The configured 'max_open_conns' is greater than the %d non-superuser connections that PostgreSQL is configured to allow. This can result in bad performance or deadlocks. Please pay close attention to your configured database connection counts. If you REALLY know what you are doing and want to override this error, pass the --skip-db-sanity option to Dendrite.", allowed)
|
||||||
|
return nil, fmt.Errorf("database sanity checks failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -424,7 +424,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
||||||
"succeeded": successCount,
|
"succeeded": successCount,
|
||||||
"failed": len(userIDs) - successCount,
|
"failed": len(userIDs) - successCount,
|
||||||
"wait_time": waitTime,
|
"wait_time": waitTime,
|
||||||
}).Warn("Failed to query device keys for some users")
|
}).Debug("Failed to query device keys for some users")
|
||||||
}
|
}
|
||||||
return waitTime, !allUsersSucceeded
|
return waitTime, !allUsersSucceeded
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -70,6 +70,11 @@ func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.Perform
|
||||||
if len(req.OneTimeKeys) > 0 {
|
if len(req.OneTimeKeys) > 0 {
|
||||||
a.uploadOneTimeKeys(ctx, req, res)
|
a.uploadOneTimeKeys(ctx, req, res)
|
||||||
}
|
}
|
||||||
|
otks, err := a.DB.OneTimeKeysCount(ctx, req.UserID, req.DeviceID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
res.OneTimeKeyCounts = []api.OneTimeKeysCount{*otks}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -278,6 +278,7 @@ type QuerySharedUsersRequest struct {
|
||||||
OtherUserIDs []string
|
OtherUserIDs []string
|
||||||
ExcludeRoomIDs []string
|
ExcludeRoomIDs []string
|
||||||
IncludeRoomIDs []string
|
IncludeRoomIDs []string
|
||||||
|
LocalOnly bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type QuerySharedUsersResponse struct {
|
type QuerySharedUsersResponse struct {
|
||||||
|
|
|
||||||
|
|
@ -173,12 +173,15 @@ func (r *Inputer) processRoomEvent(
|
||||||
for _, server := range serverRes.ServerNames {
|
for _, server := range serverRes.ServerNames {
|
||||||
servers[server] = struct{}{}
|
servers[server] = struct{}{}
|
||||||
}
|
}
|
||||||
|
// Don't try to talk to ourselves.
|
||||||
|
delete(servers, r.Cfg.Matrix.ServerName)
|
||||||
|
// Now build up the list of servers.
|
||||||
serverRes.ServerNames = serverRes.ServerNames[:0]
|
serverRes.ServerNames = serverRes.ServerNames[:0]
|
||||||
if input.Origin != "" {
|
if input.Origin != "" && input.Origin != r.Cfg.Matrix.ServerName {
|
||||||
serverRes.ServerNames = append(serverRes.ServerNames, input.Origin)
|
serverRes.ServerNames = append(serverRes.ServerNames, input.Origin)
|
||||||
delete(servers, input.Origin)
|
delete(servers, input.Origin)
|
||||||
}
|
}
|
||||||
if senderDomain != input.Origin {
|
if senderDomain != input.Origin && senderDomain != r.Cfg.Matrix.ServerName {
|
||||||
serverRes.ServerNames = append(serverRes.ServerNames, senderDomain)
|
serverRes.ServerNames = append(serverRes.ServerNames, senderDomain)
|
||||||
delete(servers, senderDomain)
|
delete(servers, senderDomain)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -799,7 +799,7 @@ func (r *Queryer) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUser
|
||||||
}
|
}
|
||||||
roomIDs = roomIDs[:j]
|
roomIDs = roomIDs[:j]
|
||||||
|
|
||||||
users, err := r.DB.JoinedUsersSetInRooms(ctx, roomIDs, req.OtherUserIDs)
|
users, err := r.DB.JoinedUsersSetInRooms(ctx, roomIDs, req.OtherUserIDs, req.LocalOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,12 +17,13 @@ package producers
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/acls"
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/acls"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
)
|
)
|
||||||
|
|
||||||
var keyContentFields = map[string]string{
|
var keyContentFields = map[string]string{
|
||||||
|
|
@ -40,10 +41,8 @@ type RoomEventProducer struct {
|
||||||
func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
|
func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
|
||||||
var err error
|
var err error
|
||||||
for _, update := range updates {
|
for _, update := range updates {
|
||||||
msg := &nats.Msg{
|
msg := nats.NewMsg(r.Topic)
|
||||||
Subject: r.Topic,
|
msg.Header.Set(jetstream.RoomEventType, string(update.Type))
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
msg.Header.Set(jetstream.RoomID, roomID)
|
msg.Header.Set(jetstream.RoomID, roomID)
|
||||||
msg.Data, err = json.Marshal(update)
|
msg.Data, err = json.Marshal(update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -157,7 +157,7 @@ type Database interface {
|
||||||
// If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned.
|
// If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned.
|
||||||
GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error)
|
GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error)
|
||||||
// JoinedUsersSetInRooms returns how many times each of the given users appears across the given rooms.
|
// JoinedUsersSetInRooms returns how many times each of the given users appears across the given rooms.
|
||||||
JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs []string) (map[string]int, error)
|
JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs []string, localOnly bool) (map[string]int, error)
|
||||||
// GetLocalServerInRoom returns true if we think we're in a given room or false otherwise.
|
// GetLocalServerInRoom returns true if we think we're in a given room or false otherwise.
|
||||||
GetLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error)
|
GetLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error)
|
||||||
// GetServerInRoom returns true if we think a server is in a given room or false otherwise.
|
// GetServerInRoom returns true if we think a server is in a given room or false otherwise.
|
||||||
|
|
|
||||||
|
|
@ -68,14 +68,18 @@ CREATE TABLE IF NOT EXISTS roomserver_membership (
|
||||||
|
|
||||||
var selectJoinedUsersSetForRoomsAndUserSQL = "" +
|
var selectJoinedUsersSetForRoomsAndUserSQL = "" +
|
||||||
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
|
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
|
||||||
" WHERE room_nid = ANY($1) AND target_nid = ANY($2) AND" +
|
" WHERE (target_local OR $1 = false)" +
|
||||||
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
|
" AND room_nid = ANY($2) AND target_nid = ANY($3)" +
|
||||||
|
" AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) +
|
||||||
|
" AND forgotten = false" +
|
||||||
" GROUP BY target_nid"
|
" GROUP BY target_nid"
|
||||||
|
|
||||||
var selectJoinedUsersSetForRoomsSQL = "" +
|
var selectJoinedUsersSetForRoomsSQL = "" +
|
||||||
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
|
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
|
||||||
" WHERE room_nid = ANY($1) AND" +
|
" WHERE (target_local OR $1 = false) " +
|
||||||
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
|
" AND room_nid = ANY($2)" +
|
||||||
|
" AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) +
|
||||||
|
" AND forgotten = false" +
|
||||||
" GROUP BY target_nid"
|
" GROUP BY target_nid"
|
||||||
|
|
||||||
// Insert a row in to membership table so that it can be locked by the
|
// Insert a row in to membership table so that it can be locked by the
|
||||||
|
|
@ -334,6 +338,7 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
roomNIDs []types.RoomNID,
|
roomNIDs []types.RoomNID,
|
||||||
userNIDs []types.EventStateKeyNID,
|
userNIDs []types.EventStateKeyNID,
|
||||||
|
localOnly bool,
|
||||||
) (map[types.EventStateKeyNID]int, error) {
|
) (map[types.EventStateKeyNID]int, error) {
|
||||||
var (
|
var (
|
||||||
rows *sql.Rows
|
rows *sql.Rows
|
||||||
|
|
@ -342,9 +347,9 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms(
|
||||||
stmt := sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsStmt)
|
stmt := sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsStmt)
|
||||||
if len(userNIDs) > 0 {
|
if len(userNIDs) > 0 {
|
||||||
stmt = sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsAndUserStmt)
|
stmt = sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsAndUserStmt)
|
||||||
rows, err = stmt.QueryContext(ctx, pq.Array(roomNIDs), pq.Array(userNIDs))
|
rows, err = stmt.QueryContext(ctx, localOnly, pq.Array(roomNIDs), pq.Array(userNIDs))
|
||||||
} else {
|
} else {
|
||||||
rows, err = stmt.QueryContext(ctx, pq.Array(roomNIDs))
|
rows, err = stmt.QueryContext(ctx, localOnly, pq.Array(roomNIDs))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -1280,7 +1280,7 @@ func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tu
|
||||||
}
|
}
|
||||||
|
|
||||||
// JoinedUsersSetInRooms returns a map of how many times the given users appear in the specified rooms.
|
// JoinedUsersSetInRooms returns a map of how many times the given users appear in the specified rooms.
|
||||||
func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs []string) (map[string]int, error) {
|
func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs []string, localOnly bool) (map[string]int, error) {
|
||||||
roomNIDs, err := d.RoomsTable.BulkSelectRoomNIDs(ctx, nil, roomIDs)
|
roomNIDs, err := d.RoomsTable.BulkSelectRoomNIDs(ctx, nil, roomIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -1295,7 +1295,7 @@ func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs [
|
||||||
userNIDs = append(userNIDs, nid)
|
userNIDs = append(userNIDs, nid)
|
||||||
nidToUserID[nid] = id
|
nidToUserID[nid] = id
|
||||||
}
|
}
|
||||||
userNIDToCount, err := d.MembershipTable.SelectJoinedUsersSetForRooms(ctx, nil, roomNIDs, userNIDs)
|
userNIDToCount, err := d.MembershipTable.SelectJoinedUsersSetForRooms(ctx, nil, roomNIDs, userNIDs, localOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,14 +44,18 @@ const membershipSchema = `
|
||||||
|
|
||||||
var selectJoinedUsersSetForRoomsAndUserSQL = "" +
|
var selectJoinedUsersSetForRoomsAndUserSQL = "" +
|
||||||
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
|
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
|
||||||
" WHERE room_nid IN ($1) AND target_nid IN ($2) AND" +
|
" WHERE (target_local OR $1 = false)" +
|
||||||
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
|
" AND room_nid IN ($2) AND target_nid IN ($3)" +
|
||||||
|
" AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) +
|
||||||
|
" AND forgotten = false" +
|
||||||
" GROUP BY target_nid"
|
" GROUP BY target_nid"
|
||||||
|
|
||||||
var selectJoinedUsersSetForRoomsSQL = "" +
|
var selectJoinedUsersSetForRoomsSQL = "" +
|
||||||
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
|
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
|
||||||
" WHERE room_nid IN ($1) AND " +
|
" WHERE (target_local OR $1 = false)" +
|
||||||
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
|
" AND room_nid IN ($2)" +
|
||||||
|
" AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) +
|
||||||
|
" AND forgotten = false" +
|
||||||
" GROUP BY target_nid"
|
" GROUP BY target_nid"
|
||||||
|
|
||||||
// Insert a row in to membership table so that it can be locked by the
|
// Insert a row in to membership table so that it can be locked by the
|
||||||
|
|
@ -305,8 +309,9 @@ func (s *membershipStatements) SelectRoomsWithMembership(
|
||||||
return roomNIDs, nil
|
return roomNIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]int, error) {
|
func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID, localOnly bool) (map[types.EventStateKeyNID]int, error) {
|
||||||
params := make([]interface{}, 0, len(roomNIDs)+len(userNIDs))
|
params := make([]interface{}, 0, 1+len(roomNIDs)+len(userNIDs))
|
||||||
|
params = append(params, localOnly)
|
||||||
for _, v := range roomNIDs {
|
for _, v := range roomNIDs {
|
||||||
params = append(params, v)
|
params = append(params, v)
|
||||||
}
|
}
|
||||||
|
|
@ -314,10 +319,10 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context,
|
||||||
params = append(params, v)
|
params = append(params, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
query := strings.Replace(selectJoinedUsersSetForRoomsSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1)
|
query := strings.Replace(selectJoinedUsersSetForRoomsSQL, "($2)", sqlutil.QueryVariadicOffset(len(roomNIDs), 1), 1)
|
||||||
if len(userNIDs) > 0 {
|
if len(userNIDs) > 0 {
|
||||||
query = strings.Replace(selectJoinedUsersSetForRoomsAndUserSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1)
|
query = strings.Replace(selectJoinedUsersSetForRoomsAndUserSQL, "($2)", sqlutil.QueryVariadicOffset(len(roomNIDs), 1), 1)
|
||||||
query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(userNIDs), len(roomNIDs)), 1)
|
query = strings.Replace(query, "($3)", sqlutil.QueryVariadicOffset(len(userNIDs), len(roomNIDs)+1), 1)
|
||||||
}
|
}
|
||||||
var rows *sql.Rows
|
var rows *sql.Rows
|
||||||
var err error
|
var err error
|
||||||
|
|
|
||||||
|
|
@ -137,7 +137,7 @@ type Membership interface {
|
||||||
UpdateMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, senderUserNID types.EventStateKeyNID, membership MembershipState, eventNID types.EventNID, forgotten bool) (bool, error)
|
UpdateMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, senderUserNID types.EventStateKeyNID, membership MembershipState, eventNID types.EventNID, forgotten bool) (bool, error)
|
||||||
SelectRoomsWithMembership(ctx context.Context, txn *sql.Tx, userID types.EventStateKeyNID, membershipState MembershipState) ([]types.RoomNID, error)
|
SelectRoomsWithMembership(ctx context.Context, txn *sql.Tx, userID types.EventStateKeyNID, membershipState MembershipState) ([]types.RoomNID, error)
|
||||||
// SelectJoinedUsersSetForRooms returns how many times each of the given users appears across the given rooms.
|
// SelectJoinedUsersSetForRooms returns how many times each of the given users appears across the given rooms.
|
||||||
SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]int, error)
|
SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID, localOnly bool) (map[types.EventStateKeyNID]int, error)
|
||||||
SelectKnownUsers(ctx context.Context, txn *sql.Tx, userID types.EventStateKeyNID, searchString string, limit int) ([]string, error)
|
SelectKnownUsers(ctx context.Context, txn *sql.Tx, userID types.EventStateKeyNID, searchString string, limit int) ([]string, error)
|
||||||
UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error
|
UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error
|
||||||
SelectLocalServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) (bool, error)
|
SelectLocalServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) (bool, error)
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,7 @@ func TestMembershipTable(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, inRoom)
|
assert.True(t, inRoom)
|
||||||
|
|
||||||
userJoinedToRooms, err := tab.SelectJoinedUsersSetForRooms(ctx, nil, []types.RoomNID{1}, userNIDs)
|
userJoinedToRooms, err := tab.SelectJoinedUsersSetForRooms(ctx, nil, []types.RoomNID{1}, userNIDs, false)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, 1, len(userJoinedToRooms))
|
assert.Equal(t, 1, len(userJoinedToRooms))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -231,24 +231,40 @@ func loadConfig(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, oldPrivateKey := range c.Global.OldVerifyKeys {
|
for _, key := range c.Global.OldVerifyKeys {
|
||||||
var oldPrivateKeyData []byte
|
switch {
|
||||||
|
case key.PrivateKeyPath != "":
|
||||||
|
var oldPrivateKeyData []byte
|
||||||
|
oldPrivateKeyPath := absPath(basePath, key.PrivateKeyPath)
|
||||||
|
oldPrivateKeyData, err = readFile(oldPrivateKeyPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to read %q: %w", oldPrivateKeyPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
oldPrivateKeyPath := absPath(basePath, oldPrivateKey.PrivateKeyPath)
|
// NOTSPEC: Ordinarily we should enforce key ID formatting, but since there are
|
||||||
oldPrivateKeyData, err = readFile(oldPrivateKeyPath)
|
// a number of private keys out there with non-compatible symbols in them due
|
||||||
if err != nil {
|
// to lack of validation in Synapse, we won't enforce that for old verify keys.
|
||||||
return nil, err
|
keyID, privateKey, perr := readKeyPEM(oldPrivateKeyPath, oldPrivateKeyData, false)
|
||||||
|
if perr != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse %q: %w", oldPrivateKeyPath, perr)
|
||||||
|
}
|
||||||
|
|
||||||
|
key.KeyID = keyID
|
||||||
|
key.PrivateKey = privateKey
|
||||||
|
key.PublicKey = gomatrixserverlib.Base64Bytes(privateKey.Public().(ed25519.PublicKey))
|
||||||
|
|
||||||
|
case key.KeyID == "":
|
||||||
|
return nil, fmt.Errorf("'key_id' must be specified if 'public_key' is specified")
|
||||||
|
|
||||||
|
case len(key.PublicKey) == ed25519.PublicKeySize:
|
||||||
|
continue
|
||||||
|
|
||||||
|
case len(key.PublicKey) > 0:
|
||||||
|
return nil, fmt.Errorf("the supplied 'public_key' is the wrong length")
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("either specify a 'private_key' path or supply both 'public_key' and 'key_id'")
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTSPEC: Ordinarily we should enforce key ID formatting, but since there are
|
|
||||||
// a number of private keys out there with non-compatible symbols in them due
|
|
||||||
// to lack of validation in Synapse, we won't enforce that for old verify keys.
|
|
||||||
keyID, privateKey, perr := readKeyPEM(oldPrivateKeyPath, oldPrivateKeyData, false)
|
|
||||||
if perr != nil {
|
|
||||||
return nil, perr
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Global.OldVerifyKeys[i].KeyID, c.Global.OldVerifyKeys[i].PrivateKey = keyID, privateKey
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.MediaAPI.AbsBasePath = Path(absPath(basePath, c.MediaAPI.BasePath))
|
c.MediaAPI.AbsBasePath = Path(absPath(basePath, c.MediaAPI.BasePath))
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ type Global struct {
|
||||||
// Information about old private keys that used to be used to sign requests and
|
// Information about old private keys that used to be used to sign requests and
|
||||||
// events on this domain. They will not be used but will be advertised to other
|
// events on this domain. They will not be used but will be advertised to other
|
||||||
// servers that ask for them to help verify old events.
|
// servers that ask for them to help verify old events.
|
||||||
OldVerifyKeys []OldVerifyKeys `yaml:"old_private_keys"`
|
OldVerifyKeys []*OldVerifyKeys `yaml:"old_private_keys"`
|
||||||
|
|
||||||
// How long a remote server can cache our server key for before requesting it again.
|
// How long a remote server can cache our server key for before requesting it again.
|
||||||
// Increasing this number will reduce the number of requests made by remote servers
|
// Increasing this number will reduce the number of requests made by remote servers
|
||||||
|
|
@ -127,8 +127,11 @@ type OldVerifyKeys struct {
|
||||||
// The private key itself.
|
// The private key itself.
|
||||||
PrivateKey ed25519.PrivateKey `yaml:"-"`
|
PrivateKey ed25519.PrivateKey `yaml:"-"`
|
||||||
|
|
||||||
|
// The public key, in case only that part is known.
|
||||||
|
PublicKey gomatrixserverlib.Base64Bytes `yaml:"public_key"`
|
||||||
|
|
||||||
// The key ID of the private key.
|
// The key ID of the private key.
|
||||||
KeyID gomatrixserverlib.KeyID `yaml:"-"`
|
KeyID gomatrixserverlib.KeyID `yaml:"key_id"`
|
||||||
|
|
||||||
// When the private key was designed as "expired", as a UNIX timestamp
|
// When the private key was designed as "expired", as a UNIX timestamp
|
||||||
// in millisecond precision.
|
// in millisecond precision.
|
||||||
|
|
|
||||||
|
|
@ -9,9 +9,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
UserID = "user_id"
|
UserID = "user_id"
|
||||||
RoomID = "room_id"
|
RoomID = "room_id"
|
||||||
EventID = "event_id"
|
EventID = "event_id"
|
||||||
|
RoomEventType = "output_room_event_type"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
||||||
|
|
@ -111,7 +111,8 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d
|
||||||
// work out who we need to notify about the new key
|
// work out who we need to notify about the new key
|
||||||
var queryRes roomserverAPI.QuerySharedUsersResponse
|
var queryRes roomserverAPI.QuerySharedUsersResponse
|
||||||
err := s.rsAPI.QuerySharedUsers(s.ctx, &roomserverAPI.QuerySharedUsersRequest{
|
err := s.rsAPI.QuerySharedUsers(s.ctx, &roomserverAPI.QuerySharedUsersRequest{
|
||||||
UserID: output.UserID,
|
UserID: output.UserID,
|
||||||
|
LocalOnly: true,
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
|
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
|
||||||
|
|
@ -135,7 +136,8 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
|
||||||
// work out who we need to notify about the new key
|
// work out who we need to notify about the new key
|
||||||
var queryRes roomserverAPI.QuerySharedUsersResponse
|
var queryRes roomserverAPI.QuerySharedUsersResponse
|
||||||
err := s.rsAPI.QuerySharedUsers(s.ctx, &roomserverAPI.QuerySharedUsersRequest{
|
err := s.rsAPI.QuerySharedUsers(s.ctx, &roomserverAPI.QuerySharedUsersRequest{
|
||||||
UserID: output.UserID,
|
UserID: output.UserID,
|
||||||
|
LocalOnly: true,
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
|
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
|
||||||
|
|
|
||||||
|
|
@ -170,9 +170,12 @@ func joinResponseWithRooms(syncResponse *types.Response, userID string, roomIDs
|
||||||
Content: []byte(`{"membership":"join"}`),
|
Content: []byte(`{"membership":"join"}`),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
jr, ok := syncResponse.Rooms.Join[roomID]
|
||||||
jr := syncResponse.Rooms.Join[roomID]
|
if !ok {
|
||||||
jr.State.Events = roomEvents
|
jr = types.NewJoinResponse()
|
||||||
|
}
|
||||||
|
jr.Timeline = &types.Timeline{}
|
||||||
|
jr.State = &types.ClientEvents{Events: roomEvents}
|
||||||
syncResponse.Rooms.Join[roomID] = jr
|
syncResponse.Rooms.Join[roomID] = jr
|
||||||
}
|
}
|
||||||
return syncResponse
|
return syncResponse
|
||||||
|
|
@ -191,8 +194,11 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
lr := syncResponse.Rooms.Leave[roomID]
|
lr, ok := syncResponse.Rooms.Leave[roomID]
|
||||||
lr.Timeline.Events = roomEvents
|
if !ok {
|
||||||
|
lr = types.NewLeaveResponse()
|
||||||
|
}
|
||||||
|
lr.Timeline = &types.Timeline{Events: roomEvents}
|
||||||
syncResponse.Rooms.Leave[roomID] = lr
|
syncResponse.Rooms.Leave[roomID] = lr
|
||||||
}
|
}
|
||||||
return syncResponse
|
return syncResponse
|
||||||
|
|
@ -328,9 +334,13 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
jr := syncResponse.Rooms.Join[roomID]
|
jr, ok := syncResponse.Rooms.Join[roomID]
|
||||||
jr.State.Events = roomStateEvents
|
if !ok {
|
||||||
jr.Timeline.Events = roomTimelineEvents
|
jr = types.NewJoinResponse()
|
||||||
|
}
|
||||||
|
|
||||||
|
jr.State = &types.ClientEvents{Events: roomStateEvents}
|
||||||
|
jr.Timeline = &types.Timeline{Events: roomTimelineEvents}
|
||||||
syncResponse.Rooms.Join[roomID] = jr
|
syncResponse.Rooms.Join[roomID] = jr
|
||||||
|
|
||||||
rsAPI := &mockRoomserverAPI{
|
rsAPI := &mockRoomserverAPI{
|
||||||
|
|
@ -442,8 +452,11 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
lr := syncResponse.Rooms.Leave[roomID]
|
lr, ok := syncResponse.Rooms.Leave[roomID]
|
||||||
lr.Timeline.Events = roomEvents
|
if !ok {
|
||||||
|
lr = types.NewLeaveResponse()
|
||||||
|
}
|
||||||
|
lr.Timeline = &types.Timeline{Events: roomEvents}
|
||||||
syncResponse.Rooms.Leave[roomID] = lr
|
syncResponse.Rooms.Leave[roomID] = lr
|
||||||
|
|
||||||
rsAPI := &mockRoomserverAPI{
|
rsAPI := &mockRoomserverAPI{
|
||||||
|
|
|
||||||
|
|
@ -90,9 +90,9 @@ func (p *AccountDataStreamProvider) IncrementalSync(
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
|
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
|
||||||
joinData := *types.NewJoinResponse()
|
joinData, ok := req.Response.Rooms.Join[roomID]
|
||||||
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
|
if !ok {
|
||||||
joinData = existing
|
joinData = types.NewJoinResponse()
|
||||||
}
|
}
|
||||||
joinData.AccountData.Events = append(
|
joinData.AccountData.Events = append(
|
||||||
joinData.AccountData.Events,
|
joinData.AccountData.Events,
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ func (p *InviteStreamProvider) IncrementalSync(
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ir := types.NewInviteResponse(inviteEvent)
|
ir := types.NewInviteResponse(inviteEvent)
|
||||||
req.Response.Rooms.Invite[roomID] = *ir
|
req.Response.Rooms.Invite[roomID] = ir
|
||||||
}
|
}
|
||||||
|
|
||||||
// When doing an initial sync, we don't want to add retired invites, as this
|
// When doing an initial sync, we don't want to add retired invites, as this
|
||||||
|
|
@ -87,7 +87,7 @@ func (p *InviteStreamProvider) IncrementalSync(
|
||||||
Type: "m.room.member",
|
Type: "m.room.member",
|
||||||
Content: gomatrixserverlib.RawJSON(`{"membership":"leave"}`),
|
Content: gomatrixserverlib.RawJSON(`{"membership":"leave"}`),
|
||||||
})
|
})
|
||||||
req.Response.Rooms.Leave[roomID] = *lr
|
req.Response.Rooms.Leave[roomID] = lr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
req.Response.Rooms.Join[roomID] = *jr
|
req.Response.Rooms.Join[roomID] = jr
|
||||||
req.Rooms[roomID] = gomatrixserverlib.Join
|
req.Rooms[roomID] = gomatrixserverlib.Join
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -129,7 +129,7 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
req.Response.Rooms.Peek[peek.RoomID] = *jr
|
req.Response.Rooms.Peek[peek.RoomID] = jr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -320,7 +320,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
// didn't "remove" events, return that the response is limited.
|
// didn't "remove" events, return that the response is limited.
|
||||||
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
|
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Join[delta.RoomID] = *jr
|
res.Rooms.Join[delta.RoomID] = jr
|
||||||
|
|
||||||
case gomatrixserverlib.Peek:
|
case gomatrixserverlib.Peek:
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
|
|
@ -329,7 +329,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
jr.Timeline.Limited = limited
|
jr.Timeline.Limited = limited
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Peek[delta.RoomID] = *jr
|
res.Rooms.Peek[delta.RoomID] = jr
|
||||||
|
|
||||||
case gomatrixserverlib.Leave:
|
case gomatrixserverlib.Leave:
|
||||||
fallthrough // transitions to leave are the same as ban
|
fallthrough // transitions to leave are the same as ban
|
||||||
|
|
@ -342,7 +342,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
// didn't "remove" events, return that the response is limited.
|
// didn't "remove" events, return that the response is limited.
|
||||||
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
|
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
|
||||||
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Leave[delta.RoomID] = *lr
|
res.Rooms.Leave[delta.RoomID] = lr
|
||||||
}
|
}
|
||||||
|
|
||||||
return latestPosition, nil
|
return latestPosition, nil
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ReceiptStreamProvider struct {
|
type ReceiptStreamProvider struct {
|
||||||
|
|
@ -76,9 +77,9 @@ func (p *ReceiptStreamProvider) IncrementalSync(
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
jr := *types.NewJoinResponse()
|
jr, ok := req.Response.Rooms.Join[roomID]
|
||||||
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
|
if !ok {
|
||||||
jr = existing
|
jr = types.NewJoinResponse()
|
||||||
}
|
}
|
||||||
|
|
||||||
ev := gomatrixserverlib.ClientEvent{
|
ev := gomatrixserverlib.ClientEvent{
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,11 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type TypingStreamProvider struct {
|
type TypingStreamProvider struct {
|
||||||
|
|
@ -35,9 +36,9 @@ func (p *TypingStreamProvider) IncrementalSync(
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
jr := *types.NewJoinResponse()
|
jr, ok := req.Response.Rooms.Join[roomID]
|
||||||
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
|
if !ok {
|
||||||
jr = existing
|
jr = types.NewJoinResponse()
|
||||||
}
|
}
|
||||||
|
|
||||||
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
|
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
|
||||||
|
|
|
||||||
|
|
@ -407,7 +407,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||||
return rp.streams.PDUStreamProvider.IncrementalSync(
|
return rp.streams.PDUStreamProvider.IncrementalSync(
|
||||||
syncReq.Context, txn, syncReq,
|
syncReq.Context, txn, syncReq,
|
||||||
syncReq.Since.PDUPosition, currentPos.PDUPosition,
|
syncReq.Since.PDUPosition, rp.Notifier.CurrentPosition().PDUPosition,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
@ -416,7 +416,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||||
return rp.streams.TypingStreamProvider.IncrementalSync(
|
return rp.streams.TypingStreamProvider.IncrementalSync(
|
||||||
syncReq.Context, txn, syncReq,
|
syncReq.Context, txn, syncReq,
|
||||||
syncReq.Since.TypingPosition, currentPos.TypingPosition,
|
syncReq.Since.TypingPosition, rp.Notifier.CurrentPosition().TypingPosition,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
@ -425,7 +425,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||||
return rp.streams.ReceiptStreamProvider.IncrementalSync(
|
return rp.streams.ReceiptStreamProvider.IncrementalSync(
|
||||||
syncReq.Context, txn, syncReq,
|
syncReq.Context, txn, syncReq,
|
||||||
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
|
syncReq.Since.ReceiptPosition, rp.Notifier.CurrentPosition().ReceiptPosition,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
@ -434,7 +434,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||||
return rp.streams.InviteStreamProvider.IncrementalSync(
|
return rp.streams.InviteStreamProvider.IncrementalSync(
|
||||||
syncReq.Context, txn, syncReq,
|
syncReq.Context, txn, syncReq,
|
||||||
syncReq.Since.InvitePosition, currentPos.InvitePosition,
|
syncReq.Since.InvitePosition, rp.Notifier.CurrentPosition().InvitePosition,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
@ -443,7 +443,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||||
return rp.streams.SendToDeviceStreamProvider.IncrementalSync(
|
return rp.streams.SendToDeviceStreamProvider.IncrementalSync(
|
||||||
syncReq.Context, txn, syncReq,
|
syncReq.Context, txn, syncReq,
|
||||||
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
|
syncReq.Since.SendToDevicePosition, rp.Notifier.CurrentPosition().SendToDevicePosition,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
@ -452,7 +452,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||||
return rp.streams.AccountDataStreamProvider.IncrementalSync(
|
return rp.streams.AccountDataStreamProvider.IncrementalSync(
|
||||||
syncReq.Context, txn, syncReq,
|
syncReq.Context, txn, syncReq,
|
||||||
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
|
syncReq.Since.AccountDataPosition, rp.Notifier.CurrentPosition().AccountDataPosition,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
@ -461,7 +461,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||||
return rp.streams.NotificationDataStreamProvider.IncrementalSync(
|
return rp.streams.NotificationDataStreamProvider.IncrementalSync(
|
||||||
syncReq.Context, txn, syncReq,
|
syncReq.Context, txn, syncReq,
|
||||||
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
|
syncReq.Since.NotificationDataPosition, rp.Notifier.CurrentPosition().NotificationDataPosition,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
@ -470,7 +470,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||||
return rp.streams.DeviceListStreamProvider.IncrementalSync(
|
return rp.streams.DeviceListStreamProvider.IncrementalSync(
|
||||||
syncReq.Context, txn, syncReq,
|
syncReq.Context, txn, syncReq,
|
||||||
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
|
syncReq.Since.DeviceListPosition, rp.Notifier.CurrentPosition().DeviceListPosition,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
@ -479,7 +479,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||||
return rp.streams.PresenceStreamProvider.IncrementalSync(
|
return rp.streams.PresenceStreamProvider.IncrementalSync(
|
||||||
syncReq.Context, txn, syncReq,
|
syncReq.Context, txn, syncReq,
|
||||||
syncReq.Since.PresencePosition, currentPos.PresencePosition,
|
syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
|
||||||
|
|
@ -327,29 +327,57 @@ type PrevEventRef struct {
|
||||||
PrevSender string `json:"prev_sender"`
|
PrevSender string `json:"prev_sender"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type DeviceLists struct {
|
||||||
|
Changed []string `json:"changed,omitempty"`
|
||||||
|
Left []string `json:"left,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type RoomsResponse struct {
|
||||||
|
Join map[string]*JoinResponse `json:"join,omitempty"`
|
||||||
|
Peek map[string]*JoinResponse `json:"peek,omitempty"`
|
||||||
|
Invite map[string]*InviteResponse `json:"invite,omitempty"`
|
||||||
|
Leave map[string]*LeaveResponse `json:"leave,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ToDeviceResponse struct {
|
||||||
|
Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
||||||
type Response struct {
|
type Response struct {
|
||||||
NextBatch StreamingToken `json:"next_batch"`
|
NextBatch StreamingToken `json:"next_batch"`
|
||||||
AccountData struct {
|
AccountData *ClientEvents `json:"account_data,omitempty"`
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
|
Presence *ClientEvents `json:"presence,omitempty"`
|
||||||
} `json:"account_data,omitempty"`
|
Rooms *RoomsResponse `json:"rooms,omitempty"`
|
||||||
Presence struct {
|
ToDevice *ToDeviceResponse `json:"to_device,omitempty"`
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
|
DeviceLists *DeviceLists `json:"device_lists,omitempty"`
|
||||||
} `json:"presence,omitempty"`
|
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
|
||||||
Rooms struct {
|
}
|
||||||
Join map[string]JoinResponse `json:"join,omitempty"`
|
|
||||||
Peek map[string]JoinResponse `json:"peek,omitempty"`
|
func (r Response) MarshalJSON() ([]byte, error) {
|
||||||
Invite map[string]InviteResponse `json:"invite,omitempty"`
|
type alias Response
|
||||||
Leave map[string]LeaveResponse `json:"leave,omitempty"`
|
a := alias(r)
|
||||||
} `json:"rooms,omitempty"`
|
if r.AccountData != nil && len(r.AccountData.Events) == 0 {
|
||||||
ToDevice struct {
|
a.AccountData = nil
|
||||||
Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"`
|
}
|
||||||
} `json:"to_device,omitempty"`
|
if r.Presence != nil && len(r.Presence.Events) == 0 {
|
||||||
DeviceLists struct {
|
a.Presence = nil
|
||||||
Changed []string `json:"changed,omitempty"`
|
}
|
||||||
Left []string `json:"left,omitempty"`
|
if r.DeviceLists != nil {
|
||||||
} `json:"device_lists,omitempty"`
|
if len(r.DeviceLists.Left) == 0 && len(r.DeviceLists.Changed) == 0 {
|
||||||
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
|
a.DeviceLists = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r.Rooms != nil {
|
||||||
|
if len(r.Rooms.Join) == 0 && len(r.Rooms.Peek) == 0 &&
|
||||||
|
len(r.Rooms.Invite) == 0 && len(r.Rooms.Leave) == 0 {
|
||||||
|
a.Rooms = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r.ToDevice != nil && len(r.ToDevice.Events) == 0 {
|
||||||
|
a.ToDevice = nil
|
||||||
|
}
|
||||||
|
return json.Marshal(a)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Response) HasUpdates() bool {
|
func (r *Response) HasUpdates() bool {
|
||||||
|
|
@ -370,18 +398,21 @@ func NewResponse() *Response {
|
||||||
res := Response{}
|
res := Response{}
|
||||||
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
|
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
|
||||||
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
|
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
|
||||||
res.Rooms.Join = map[string]JoinResponse{}
|
res.Rooms = &RoomsResponse{
|
||||||
res.Rooms.Peek = map[string]JoinResponse{}
|
Join: map[string]*JoinResponse{},
|
||||||
res.Rooms.Invite = map[string]InviteResponse{}
|
Peek: map[string]*JoinResponse{},
|
||||||
res.Rooms.Leave = map[string]LeaveResponse{}
|
Invite: map[string]*InviteResponse{},
|
||||||
|
Leave: map[string]*LeaveResponse{},
|
||||||
|
}
|
||||||
|
|
||||||
// Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value.
|
// Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value.
|
||||||
// TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
|
// TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
|
||||||
// really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
|
// really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
|
||||||
// This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
|
// This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
|
||||||
res.AccountData.Events = []gomatrixserverlib.ClientEvent{}
|
res.AccountData = &ClientEvents{}
|
||||||
res.Presence.Events = []gomatrixserverlib.ClientEvent{}
|
res.Presence = &ClientEvents{}
|
||||||
res.ToDevice.Events = []gomatrixserverlib.SendToDeviceEvent{}
|
res.DeviceLists = &DeviceLists{}
|
||||||
|
res.ToDevice = &ToDeviceResponse{}
|
||||||
res.DeviceListsOTKCount = map[string]int{}
|
res.DeviceListsOTKCount = map[string]int{}
|
||||||
|
|
||||||
return &res
|
return &res
|
||||||
|
|
@ -403,38 +434,73 @@ type UnreadNotifications struct {
|
||||||
NotificationCount int `json:"notification_count"`
|
NotificationCount int `json:"notification_count"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ClientEvents struct {
|
||||||
|
Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Timeline struct {
|
||||||
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
|
Limited bool `json:"limited"`
|
||||||
|
PrevBatch *TopologyToken `json:"prev_batch,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Summary struct {
|
||||||
|
Heroes []string `json:"m.heroes,omitempty"`
|
||||||
|
JoinedMemberCount *int `json:"m.joined_member_count,omitempty"`
|
||||||
|
InvitedMemberCount *int `json:"m.invited_member_count,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
|
// JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
|
||||||
type JoinResponse struct {
|
type JoinResponse struct {
|
||||||
Summary struct {
|
Summary *Summary `json:"summary,omitempty"`
|
||||||
Heroes []string `json:"m.heroes,omitempty"`
|
State *ClientEvents `json:"state,omitempty"`
|
||||||
JoinedMemberCount *int `json:"m.joined_member_count,omitempty"`
|
Timeline *Timeline `json:"timeline,omitempty"`
|
||||||
InvitedMemberCount *int `json:"m.invited_member_count,omitempty"`
|
Ephemeral *ClientEvents `json:"ephemeral,omitempty"`
|
||||||
} `json:"summary"`
|
AccountData *ClientEvents `json:"account_data,omitempty"`
|
||||||
State struct {
|
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
||||||
} `json:"state"`
|
|
||||||
Timeline struct {
|
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
||||||
Limited bool `json:"limited"`
|
|
||||||
PrevBatch *TopologyToken `json:"prev_batch,omitempty"`
|
|
||||||
} `json:"timeline"`
|
|
||||||
Ephemeral struct {
|
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
||||||
} `json:"ephemeral"`
|
|
||||||
AccountData struct {
|
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
||||||
} `json:"account_data"`
|
|
||||||
*UnreadNotifications `json:"unread_notifications,omitempty"`
|
*UnreadNotifications `json:"unread_notifications,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (jr JoinResponse) MarshalJSON() ([]byte, error) {
|
||||||
|
type alias JoinResponse
|
||||||
|
a := alias(jr)
|
||||||
|
if jr.State != nil && len(jr.State.Events) == 0 {
|
||||||
|
a.State = nil
|
||||||
|
}
|
||||||
|
if jr.Ephemeral != nil && len(jr.Ephemeral.Events) == 0 {
|
||||||
|
a.Ephemeral = nil
|
||||||
|
}
|
||||||
|
if jr.AccountData != nil && len(jr.AccountData.Events) == 0 {
|
||||||
|
a.AccountData = nil
|
||||||
|
}
|
||||||
|
if jr.Timeline != nil && len(jr.Timeline.Events) == 0 {
|
||||||
|
a.Timeline = nil
|
||||||
|
}
|
||||||
|
if jr.Summary != nil {
|
||||||
|
var nilPtr int
|
||||||
|
joinedEmpty := jr.Summary.JoinedMemberCount == nil || jr.Summary.JoinedMemberCount == &nilPtr
|
||||||
|
invitedEmpty := jr.Summary.InvitedMemberCount == nil || jr.Summary.InvitedMemberCount == &nilPtr
|
||||||
|
if joinedEmpty && invitedEmpty && len(jr.Summary.Heroes) == 0 {
|
||||||
|
a.Summary = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
if jr.UnreadNotifications != nil &&
|
||||||
|
jr.UnreadNotifications.NotificationCount == 0 && jr.UnreadNotifications.HighlightCount == 0 {
|
||||||
|
a.UnreadNotifications = nil
|
||||||
|
}
|
||||||
|
return json.Marshal(a)
|
||||||
|
}
|
||||||
|
|
||||||
// NewJoinResponse creates an empty response with initialised arrays.
|
// NewJoinResponse creates an empty response with initialised arrays.
|
||||||
func NewJoinResponse() *JoinResponse {
|
func NewJoinResponse() *JoinResponse {
|
||||||
res := JoinResponse{}
|
return &JoinResponse{
|
||||||
res.State.Events = []gomatrixserverlib.ClientEvent{}
|
Summary: &Summary{},
|
||||||
res.Timeline.Events = []gomatrixserverlib.ClientEvent{}
|
State: &ClientEvents{},
|
||||||
res.Ephemeral.Events = []gomatrixserverlib.ClientEvent{}
|
Timeline: &Timeline{},
|
||||||
res.AccountData.Events = []gomatrixserverlib.ClientEvent{}
|
Ephemeral: &ClientEvents{},
|
||||||
return &res
|
AccountData: &ClientEvents{},
|
||||||
|
UnreadNotifications: &UnreadNotifications{},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// InviteResponse represents a /sync response for a room which is under the 'invite' key.
|
// InviteResponse represents a /sync response for a room which is under the 'invite' key.
|
||||||
|
|
@ -469,21 +535,28 @@ func NewInviteResponse(event *gomatrixserverlib.HeaderedEvent) *InviteResponse {
|
||||||
|
|
||||||
// LeaveResponse represents a /sync response for a room which is under the 'leave' key.
|
// LeaveResponse represents a /sync response for a room which is under the 'leave' key.
|
||||||
type LeaveResponse struct {
|
type LeaveResponse struct {
|
||||||
State struct {
|
State *ClientEvents `json:"state,omitempty"`
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
Timeline *Timeline `json:"timeline,omitempty"`
|
||||||
} `json:"state"`
|
}
|
||||||
Timeline struct {
|
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
func (lr LeaveResponse) MarshalJSON() ([]byte, error) {
|
||||||
Limited bool `json:"limited"`
|
type alias LeaveResponse
|
||||||
PrevBatch *TopologyToken `json:"prev_batch,omitempty"`
|
a := alias(lr)
|
||||||
} `json:"timeline"`
|
if lr.State != nil && len(lr.State.Events) == 0 {
|
||||||
|
a.State = nil
|
||||||
|
}
|
||||||
|
if lr.Timeline != nil && len(lr.Timeline.Events) == 0 {
|
||||||
|
a.Timeline = nil
|
||||||
|
}
|
||||||
|
return json.Marshal(a)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLeaveResponse creates an empty response with initialised arrays.
|
// NewLeaveResponse creates an empty response with initialised arrays.
|
||||||
func NewLeaveResponse() *LeaveResponse {
|
func NewLeaveResponse() *LeaveResponse {
|
||||||
res := LeaveResponse{}
|
res := LeaveResponse{
|
||||||
res.State.Events = []gomatrixserverlib.ClientEvent{}
|
State: &ClientEvents{},
|
||||||
res.Timeline.Events = []gomatrixserverlib.ClientEvent{}
|
Timeline: &Timeline{},
|
||||||
|
}
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,11 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/base"
|
"github.com/matrix-org/dendrite/setup/base"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) {
|
func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) {
|
||||||
|
|
@ -21,10 +22,8 @@ func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Ms
|
||||||
|
|
||||||
func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg {
|
func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
msg := &nats.Msg{
|
msg := nats.NewMsg(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent))
|
||||||
Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
|
msg.Header.Set(jetstream.RoomEventType, string(update.Type))
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
msg.Header.Set(jetstream.RoomID, roomID)
|
msg.Header.Set(jetstream.RoomID, roomID)
|
||||||
var err error
|
var err error
|
||||||
msg.Data, err = json.Marshal(update)
|
msg.Data, err = json.Marshal(update)
|
||||||
|
|
|
||||||
|
|
@ -72,15 +72,16 @@ func (s *OutputRoomEventConsumer) Start() error {
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
|
// Only handle events we care about
|
||||||
|
if rsapi.OutputType(msg.Header.Get(jetstream.RoomEventType)) != rsapi.OutputTypeNewRoomEvent {
|
||||||
|
return true
|
||||||
|
}
|
||||||
var output rsapi.OutputEvent
|
var output rsapi.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if output.Type != rsapi.OutputTypeNewRoomEvent {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
event := output.NewRoomEvent.Event
|
event := output.NewRoomEvent.Event
|
||||||
if event == nil {
|
if event == nil {
|
||||||
log.Errorf("userapi consumer: expected event")
|
log.Errorf("userapi consumer: expected event")
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue