Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/usagestats

This commit is contained in:
Till Faelligen 2022-10-07 16:56:57 +02:00
commit 9d0e2c1073
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
30 changed files with 361 additions and 112 deletions

View file

@ -1,5 +1,27 @@
# Changelog
## Dendrite 0.10.2 (2022-10-07)
### Features
* Dendrite will now fail to start if there is an obvious problem with the configured `max_open_conns` when using PostgreSQL database backends, since this can lead to instability and performance issues
* More information on this is available [in the documentation](https://matrix-org.github.io/dendrite/installation/start/optimisation#postgresql-connection-limit)
* Unnecessary/empty fields will no longer be sent in `/sync` responses
* It is now possible to configure `old_private_keys` from previous Matrix installations on the same domain if only public key is known, to make it easier to expire old keys correctly
* You can configure either just the `private_key` path, or you can supply both the `public_key` and `key_id`
### Fixes
* The sync transaction behaviour has been modified further so that errors in one stream should not propagate to other streams unnecessarily
* Rooms should now be classified as DM rooms correctly by passing through `is_direct` and unsigned hints
* A bug which caused marking device lists as stale to consume lots of CPU has been fixed
* Users accepting invites should no longer cause unnecessary federated joins if there are already other local users in the room
* The sync API state range queries have been optimised by adding missing indexes
* It should now be possible to configure non-English languages for full-text search in `search.language`
* The roomserver will no longer attempt to perform federated requests to the local server when trying to fetch missing events
* The `/keys/upload` endpoint will now always return the `one_time_keys_counts`, which may help with E2EE reliability
* The sync API will now retrieve the latest stream position before processing each stream rather than at the beginning of the request, to hopefully reduce the number of round-trips to `/sync`
## Dendrite 0.10.1 (2022-09-30)
### Features

View file

@ -79,7 +79,7 @@ $ ./bin/dendrite-monolith-server --tls-cert server.crt --tls-key server.key --co
# Create an user account (add -admin for an admin user).
# Specify the localpart only, e.g. 'alice' for '@alice:domain.com'
$ ./bin/create-account --config dendrite.yaml --url http://localhost:8008 --username alice
$ ./bin/create-account --config dendrite.yaml --username alice
```
Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`.
@ -90,7 +90,7 @@ We use a script called Are We Synapse Yet which checks Sytest compliance rates.
test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it
updates with CI. As of August 2022 we're at around 90% CS API coverage and 95% Federation coverage, though check
CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse
servers such as matrix.org reasonably well, although there are still some missing features (like Search).
servers such as matrix.org reasonably well, although there are still some missing features (like SSO and Third-party ID APIs).
We are prioritising features that will benefit single-user homeservers first (e.g Receipts, E2E) rather
than features that massive deployments may be interested in (OpenID, Guests, Admin APIs, AS API).
@ -112,6 +112,7 @@ This means Dendrite supports amongst others:
- Guests
- User Directory
- Presence
- Fulltext search
## Contributing

View file

@ -19,11 +19,12 @@ import (
"net/http"
"time"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
type uploadKeysRequest struct {
@ -77,7 +78,6 @@ func UploadKeys(req *http.Request, keyAPI api.ClientKeyAPI, device *userapi.Devi
}
}
keyCount := make(map[string]int)
// we only return key counts when the client uploads OTKs
if len(uploadRes.OneTimeKeyCounts) > 0 {
keyCount = uploadRes.OneTimeKeyCounts[0].KeyCount
}

View file

@ -18,12 +18,17 @@ global:
private_key: matrix_key.pem
# The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
# to old signing private keys that were formerly in use on this domain. These
# to old signing keys that were formerly in use on this domain name. These
# keys will not be used for federation request or event signing, but will be
# provided to any other homeserver that asks when trying to verify old events.
old_private_keys:
# If the old private key file is available:
# - private_key: old_matrix_key.pem
# expired_at: 1601024554498
# If only the public key (in base64 format) and key ID are known:
# - public_key: mn59Kxfdq9VziYHSBzI7+EDPDcBS2Xl7jeUdiiQcOnM=
# key_id: ed25519:mykeyid
# expired_at: 1601024554498
# How long a remote server can cache our server signing key before requesting it
# again. Increasing this number will reduce the number of requests made by other

View file

@ -18,12 +18,17 @@ global:
private_key: matrix_key.pem
# The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
# to old signing private keys that were formerly in use on this domain. These
# to old signing keys that were formerly in use on this domain name. These
# keys will not be used for federation request or event signing, but will be
# provided to any other homeserver that asks when trying to verify old events.
old_private_keys:
# If the old private key file is available:
# - private_key: old_matrix_key.pem
# expired_at: 1601024554498
# If only the public key (in base64 format) and key ID are known:
# - public_key: mn59Kxfdq9VziYHSBzI7+EDPDcBS2Xl7jeUdiiQcOnM=
# key_id: ed25519:mykeyid
# expired_at: 1601024554498
# How long a remote server can cache our server signing key before requesting it
# again. Increasing this number will reduce the number of requests made by other

View file

@ -159,6 +159,7 @@ type PerformJoinRequest struct {
// The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
ServerNames types.ServerNames `json:"server_names"`
Content map[string]interface{} `json:"content"`
Unsigned map[string]interface{} `json:"unsigned"`
}
type PerformJoinResponse struct {

View file

@ -7,14 +7,15 @@ import (
"fmt"
"time"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
)
// PerformLeaveRequest implements api.FederationInternalAPI
@ -95,6 +96,7 @@ func (r *FederationInternalAPI) PerformJoin(
request.Content,
serverName,
supportedVersions,
request.Unsigned,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
@ -139,6 +141,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
content map[string]interface{},
serverName gomatrixserverlib.ServerName,
supportedVersions []gomatrixserverlib.RoomVersion,
unsigned map[string]interface{},
) error {
// Try to perform a make_join using the information supplied in the
// request.
@ -259,7 +262,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
if err != nil {
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
}
logrus.WithField("hosts", joinedHosts).WithField("room", roomID).Info("Joined federated room with hosts")
logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts))
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
}
@ -267,6 +270,14 @@ func (r *FederationInternalAPI) performJoinUsingServer(
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
if unsigned != nil {
event, err = event.SetUnsigned(unsigned)
if err != nil {
// non-fatal, log and continue
logrus.WithError(err).Errorf("Failed to set unsigned content")
}
}
if err = roomserverAPI.SendEventWithState(
context.Background(),
r.rsAPI,

View file

@ -160,7 +160,7 @@ func localKeys(cfg *config.FederationAPI, validUntil time.Time) (*gomatrixserver
for _, oldVerifyKey := range cfg.Matrix.OldVerifyKeys {
keys.OldVerifyKeys[oldVerifyKey.KeyID] = gomatrixserverlib.OldVerifyKey{
VerifyKey: gomatrixserverlib.VerifyKey{
Key: gomatrixserverlib.Base64Bytes(oldVerifyKey.PrivateKey.Public().(ed25519.PublicKey)),
Key: oldVerifyKey.PublicKey,
},
ExpiredTS: oldVerifyKey.ExpiredAt,
}

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 10
VersionPatch = 1
VersionPatch = 2
VersionTag = "" // example: "rc1"
)

View file

@ -70,6 +70,11 @@ func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.Perform
if len(req.OneTimeKeys) > 0 {
a.uploadOneTimeKeys(ctx, req, res)
}
otks, err := a.DB.OneTimeKeysCount(ctx, req.UserID, req.DeviceID)
if err != nil {
return err
}
res.OneTimeKeyCounts = []api.OneTimeKeysCount{*otks}
return nil
}
@ -207,15 +212,13 @@ func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.Query
return nil
}
maxStreamID := int64(0)
// remove deleted devices
var result []api.DeviceMessage
for _, m := range msgs {
if m.StreamID > maxStreamID {
maxStreamID = m.StreamID
}
}
// remove deleted devices
var result []api.DeviceMessage
for _, m := range msgs {
if m.KeyJSON == nil {
if m.KeyJSON == nil || len(m.KeyJSON) == 0 {
continue
}
result = append(result, m)

View file

@ -0,0 +1,156 @@
package internal_test
import (
"context"
"reflect"
"testing"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test"
)
func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
t.Helper()
connStr, close := test.PrepareDBConnectionString(t, dbType)
db, err := storage.NewDatabase(nil, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
})
if err != nil {
t.Fatalf("failed to create new user db: %v", err)
}
return db, close
}
func Test_QueryDeviceMessages(t *testing.T) {
alice := test.NewUser(t)
type args struct {
req *api.QueryDeviceMessagesRequest
res *api.QueryDeviceMessagesResponse
}
tests := []struct {
name string
args args
wantErr bool
want *api.QueryDeviceMessagesResponse
}{
{
name: "no existing keys",
args: args{
req: &api.QueryDeviceMessagesRequest{
UserID: "@doesNotExist:localhost",
},
res: &api.QueryDeviceMessagesResponse{},
},
want: &api.QueryDeviceMessagesResponse{},
},
{
name: "existing user returns devices",
args: args{
req: &api.QueryDeviceMessagesRequest{
UserID: alice.ID,
},
res: &api.QueryDeviceMessagesResponse{},
},
want: &api.QueryDeviceMessagesResponse{
StreamID: 6,
Devices: []api.DeviceMessage{
{
Type: api.TypeDeviceKeyUpdate, StreamID: 5, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
DisplayName: "first device",
UserID: alice.ID,
KeyJSON: []byte("ghi"),
},
},
{
Type: api.TypeDeviceKeyUpdate, StreamID: 6, DeviceKeys: &api.DeviceKeys{
DeviceID: "mySecondDevice",
DisplayName: "second device",
UserID: alice.ID,
KeyJSON: []byte("jkl"),
}, // streamID 6
},
},
},
},
}
deviceMessages := []api.DeviceMessage{
{ // not the user we're looking for
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
UserID: "@doesNotExist:localhost",
},
// streamID 1 for this user
},
{ // empty keyJSON will be ignored
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
UserID: alice.ID,
}, // streamID 1
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
UserID: alice.ID,
KeyJSON: []byte("abc"),
}, // streamID 2
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
UserID: alice.ID,
KeyJSON: []byte("def"),
}, // streamID 3
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
UserID: alice.ID,
KeyJSON: []byte(""),
}, // streamID 4
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
DisplayName: "first device",
UserID: alice.ID,
KeyJSON: []byte("ghi"),
}, // streamID 5
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "mySecondDevice",
UserID: alice.ID,
KeyJSON: []byte("jkl"),
DisplayName: "second device",
}, // streamID 6
},
}
ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, closeDB := mustCreateDatabase(t, dbType)
defer closeDB()
if err := db.StoreLocalDeviceKeys(ctx, deviceMessages); err != nil {
t.Fatalf("failed to store local devicesKeys")
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := &internal.KeyInternalAPI{
DB: db,
}
if err := a.QueryDeviceMessages(ctx, tt.args.req, tt.args.res); (err != nil) != tt.wantErr {
t.Errorf("QueryDeviceMessages() error = %v, wantErr %v", err, tt.wantErr)
}
got := tt.args.res
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("QueryDeviceMessages(): got:\n%+v, want:\n%+v", got, tt.want)
}
})
}
})
}

View file

@ -20,6 +20,7 @@ import (
"time"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/api"
@ -204,20 +205,17 @@ func (s *deviceKeysStatements) SelectBatchDeviceKeys(ctx context.Context, userID
deviceIDMap[d] = true
}
var result []api.DeviceMessage
var displayName sql.NullString
for rows.Next() {
dk := api.DeviceMessage{
Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{},
Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{
UserID: userID,
},
}
dk.UserID = userID
var keyJSON string
var streamID int64
var displayName sql.NullString
if err := rows.Scan(&dk.DeviceID, &keyJSON, &streamID, &displayName); err != nil {
if err := rows.Scan(&dk.DeviceID, &dk.KeyJSON, &dk.StreamID, &displayName); err != nil {
return nil, err
}
dk.KeyJSON = []byte(keyJSON)
dk.StreamID = streamID
if displayName.Valid {
dk.DisplayName = displayName.String
}

View file

@ -137,21 +137,17 @@ func (s *deviceKeysStatements) SelectBatchDeviceKeys(ctx context.Context, userID
}
defer internal.CloseAndLogIfError(ctx, rows, "selectBatchDeviceKeysStmt: rows.close() failed")
var result []api.DeviceMessage
var displayName sql.NullString
for rows.Next() {
dk := api.DeviceMessage{
Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{},
Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{
UserID: userID,
},
}
dk.Type = api.TypeDeviceKeyUpdate
dk.UserID = userID
var keyJSON string
var streamID int64
var displayName sql.NullString
if err := rows.Scan(&dk.DeviceID, &keyJSON, &streamID, &displayName); err != nil {
if err := rows.Scan(&dk.DeviceID, &dk.KeyJSON, &dk.StreamID, &displayName); err != nil {
return nil, err
}
dk.KeyJSON = []byte(keyJSON)
dk.StreamID = streamID
if displayName.Valid {
dk.DisplayName = displayName.String
}

View file

@ -80,6 +80,7 @@ type PerformJoinRequest struct {
UserID string `json:"user_id"`
Content map[string]interface{} `json:"content"`
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
Unsigned map[string]interface{} `json:"unsigned"`
}
type PerformJoinResponse struct {

View file

@ -7,6 +7,9 @@ import (
"fmt"
"strings"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/auth"
"github.com/matrix-org/dendrite/roomserver/state"
@ -14,8 +17,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// TODO: temporary package which has helper functions used by both internal/perform packages.
@ -97,35 +98,35 @@ func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverNam
func IsInvitePending(
ctx context.Context, db storage.Database,
roomID, userID string,
) (bool, string, string, error) {
) (bool, string, string, *gomatrixserverlib.Event, error) {
// Look up the room NID for the supplied room ID.
info, err := db.RoomInfo(ctx, roomID)
if err != nil {
return false, "", "", fmt.Errorf("r.DB.RoomInfo: %w", err)
return false, "", "", nil, fmt.Errorf("r.DB.RoomInfo: %w", err)
}
if info == nil {
return false, "", "", fmt.Errorf("cannot get RoomInfo: unknown room ID %s", roomID)
return false, "", "", nil, fmt.Errorf("cannot get RoomInfo: unknown room ID %s", roomID)
}
// Look up the state key NID for the supplied user ID.
targetUserNIDs, err := db.EventStateKeyNIDs(ctx, []string{userID})
if err != nil {
return false, "", "", fmt.Errorf("r.DB.EventStateKeyNIDs: %w", err)
return false, "", "", nil, fmt.Errorf("r.DB.EventStateKeyNIDs: %w", err)
}
targetUserNID, targetUserFound := targetUserNIDs[userID]
if !targetUserFound {
return false, "", "", fmt.Errorf("missing NID for user %q (%+v)", userID, targetUserNIDs)
return false, "", "", nil, fmt.Errorf("missing NID for user %q (%+v)", userID, targetUserNIDs)
}
// Let's see if we have an event active for the user in the room. If
// we do then it will contain a server name that we can direct the
// send_leave to.
senderUserNIDs, eventIDs, err := db.GetInvitesForUser(ctx, info.RoomNID, targetUserNID)
senderUserNIDs, eventIDs, eventJSON, err := db.GetInvitesForUser(ctx, info.RoomNID, targetUserNID)
if err != nil {
return false, "", "", fmt.Errorf("r.DB.GetInvitesForUser: %w", err)
return false, "", "", nil, fmt.Errorf("r.DB.GetInvitesForUser: %w", err)
}
if len(senderUserNIDs) == 0 {
return false, "", "", nil
return false, "", "", nil, nil
}
userNIDToEventID := make(map[types.EventStateKeyNID]string)
for i, nid := range senderUserNIDs {
@ -135,18 +136,20 @@ func IsInvitePending(
// Look up the user ID from the NID.
senderUsers, err := db.EventStateKeys(ctx, senderUserNIDs)
if err != nil {
return false, "", "", fmt.Errorf("r.DB.EventStateKeys: %w", err)
return false, "", "", nil, fmt.Errorf("r.DB.EventStateKeys: %w", err)
}
if len(senderUsers) == 0 {
return false, "", "", fmt.Errorf("no senderUsers")
return false, "", "", nil, fmt.Errorf("no senderUsers")
}
senderUser, senderUserFound := senderUsers[senderUserNIDs[0]]
if !senderUserFound {
return false, "", "", fmt.Errorf("missing user for NID %d (%+v)", senderUserNIDs[0], senderUsers)
return false, "", "", nil, fmt.Errorf("missing user for NID %d (%+v)", senderUserNIDs[0], senderUsers)
}
return true, senderUser, userNIDToEventID[senderUserNIDs[0]], nil
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false, info.RoomVersion)
return true, senderUser, userNIDToEventID[senderUserNIDs[0]], event, err
}
// GetMembershipsAtState filters the state events to

View file

@ -173,12 +173,15 @@ func (r *Inputer) processRoomEvent(
for _, server := range serverRes.ServerNames {
servers[server] = struct{}{}
}
// Don't try to talk to ourselves.
delete(servers, r.Cfg.Matrix.ServerName)
// Now build up the list of servers.
serverRes.ServerNames = serverRes.ServerNames[:0]
if input.Origin != "" {
if input.Origin != "" && input.Origin != r.Cfg.Matrix.ServerName {
serverRes.ServerNames = append(serverRes.ServerNames, input.Origin)
delete(servers, input.Origin)
}
if senderDomain != input.Origin {
if senderDomain != input.Origin && senderDomain != r.Cfg.Matrix.ServerName {
serverRes.ServerNames = append(serverRes.ServerNames, senderDomain)
delete(servers, senderDomain)
}

View file

@ -22,6 +22,10 @@ import (
"time"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
fsAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
@ -32,8 +36,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
type Joiner struct {
@ -236,7 +238,7 @@ func (r *Joiner) performJoinRoomByID(
// Force a federated join if we're dealing with a pending invite
// and we aren't in the room.
isInvitePending, inviteSender, _, err := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, req.UserID)
isInvitePending, inviteSender, _, inviteEvent, err := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, req.UserID)
if err == nil && !serverInRoom && isInvitePending {
_, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender)
if ierr != nil {
@ -248,6 +250,17 @@ func (r *Joiner) performJoinRoomByID(
if inviterDomain != r.Cfg.Matrix.ServerName {
req.ServerNames = append(req.ServerNames, inviterDomain)
forceFederatedJoin = true
memberEvent := gjson.Parse(string(inviteEvent.JSON()))
// only set unsigned if we've got a content.membership, which we _should_
if memberEvent.Get("content.membership").Exists() {
req.Unsigned = map[string]interface{}{
"prev_sender": memberEvent.Get("sender").Str,
"prev_content": map[string]interface{}{
"is_direct": memberEvent.Get("content.is_direct").Bool(),
"membership": memberEvent.Get("content.membership").Str,
},
}
}
}
}
@ -348,6 +361,7 @@ func (r *Joiner) performFederatedJoinRoomByID(
UserID: req.UserID, // the user ID joining the room
ServerNames: req.ServerNames, // the server to try joining with
Content: req.Content, // the membership event content
Unsigned: req.Unsigned, // the unsigned event content, if any
}
fedRes := fsAPI.PerformJoinResponse{}
r.FSAPI.PerformJoin(ctx, &fedReq, &fedRes)

View file

@ -79,7 +79,7 @@ func (r *Leaver) performLeaveRoomByID(
) ([]api.OutputEvent, error) {
// If there's an invite outstanding for the room then respond to
// that.
isInvitePending, senderUser, eventID, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID)
isInvitePending, senderUser, eventID, _, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID)
if err == nil && isInvitePending {
_, senderDomain, serr := gomatrixserverlib.SplitID('@', senderUser)
if serr != nil {

View file

@ -872,7 +872,7 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
// but we don't specify an authorised via user, since the event auth
// will allow the join anyway.
var pending bool
if pending, _, _, err = helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID); err != nil {
if pending, _, _, _, err = helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID); err != nil {
return fmt.Errorf("helpers.IsInvitePending: %w", err)
} else if pending {
res.Allowed = true

View file

@ -17,10 +17,11 @@ package storage
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
@ -104,7 +105,7 @@ type Database interface {
// Look up the active invites targeting a user in a room and return the
// numeric state key IDs for the user IDs who sent them along with the event IDs for the invites.
// Returns an error if there was a problem talking to the database.
GetInvitesForUser(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (senderUserIDs []types.EventStateKeyNID, eventIDs []string, err error)
GetInvitesForUser(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (senderUserIDs []types.EventStateKeyNID, eventIDs []string, inviteEventJSON []byte, err error)
// Save a given room alias with the room ID it refers to.
// Returns an error if there was a problem talking to the database.
SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error

View file

@ -61,7 +61,7 @@ const insertInviteEventSQL = "" +
" ON CONFLICT DO NOTHING"
const selectInviteActiveForUserInRoomSQL = "" +
"SELECT invite_event_id, sender_nid FROM roomserver_invites" +
"SELECT invite_event_id, sender_nid, invite_event_json FROM roomserver_invites" +
" WHERE target_nid = $1 AND room_nid = $2" +
" AND NOT retired"
@ -141,25 +141,26 @@ func (s *inviteStatements) UpdateInviteRetired(
func (s *inviteStatements) SelectInviteActiveForUserInRoom(
ctx context.Context, txn *sql.Tx,
targetUserNID types.EventStateKeyNID, roomNID types.RoomNID,
) ([]types.EventStateKeyNID, []string, error) {
) ([]types.EventStateKeyNID, []string, []byte, error) {
stmt := sqlutil.TxStmt(txn, s.selectInviteActiveForUserInRoomStmt)
rows, err := stmt.QueryContext(
ctx, targetUserNID, roomNID,
)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteActiveForUserInRoom: rows.close() failed")
var result []types.EventStateKeyNID
var eventIDs []string
var inviteEventID string
var senderUserNID int64
var eventJSON []byte
for rows.Next() {
if err := rows.Scan(&inviteEventID, &senderUserNID); err != nil {
return nil, nil, err
if err := rows.Scan(&inviteEventID, &senderUserNID, &eventJSON); err != nil {
return nil, nil, nil, err
}
result = append(result, types.EventStateKeyNID(senderUserNID))
eventIDs = append(eventIDs, inviteEventID)
}
return result, eventIDs, rows.Err()
return result, eventIDs, eventJSON, rows.Err()
}

View file

@ -7,13 +7,14 @@ import (
"fmt"
"sort"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/tidwall/gjson"
)
// Ideally, when we have both events we should redact the event JSON and forget about the redaction, but we currently
@ -445,7 +446,7 @@ func (d *Database) GetInvitesForUser(
ctx context.Context,
roomNID types.RoomNID,
targetUserNID types.EventStateKeyNID,
) (senderUserIDs []types.EventStateKeyNID, eventIDs []string, err error) {
) (senderUserIDs []types.EventStateKeyNID, eventIDs []string, inviteEventJSON []byte, err error) {
return d.InvitesTable.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
}

View file

@ -44,7 +44,7 @@ const insertInviteEventSQL = "" +
" ON CONFLICT DO NOTHING"
const selectInviteActiveForUserInRoomSQL = "" +
"SELECT invite_event_id, sender_nid FROM roomserver_invites" +
"SELECT invite_event_id, sender_nid, invite_event_json FROM roomserver_invites" +
" WHERE target_nid = $1 AND room_nid = $2" +
" AND NOT retired"
@ -136,25 +136,26 @@ func (s *inviteStatements) UpdateInviteRetired(
func (s *inviteStatements) SelectInviteActiveForUserInRoom(
ctx context.Context, txn *sql.Tx,
targetUserNID types.EventStateKeyNID, roomNID types.RoomNID,
) ([]types.EventStateKeyNID, []string, error) {
) ([]types.EventStateKeyNID, []string, []byte, error) {
stmt := sqlutil.TxStmt(txn, s.selectInviteActiveForUserInRoomStmt)
rows, err := stmt.QueryContext(
ctx, targetUserNID, roomNID,
)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteActiveForUserInRoom: rows.close() failed")
var result []types.EventStateKeyNID
var eventIDs []string
var eventID string
var senderUserNID int64
var eventJSON []byte
for rows.Next() {
if err := rows.Scan(&eventID, &senderUserNID); err != nil {
return nil, nil, err
if err := rows.Scan(&eventID, &senderUserNID, &eventJSON); err != nil {
return nil, nil, nil, err
}
result = append(result, types.EventStateKeyNID(senderUserNID))
eventIDs = append(eventIDs, eventID)
}
return result, eventIDs, nil
return result, eventIDs, eventJSON, nil
}

View file

@ -116,7 +116,7 @@ type Invites interface {
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string, roomNID types.RoomNID, targetUserNID, senderUserNID types.EventStateKeyNID, inviteEventJSON []byte) (bool, error)
UpdateInviteRetired(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) ([]string, error)
// SelectInviteActiveForUserInRoom returns a list of sender state key NIDs and invite event IDs matching those nids.
SelectInviteActiveForUserInRoom(ctx context.Context, txn *sql.Tx, targetUserNID types.EventStateKeyNID, roomNID types.RoomNID) ([]types.EventStateKeyNID, []string, error)
SelectInviteActiveForUserInRoom(ctx context.Context, txn *sql.Tx, targetUserNID types.EventStateKeyNID, roomNID types.RoomNID) ([]types.EventStateKeyNID, []string, []byte, error)
}
type MembershipState int64

View file

@ -4,6 +4,9 @@ import (
"context"
"testing"
"github.com/matrix-org/util"
"github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/postgres"
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3"
@ -11,8 +14,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/util"
"github.com/stretchr/testify/assert"
)
func mustCreateInviteTable(t *testing.T, dbType test.DBType) (tables.Invites, func()) {
@ -67,7 +68,7 @@ func TestInviteTable(t *testing.T) {
assert.NoError(t, err)
assert.True(t, newInvite)
stateKeyNIDs, eventIDs, err := tab.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
stateKeyNIDs, eventIDs, _, err := tab.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
assert.NoError(t, err)
assert.Equal(t, []string{eventID1, eventID2}, eventIDs)
assert.Equal(t, []types.EventStateKeyNID{2, 2}, stateKeyNIDs)
@ -78,13 +79,13 @@ func TestInviteTable(t *testing.T) {
assert.Equal(t, []string{eventID1, eventID2}, retiredEventIDs)
// This should now be empty
stateKeyNIDs, eventIDs, err = tab.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
stateKeyNIDs, eventIDs, _, err = tab.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
assert.NoError(t, err)
assert.Empty(t, eventIDs)
assert.Empty(t, stateKeyNIDs)
// Non-existent targetUserNID
stateKeyNIDs, eventIDs, err = tab.SelectInviteActiveForUserInRoom(ctx, nil, types.EventStateKeyNID(10), roomNID)
stateKeyNIDs, eventIDs, _, err = tab.SelectInviteActiveForUserInRoom(ctx, nil, types.EventStateKeyNID(10), roomNID)
assert.NoError(t, err)
assert.Empty(t, stateKeyNIDs)
assert.Empty(t, eventIDs)

View file

@ -231,24 +231,40 @@ func loadConfig(
return nil, err
}
for i, oldPrivateKey := range c.Global.OldVerifyKeys {
var oldPrivateKeyData []byte
for _, key := range c.Global.OldVerifyKeys {
switch {
case key.PrivateKeyPath != "":
var oldPrivateKeyData []byte
oldPrivateKeyPath := absPath(basePath, key.PrivateKeyPath)
oldPrivateKeyData, err = readFile(oldPrivateKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to read %q: %w", oldPrivateKeyPath, err)
}
oldPrivateKeyPath := absPath(basePath, oldPrivateKey.PrivateKeyPath)
oldPrivateKeyData, err = readFile(oldPrivateKeyPath)
if err != nil {
return nil, err
// NOTSPEC: Ordinarily we should enforce key ID formatting, but since there are
// a number of private keys out there with non-compatible symbols in them due
// to lack of validation in Synapse, we won't enforce that for old verify keys.
keyID, privateKey, perr := readKeyPEM(oldPrivateKeyPath, oldPrivateKeyData, false)
if perr != nil {
return nil, fmt.Errorf("failed to parse %q: %w", oldPrivateKeyPath, perr)
}
key.KeyID = keyID
key.PrivateKey = privateKey
key.PublicKey = gomatrixserverlib.Base64Bytes(privateKey.Public().(ed25519.PublicKey))
case key.KeyID == "":
return nil, fmt.Errorf("'key_id' must be specified if 'public_key' is specified")
case len(key.PublicKey) == ed25519.PublicKeySize:
continue
case len(key.PublicKey) > 0:
return nil, fmt.Errorf("the supplied 'public_key' is the wrong length")
default:
return nil, fmt.Errorf("either specify a 'private_key' path or supply both 'public_key' and 'key_id'")
}
// NOTSPEC: Ordinarily we should enforce key ID formatting, but since there are
// a number of private keys out there with non-compatible symbols in them due
// to lack of validation in Synapse, we won't enforce that for old verify keys.
keyID, privateKey, perr := readKeyPEM(oldPrivateKeyPath, oldPrivateKeyData, false)
if perr != nil {
return nil, perr
}
c.Global.OldVerifyKeys[i].KeyID, c.Global.OldVerifyKeys[i].PrivateKey = keyID, privateKey
}
c.MediaAPI.AbsBasePath = Path(absPath(basePath, c.MediaAPI.BasePath))

View file

@ -27,7 +27,7 @@ type Global struct {
// Information about old private keys that used to be used to sign requests and
// events on this domain. They will not be used but will be advertised to other
// servers that ask for them to help verify old events.
OldVerifyKeys []OldVerifyKeys `yaml:"old_private_keys"`
OldVerifyKeys []*OldVerifyKeys `yaml:"old_private_keys"`
// How long a remote server can cache our server key for before requesting it again.
// Increasing this number will reduce the number of requests made by remote servers
@ -127,8 +127,11 @@ type OldVerifyKeys struct {
// The private key itself.
PrivateKey ed25519.PrivateKey `yaml:"-"`
// The public key, in case only that part is known.
PublicKey gomatrixserverlib.Base64Bytes `yaml:"public_key"`
// The key ID of the private key.
KeyID gomatrixserverlib.KeyID `yaml:"-"`
KeyID gomatrixserverlib.KeyID `yaml:"key_id"`
// When the private key was designed as "expired", as a UNIX timestamp
// in millisecond precision.

View file

@ -48,6 +48,7 @@ type Notifier struct {
lastCleanUpTime time.Time
// This map is reused to prevent allocations and GC pressure in SharedUsers.
_sharedUserMap map[string]struct{}
_wakeupUserMap map[string]struct{}
}
// NewNotifier creates a new notifier set to the given sync position.
@ -61,6 +62,7 @@ func NewNotifier() *Notifier {
lock: &sync.RWMutex{},
lastCleanUpTime: time.Now(),
_sharedUserMap: map[string]struct{}{},
_wakeupUserMap: map[string]struct{}{},
}
}
@ -408,12 +410,16 @@ func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.P
// specified user IDs, and also the specified peekingDevices
func (n *Notifier) _wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
for _, userID := range userIDs {
n._wakeupUserMap[userID] = struct{}{}
}
for userID := range n._wakeupUserMap {
for _, stream := range n._fetchUserStreams(userID) {
if stream == nil {
continue
}
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
}
delete(n._wakeupUserMap, userID)
}
for _, peekingDevice := range peekingDevices {

View file

@ -77,9 +77,9 @@ func (p *ReceiptStreamProvider) IncrementalSync(
continue
}
jr := types.NewJoinResponse()
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
jr = existing
jr, ok := req.Response.Rooms.Join[roomID]
if !ok {
jr = types.NewJoinResponse()
}
ev := gomatrixserverlib.ClientEvent{

View file

@ -407,7 +407,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.PDUStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.PDUPosition, currentPos.PDUPosition,
syncReq.Since.PDUPosition, rp.Notifier.CurrentPosition().PDUPosition,
)
},
),
@ -416,7 +416,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.TypingStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.TypingPosition, currentPos.TypingPosition,
syncReq.Since.TypingPosition, rp.Notifier.CurrentPosition().TypingPosition,
)
},
),
@ -425,7 +425,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.ReceiptStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
syncReq.Since.ReceiptPosition, rp.Notifier.CurrentPosition().ReceiptPosition,
)
},
),
@ -434,7 +434,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.InviteStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.InvitePosition, currentPos.InvitePosition,
syncReq.Since.InvitePosition, rp.Notifier.CurrentPosition().InvitePosition,
)
},
),
@ -443,7 +443,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.SendToDeviceStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
syncReq.Since.SendToDevicePosition, rp.Notifier.CurrentPosition().SendToDevicePosition,
)
},
),
@ -452,7 +452,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.AccountDataStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
syncReq.Since.AccountDataPosition, rp.Notifier.CurrentPosition().AccountDataPosition,
)
},
),
@ -461,7 +461,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.NotificationDataStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
syncReq.Since.NotificationDataPosition, rp.Notifier.CurrentPosition().NotificationDataPosition,
)
},
),
@ -470,7 +470,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
syncReq.Since.DeviceListPosition, rp.Notifier.CurrentPosition().DeviceListPosition,
)
},
),
@ -479,7 +479,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.PresenceStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.PresencePosition, currentPos.PresencePosition,
syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition,
)
},
),