Merge branch 'main' into neilalexander/downloadstate

This commit is contained in:
Neil Alexander 2022-10-10 14:10:04 +01:00
commit 391d531ec3
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
44 changed files with 538 additions and 277 deletions

View file

@ -7,6 +7,11 @@ about: Create a report to help us improve
<!--
All bug reports must provide the following background information
Text between <!-- and --> marks will be invisible in the report.
IF YOUR ISSUE IS CONSIDERED A SECURITY VULNERABILITY THEN PLEASE STOP
AND DO NOT POST IT AS A GITHUB ISSUE! Please report the issue responsibly by
disclosing in private by email to security@matrix.org instead. For more details, please
see: https://www.matrix.org/security-disclosure-policy/
-->
### Background information
@ -18,7 +23,6 @@ Text between <!-- and --> marks will be invisible in the report.
- **`go version`**:
- **Client used (if applicable)**:
### Description
- **What** is the problem:
@ -38,7 +42,6 @@ Examples of good descriptions:
- How: "Lots of logs about device change updates"
- When: "After my server joined Matrix HQ"
Examples of bad descriptions:
- What: "Can't send messages" - This is bad because it isn't specfic enough. Which endpoint isn't working and what is the response code? Does the message send but encryption fail?
- Who: "Me" - Who are you? Running the server or a user on a Dendrite server?

View file

@ -1,8 +1,8 @@
### Pull Request Checklist
<!-- Please read docs/CONTRIBUTING.md before submitting your pull request -->
<!-- Please read https://matrix-org.github.io/dendrite/development/contributing before submitting your pull request -->
* [ ] I have added tests for PR _or_ I have justified why this PR doesn't need tests.
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off)
* [ ] Pull request includes a [sign off below using a legally identifiable name](https://matrix-org.github.io/dendrite/development/contributing#sign-off) _or_ I have already signed off privately
Signed-off-by: `Your Name <your@email.example.org>`

View file

@ -1,5 +1,27 @@
# Changelog
## Dendrite 0.10.2 (2022-10-07)
### Features
* Dendrite will now fail to start if there is an obvious problem with the configured `max_open_conns` when using PostgreSQL database backends, since this can lead to instability and performance issues
* More information on this is available [in the documentation](https://matrix-org.github.io/dendrite/installation/start/optimisation#postgresql-connection-limit)
* Unnecessary/empty fields will no longer be sent in `/sync` responses
* It is now possible to configure `old_private_keys` from previous Matrix installations on the same domain if only public key is known, to make it easier to expire old keys correctly
* You can configure either just the `private_key` path, or you can supply both the `public_key` and `key_id`
### Fixes
* The sync transaction behaviour has been modified further so that errors in one stream should not propagate to other streams unnecessarily
* Rooms should now be classified as DM rooms correctly by passing through `is_direct` and unsigned hints
* A bug which caused marking device lists as stale to consume lots of CPU has been fixed
* Users accepting invites should no longer cause unnecessary federated joins if there are already other local users in the room
* The sync API state range queries have been optimised by adding missing indexes
* It should now be possible to configure non-English languages for full-text search in `search.language`
* The roomserver will no longer attempt to perform federated requests to the local server when trying to fetch missing events
* The `/keys/upload` endpoint will now always return the `one_time_keys_counts`, which may help with E2EE reliability
* The sync API will now retrieve the latest stream position before processing each stream rather than at the beginning of the request, to hopefully reduce the number of round-trips to `/sync`
## Dendrite 0.10.1 (2022-09-30)
### Features

View file

@ -79,7 +79,7 @@ $ ./bin/dendrite-monolith-server --tls-cert server.crt --tls-key server.key --co
# Create an user account (add -admin for an admin user).
# Specify the localpart only, e.g. 'alice' for '@alice:domain.com'
$ ./bin/create-account --config dendrite.yaml --url http://localhost:8008 --username alice
$ ./bin/create-account --config dendrite.yaml --username alice
```
Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`.
@ -90,7 +90,7 @@ We use a script called Are We Synapse Yet which checks Sytest compliance rates.
test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it
updates with CI. As of August 2022 we're at around 90% CS API coverage and 95% Federation coverage, though check
CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse
servers such as matrix.org reasonably well, although there are still some missing features (like Search).
servers such as matrix.org reasonably well, although there are still some missing features (like SSO and Third-party ID APIs).
We are prioritising features that will benefit single-user homeservers first (e.g Receipts, E2E) rather
than features that massive deployments may be interested in (OpenID, Guests, Admin APIs, AS API).
@ -112,6 +112,7 @@ This means Dendrite supports amongst others:
- Guests
- User Directory
- Presence
- Fulltext search
## Contributing

View file

@ -68,6 +68,12 @@ func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login,
JSON: jsonerror.BadJSON("A username must be supplied."),
}
}
if len(r.Password) == 0 {
return nil, &util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: jsonerror.BadJSON("A password must be supplied."),
}
}
localpart, err := userutil.ParseUsernameParam(username, &t.Config.Matrix.ServerName)
if err != nil {
return nil, &util.JSONResponse{

View file

@ -1,138 +0,0 @@
// Copyright 2019 Alex Chen
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
type getEventRequest struct {
req *http.Request
device *userapi.Device
roomID string
eventID string
cfg *config.ClientAPI
requestedEvent *gomatrixserverlib.Event
}
// GetEvent implements GET /_matrix/client/r0/rooms/{roomId}/event/{eventId}
// https://matrix.org/docs/spec/client_server/r0.4.0.html#get-matrix-client-r0-rooms-roomid-event-eventid
func GetEvent(
req *http.Request,
device *userapi.Device,
roomID string,
eventID string,
cfg *config.ClientAPI,
rsAPI api.ClientRoomserverAPI,
) util.JSONResponse {
eventsReq := api.QueryEventsByIDRequest{
EventIDs: []string{eventID},
}
var eventsResp api.QueryEventsByIDResponse
err := rsAPI.QueryEventsByID(req.Context(), &eventsReq, &eventsResp)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("queryAPI.QueryEventsByID failed")
return jsonerror.InternalServerError()
}
if len(eventsResp.Events) == 0 {
// Event not found locally
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("The event was not found or you do not have permission to read this event"),
}
}
requestedEvent := eventsResp.Events[0].Event
r := getEventRequest{
req: req,
device: device,
roomID: roomID,
eventID: eventID,
cfg: cfg,
requestedEvent: requestedEvent,
}
stateReq := api.QueryStateAfterEventsRequest{
RoomID: r.requestedEvent.RoomID(),
PrevEventIDs: r.requestedEvent.PrevEventIDs(),
StateToFetch: []gomatrixserverlib.StateKeyTuple{{
EventType: gomatrixserverlib.MRoomMember,
StateKey: device.UserID,
}},
}
var stateResp api.QueryStateAfterEventsResponse
if err := rsAPI.QueryStateAfterEvents(req.Context(), &stateReq, &stateResp); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("queryAPI.QueryStateAfterEvents failed")
return jsonerror.InternalServerError()
}
if !stateResp.RoomExists {
util.GetLogger(req.Context()).Errorf("Expected to find room for event %s but failed", r.requestedEvent.EventID())
return jsonerror.InternalServerError()
}
if !stateResp.PrevEventsExist {
// Missing some events locally; stateResp.StateEvents unavailable.
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("The event was not found or you do not have permission to read this event"),
}
}
var appService *config.ApplicationService
if device.AppserviceID != "" {
for _, as := range cfg.Derived.ApplicationServices {
if as.ID == device.AppserviceID {
appService = &as
break
}
}
}
for _, stateEvent := range stateResp.StateEvents {
if appService != nil {
if !appService.IsInterestedInUserID(*stateEvent.StateKey()) {
continue
}
} else if !stateEvent.StateKeyEquals(device.UserID) {
continue
}
membership, err := stateEvent.Membership()
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("stateEvent.Membership failed")
return jsonerror.InternalServerError()
}
if membership == gomatrixserverlib.Join {
return util.JSONResponse{
Code: http.StatusOK,
JSON: gomatrixserverlib.ToClientEvent(r.requestedEvent, gomatrixserverlib.FormatAll),
}
}
}
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("The event was not found or you do not have permission to read this event"),
}
}

View file

@ -19,11 +19,12 @@ import (
"net/http"
"time"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
type uploadKeysRequest struct {
@ -77,7 +78,6 @@ func UploadKeys(req *http.Request, keyAPI api.ClientKeyAPI, device *userapi.Devi
}
}
keyCount := make(map[string]int)
// we only return key counts when the client uploads OTKs
if len(uploadRes.OneTimeKeyCounts) > 0 {
keyCount = uploadRes.OneTimeKeyCounts[0].KeyCount
}

View file

@ -373,15 +373,6 @@ func Setup(
nil, cfg, rsAPI, transactionsCache)
}),
).Methods(http.MethodPut, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/event/{eventID}",
httputil.MakeAuthAPI("rooms_get_event", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return GetEvent(req, device, vars["roomID"], vars["eventID"], cfg, rsAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/state", httputil.MakeAuthAPI("room_state", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))

View file

@ -18,12 +18,17 @@ global:
private_key: matrix_key.pem
# The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
# to old signing private keys that were formerly in use on this domain. These
# to old signing keys that were formerly in use on this domain name. These
# keys will not be used for federation request or event signing, but will be
# provided to any other homeserver that asks when trying to verify old events.
old_private_keys:
# If the old private key file is available:
# - private_key: old_matrix_key.pem
# expired_at: 1601024554498
# If only the public key (in base64 format) and key ID are known:
# - public_key: mn59Kxfdq9VziYHSBzI7+EDPDcBS2Xl7jeUdiiQcOnM=
# key_id: ed25519:mykeyid
# expired_at: 1601024554498
# How long a remote server can cache our server signing key before requesting it
# again. Increasing this number will reduce the number of requests made by other

View file

@ -18,12 +18,17 @@ global:
private_key: matrix_key.pem
# The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
# to old signing private keys that were formerly in use on this domain. These
# to old signing keys that were formerly in use on this domain name. These
# keys will not be used for federation request or event signing, but will be
# provided to any other homeserver that asks when trying to verify old events.
old_private_keys:
# If the old private key file is available:
# - private_key: old_matrix_key.pem
# expired_at: 1601024554498
# If only the public key (in base64 format) and key ID are known:
# - public_key: mn59Kxfdq9VziYHSBzI7+EDPDcBS2Xl7jeUdiiQcOnM=
# key_id: ed25519:mykeyid
# expired_at: 1601024554498
# How long a remote server can cache our server signing key before requesting it
# again. Increasing this number will reduce the number of requests made by other

View file

@ -55,7 +55,7 @@ matrix.example.com {
# Change the end of each reverse_proxy line to the correct
# address for your various services.
@sync_api {
path_regexp /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/messages)$
path_regexp /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/(messages|context/.*?|event/.*?))$
}
reverse_proxy @sync_api sync_api:8073

View file

@ -18,8 +18,10 @@ VirtualHost {
# /_matrix/client/.*/user/{userId}/filter/{filterID}
# /_matrix/client/.*/keys/changes
# /_matrix/client/.*/rooms/{roomId}/messages
# /_matrix/client/.*/rooms/{roomId}/context/{eventID}
# /_matrix/client/.*/rooms/{roomId}/event/{eventID}
# to sync_api
ReverseProxy = /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/messages) http://localhost:8073 600
ReverseProxy = /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/(messages|context/.*?|event/.*?))$ http://localhost:8073 600
ReverseProxy = /_matrix/client http://localhost:8071 600
ReverseProxy = /_matrix/federation http://localhost:8072 600
ReverseProxy = /_matrix/key http://localhost:8072 600

View file

@ -28,8 +28,10 @@ server {
# /_matrix/client/.*/user/{userId}/filter/{filterID}
# /_matrix/client/.*/keys/changes
# /_matrix/client/.*/rooms/{roomId}/messages
# /_matrix/client/.*/rooms/{roomId}/context/{eventID}
# /_matrix/client/.*/rooms/{roomId}/event/{eventID}
# to sync_api
location ~ /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/messages)$ {
location ~ /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/(messages|context/.*?|event/.*?))$ {
proxy_pass http://sync_api:8073;
}

View file

@ -159,6 +159,7 @@ type PerformJoinRequest struct {
// The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
ServerNames types.ServerNames `json:"server_names"`
Content map[string]interface{} `json:"content"`
Unsigned map[string]interface{} `json:"unsigned"`
}
type PerformJoinResponse struct {

View file

@ -7,14 +7,15 @@ import (
"fmt"
"time"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
)
// PerformLeaveRequest implements api.FederationInternalAPI
@ -95,6 +96,7 @@ func (r *FederationInternalAPI) PerformJoin(
request.Content,
serverName,
supportedVersions,
request.Unsigned,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
@ -139,6 +141,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
content map[string]interface{},
serverName gomatrixserverlib.ServerName,
supportedVersions []gomatrixserverlib.RoomVersion,
unsigned map[string]interface{},
) error {
// Try to perform a make_join using the information supplied in the
// request.
@ -259,7 +262,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
if err != nil {
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
}
logrus.WithField("hosts", joinedHosts).WithField("room", roomID).Info("Joined federated room with hosts")
logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts))
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
}
@ -267,6 +270,14 @@ func (r *FederationInternalAPI) performJoinUsingServer(
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
if unsigned != nil {
event, err = event.SetUnsigned(unsigned)
if err != nil {
// non-fatal, log and continue
logrus.WithError(err).Errorf("Failed to set unsigned content")
}
}
if err = roomserverAPI.SendEventWithState(
context.Background(),
r.rsAPI,

View file

@ -160,7 +160,7 @@ func localKeys(cfg *config.FederationAPI, validUntil time.Time) (*gomatrixserver
for _, oldVerifyKey := range cfg.Matrix.OldVerifyKeys {
keys.OldVerifyKeys[oldVerifyKey.KeyID] = gomatrixserverlib.OldVerifyKey{
VerifyKey: gomatrixserverlib.VerifyKey{
Key: gomatrixserverlib.Base64Bytes(oldVerifyKey.PrivateKey.Public().(ed25519.PublicKey)),
Key: oldVerifyKey.PublicKey,
},
ExpiredTS: oldVerifyKey.ExpiredAt,
}

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 10
VersionPatch = 1
VersionPatch = 2
VersionTag = "" // example: "rc1"
)

View file

@ -70,6 +70,11 @@ func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.Perform
if len(req.OneTimeKeys) > 0 {
a.uploadOneTimeKeys(ctx, req, res)
}
otks, err := a.DB.OneTimeKeysCount(ctx, req.UserID, req.DeviceID)
if err != nil {
return err
}
res.OneTimeKeyCounts = []api.OneTimeKeysCount{*otks}
return nil
}
@ -207,15 +212,13 @@ func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.Query
return nil
}
maxStreamID := int64(0)
// remove deleted devices
var result []api.DeviceMessage
for _, m := range msgs {
if m.StreamID > maxStreamID {
maxStreamID = m.StreamID
}
}
// remove deleted devices
var result []api.DeviceMessage
for _, m := range msgs {
if m.KeyJSON == nil {
if m.KeyJSON == nil || len(m.KeyJSON) == 0 {
continue
}
result = append(result, m)

View file

@ -0,0 +1,156 @@
package internal_test
import (
"context"
"reflect"
"testing"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test"
)
func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
t.Helper()
connStr, close := test.PrepareDBConnectionString(t, dbType)
db, err := storage.NewDatabase(nil, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
})
if err != nil {
t.Fatalf("failed to create new user db: %v", err)
}
return db, close
}
func Test_QueryDeviceMessages(t *testing.T) {
alice := test.NewUser(t)
type args struct {
req *api.QueryDeviceMessagesRequest
res *api.QueryDeviceMessagesResponse
}
tests := []struct {
name string
args args
wantErr bool
want *api.QueryDeviceMessagesResponse
}{
{
name: "no existing keys",
args: args{
req: &api.QueryDeviceMessagesRequest{
UserID: "@doesNotExist:localhost",
},
res: &api.QueryDeviceMessagesResponse{},
},
want: &api.QueryDeviceMessagesResponse{},
},
{
name: "existing user returns devices",
args: args{
req: &api.QueryDeviceMessagesRequest{
UserID: alice.ID,
},
res: &api.QueryDeviceMessagesResponse{},
},
want: &api.QueryDeviceMessagesResponse{
StreamID: 6,
Devices: []api.DeviceMessage{
{
Type: api.TypeDeviceKeyUpdate, StreamID: 5, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
DisplayName: "first device",
UserID: alice.ID,
KeyJSON: []byte("ghi"),
},
},
{
Type: api.TypeDeviceKeyUpdate, StreamID: 6, DeviceKeys: &api.DeviceKeys{
DeviceID: "mySecondDevice",
DisplayName: "second device",
UserID: alice.ID,
KeyJSON: []byte("jkl"),
}, // streamID 6
},
},
},
},
}
deviceMessages := []api.DeviceMessage{
{ // not the user we're looking for
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
UserID: "@doesNotExist:localhost",
},
// streamID 1 for this user
},
{ // empty keyJSON will be ignored
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
UserID: alice.ID,
}, // streamID 1
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
UserID: alice.ID,
KeyJSON: []byte("abc"),
}, // streamID 2
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
UserID: alice.ID,
KeyJSON: []byte("def"),
}, // streamID 3
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
UserID: alice.ID,
KeyJSON: []byte(""),
}, // streamID 4
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "myDevice",
DisplayName: "first device",
UserID: alice.ID,
KeyJSON: []byte("ghi"),
}, // streamID 5
},
{
Type: api.TypeDeviceKeyUpdate, DeviceKeys: &api.DeviceKeys{
DeviceID: "mySecondDevice",
UserID: alice.ID,
KeyJSON: []byte("jkl"),
DisplayName: "second device",
}, // streamID 6
},
}
ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, closeDB := mustCreateDatabase(t, dbType)
defer closeDB()
if err := db.StoreLocalDeviceKeys(ctx, deviceMessages); err != nil {
t.Fatalf("failed to store local devicesKeys")
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := &internal.KeyInternalAPI{
DB: db,
}
if err := a.QueryDeviceMessages(ctx, tt.args.req, tt.args.res); (err != nil) != tt.wantErr {
t.Errorf("QueryDeviceMessages() error = %v, wantErr %v", err, tt.wantErr)
}
got := tt.args.res
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("QueryDeviceMessages(): got:\n%+v, want:\n%+v", got, tt.want)
}
})
}
})
}

View file

@ -20,6 +20,7 @@ import (
"time"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/api"
@ -204,20 +205,17 @@ func (s *deviceKeysStatements) SelectBatchDeviceKeys(ctx context.Context, userID
deviceIDMap[d] = true
}
var result []api.DeviceMessage
var displayName sql.NullString
for rows.Next() {
dk := api.DeviceMessage{
Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{},
DeviceKeys: &api.DeviceKeys{
UserID: userID,
},
}
dk.UserID = userID
var keyJSON string
var streamID int64
var displayName sql.NullString
if err := rows.Scan(&dk.DeviceID, &keyJSON, &streamID, &displayName); err != nil {
if err := rows.Scan(&dk.DeviceID, &dk.KeyJSON, &dk.StreamID, &displayName); err != nil {
return nil, err
}
dk.KeyJSON = []byte(keyJSON)
dk.StreamID = streamID
if displayName.Valid {
dk.DisplayName = displayName.String
}

View file

@ -137,21 +137,17 @@ func (s *deviceKeysStatements) SelectBatchDeviceKeys(ctx context.Context, userID
}
defer internal.CloseAndLogIfError(ctx, rows, "selectBatchDeviceKeysStmt: rows.close() failed")
var result []api.DeviceMessage
var displayName sql.NullString
for rows.Next() {
dk := api.DeviceMessage{
Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{},
DeviceKeys: &api.DeviceKeys{
UserID: userID,
},
}
dk.Type = api.TypeDeviceKeyUpdate
dk.UserID = userID
var keyJSON string
var streamID int64
var displayName sql.NullString
if err := rows.Scan(&dk.DeviceID, &keyJSON, &streamID, &displayName); err != nil {
if err := rows.Scan(&dk.DeviceID, &dk.KeyJSON, &dk.StreamID, &displayName); err != nil {
return nil, err
}
dk.KeyJSON = []byte(keyJSON)
dk.StreamID = streamID
if displayName.Valid {
dk.DisplayName = displayName.String
}

View file

@ -80,6 +80,7 @@ type PerformJoinRequest struct {
UserID string `json:"user_id"`
Content map[string]interface{} `json:"content"`
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
Unsigned map[string]interface{} `json:"unsigned"`
}
type PerformJoinResponse struct {

View file

@ -7,6 +7,9 @@ import (
"fmt"
"strings"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/auth"
"github.com/matrix-org/dendrite/roomserver/state"
@ -14,8 +17,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// TODO: temporary package which has helper functions used by both internal/perform packages.
@ -97,35 +98,35 @@ func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverNam
func IsInvitePending(
ctx context.Context, db storage.Database,
roomID, userID string,
) (bool, string, string, error) {
) (bool, string, string, *gomatrixserverlib.Event, error) {
// Look up the room NID for the supplied room ID.
info, err := db.RoomInfo(ctx, roomID)
if err != nil {
return false, "", "", fmt.Errorf("r.DB.RoomInfo: %w", err)
return false, "", "", nil, fmt.Errorf("r.DB.RoomInfo: %w", err)
}
if info == nil {
return false, "", "", fmt.Errorf("cannot get RoomInfo: unknown room ID %s", roomID)
return false, "", "", nil, fmt.Errorf("cannot get RoomInfo: unknown room ID %s", roomID)
}
// Look up the state key NID for the supplied user ID.
targetUserNIDs, err := db.EventStateKeyNIDs(ctx, []string{userID})
if err != nil {
return false, "", "", fmt.Errorf("r.DB.EventStateKeyNIDs: %w", err)
return false, "", "", nil, fmt.Errorf("r.DB.EventStateKeyNIDs: %w", err)
}
targetUserNID, targetUserFound := targetUserNIDs[userID]
if !targetUserFound {
return false, "", "", fmt.Errorf("missing NID for user %q (%+v)", userID, targetUserNIDs)
return false, "", "", nil, fmt.Errorf("missing NID for user %q (%+v)", userID, targetUserNIDs)
}
// Let's see if we have an event active for the user in the room. If
// we do then it will contain a server name that we can direct the
// send_leave to.
senderUserNIDs, eventIDs, err := db.GetInvitesForUser(ctx, info.RoomNID, targetUserNID)
senderUserNIDs, eventIDs, eventJSON, err := db.GetInvitesForUser(ctx, info.RoomNID, targetUserNID)
if err != nil {
return false, "", "", fmt.Errorf("r.DB.GetInvitesForUser: %w", err)
return false, "", "", nil, fmt.Errorf("r.DB.GetInvitesForUser: %w", err)
}
if len(senderUserNIDs) == 0 {
return false, "", "", nil
return false, "", "", nil, nil
}
userNIDToEventID := make(map[types.EventStateKeyNID]string)
for i, nid := range senderUserNIDs {
@ -135,18 +136,20 @@ func IsInvitePending(
// Look up the user ID from the NID.
senderUsers, err := db.EventStateKeys(ctx, senderUserNIDs)
if err != nil {
return false, "", "", fmt.Errorf("r.DB.EventStateKeys: %w", err)
return false, "", "", nil, fmt.Errorf("r.DB.EventStateKeys: %w", err)
}
if len(senderUsers) == 0 {
return false, "", "", fmt.Errorf("no senderUsers")
return false, "", "", nil, fmt.Errorf("no senderUsers")
}
senderUser, senderUserFound := senderUsers[senderUserNIDs[0]]
if !senderUserFound {
return false, "", "", fmt.Errorf("missing user for NID %d (%+v)", senderUserNIDs[0], senderUsers)
return false, "", "", nil, fmt.Errorf("missing user for NID %d (%+v)", senderUserNIDs[0], senderUsers)
}
return true, senderUser, userNIDToEventID[senderUserNIDs[0]], nil
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false, info.RoomVersion)
return true, senderUser, userNIDToEventID[senderUserNIDs[0]], event, err
}
// GetMembershipsAtState filters the state events to

View file

@ -173,12 +173,15 @@ func (r *Inputer) processRoomEvent(
for _, server := range serverRes.ServerNames {
servers[server] = struct{}{}
}
// Don't try to talk to ourselves.
delete(servers, r.Cfg.Matrix.ServerName)
// Now build up the list of servers.
serverRes.ServerNames = serverRes.ServerNames[:0]
if input.Origin != "" {
if input.Origin != "" && input.Origin != r.Cfg.Matrix.ServerName {
serverRes.ServerNames = append(serverRes.ServerNames, input.Origin)
delete(servers, input.Origin)
}
if senderDomain != input.Origin {
if senderDomain != input.Origin && senderDomain != r.Cfg.Matrix.ServerName {
serverRes.ServerNames = append(serverRes.ServerNames, senderDomain)
delete(servers, senderDomain)
}

View file

@ -22,6 +22,10 @@ import (
"time"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
fsAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
@ -32,8 +36,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
type Joiner struct {
@ -236,7 +238,7 @@ func (r *Joiner) performJoinRoomByID(
// Force a federated join if we're dealing with a pending invite
// and we aren't in the room.
isInvitePending, inviteSender, _, err := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, req.UserID)
isInvitePending, inviteSender, _, inviteEvent, err := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, req.UserID)
if err == nil && !serverInRoom && isInvitePending {
_, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender)
if ierr != nil {
@ -248,6 +250,17 @@ func (r *Joiner) performJoinRoomByID(
if inviterDomain != r.Cfg.Matrix.ServerName {
req.ServerNames = append(req.ServerNames, inviterDomain)
forceFederatedJoin = true
memberEvent := gjson.Parse(string(inviteEvent.JSON()))
// only set unsigned if we've got a content.membership, which we _should_
if memberEvent.Get("content.membership").Exists() {
req.Unsigned = map[string]interface{}{
"prev_sender": memberEvent.Get("sender").Str,
"prev_content": map[string]interface{}{
"is_direct": memberEvent.Get("content.is_direct").Bool(),
"membership": memberEvent.Get("content.membership").Str,
},
}
}
}
}
@ -348,6 +361,7 @@ func (r *Joiner) performFederatedJoinRoomByID(
UserID: req.UserID, // the user ID joining the room
ServerNames: req.ServerNames, // the server to try joining with
Content: req.Content, // the membership event content
Unsigned: req.Unsigned, // the unsigned event content, if any
}
fedRes := fsAPI.PerformJoinResponse{}
r.FSAPI.PerformJoin(ctx, &fedReq, &fedRes)

View file

@ -79,7 +79,7 @@ func (r *Leaver) performLeaveRoomByID(
) ([]api.OutputEvent, error) {
// If there's an invite outstanding for the room then respond to
// that.
isInvitePending, senderUser, eventID, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID)
isInvitePending, senderUser, eventID, _, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID)
if err == nil && isInvitePending {
_, senderDomain, serr := gomatrixserverlib.SplitID('@', senderUser)
if serr != nil {

View file

@ -872,7 +872,7 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
// but we don't specify an authorised via user, since the event auth
// will allow the join anyway.
var pending bool
if pending, _, _, err = helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID); err != nil {
if pending, _, _, _, err = helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID); err != nil {
return fmt.Errorf("helpers.IsInvitePending: %w", err)
} else if pending {
res.Allowed = true

View file

@ -17,10 +17,11 @@ package storage
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
@ -104,7 +105,7 @@ type Database interface {
// Look up the active invites targeting a user in a room and return the
// numeric state key IDs for the user IDs who sent them along with the event IDs for the invites.
// Returns an error if there was a problem talking to the database.
GetInvitesForUser(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (senderUserIDs []types.EventStateKeyNID, eventIDs []string, err error)
GetInvitesForUser(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (senderUserIDs []types.EventStateKeyNID, eventIDs []string, inviteEventJSON []byte, err error)
// Save a given room alias with the room ID it refers to.
// Returns an error if there was a problem talking to the database.
SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error

View file

@ -61,7 +61,7 @@ const insertInviteEventSQL = "" +
" ON CONFLICT DO NOTHING"
const selectInviteActiveForUserInRoomSQL = "" +
"SELECT invite_event_id, sender_nid FROM roomserver_invites" +
"SELECT invite_event_id, sender_nid, invite_event_json FROM roomserver_invites" +
" WHERE target_nid = $1 AND room_nid = $2" +
" AND NOT retired"
@ -141,25 +141,26 @@ func (s *inviteStatements) UpdateInviteRetired(
func (s *inviteStatements) SelectInviteActiveForUserInRoom(
ctx context.Context, txn *sql.Tx,
targetUserNID types.EventStateKeyNID, roomNID types.RoomNID,
) ([]types.EventStateKeyNID, []string, error) {
) ([]types.EventStateKeyNID, []string, []byte, error) {
stmt := sqlutil.TxStmt(txn, s.selectInviteActiveForUserInRoomStmt)
rows, err := stmt.QueryContext(
ctx, targetUserNID, roomNID,
)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteActiveForUserInRoom: rows.close() failed")
var result []types.EventStateKeyNID
var eventIDs []string
var inviteEventID string
var senderUserNID int64
var eventJSON []byte
for rows.Next() {
if err := rows.Scan(&inviteEventID, &senderUserNID); err != nil {
return nil, nil, err
if err := rows.Scan(&inviteEventID, &senderUserNID, &eventJSON); err != nil {
return nil, nil, nil, err
}
result = append(result, types.EventStateKeyNID(senderUserNID))
eventIDs = append(eventIDs, inviteEventID)
}
return result, eventIDs, rows.Err()
return result, eventIDs, eventJSON, rows.Err()
}

View file

@ -7,13 +7,14 @@ import (
"fmt"
"sort"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/tidwall/gjson"
)
// Ideally, when we have both events we should redact the event JSON and forget about the redaction, but we currently
@ -445,7 +446,7 @@ func (d *Database) GetInvitesForUser(
ctx context.Context,
roomNID types.RoomNID,
targetUserNID types.EventStateKeyNID,
) (senderUserIDs []types.EventStateKeyNID, eventIDs []string, err error) {
) (senderUserIDs []types.EventStateKeyNID, eventIDs []string, inviteEventJSON []byte, err error) {
return d.InvitesTable.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
}

View file

@ -44,7 +44,7 @@ const insertInviteEventSQL = "" +
" ON CONFLICT DO NOTHING"
const selectInviteActiveForUserInRoomSQL = "" +
"SELECT invite_event_id, sender_nid FROM roomserver_invites" +
"SELECT invite_event_id, sender_nid, invite_event_json FROM roomserver_invites" +
" WHERE target_nid = $1 AND room_nid = $2" +
" AND NOT retired"
@ -136,25 +136,26 @@ func (s *inviteStatements) UpdateInviteRetired(
func (s *inviteStatements) SelectInviteActiveForUserInRoom(
ctx context.Context, txn *sql.Tx,
targetUserNID types.EventStateKeyNID, roomNID types.RoomNID,
) ([]types.EventStateKeyNID, []string, error) {
) ([]types.EventStateKeyNID, []string, []byte, error) {
stmt := sqlutil.TxStmt(txn, s.selectInviteActiveForUserInRoomStmt)
rows, err := stmt.QueryContext(
ctx, targetUserNID, roomNID,
)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteActiveForUserInRoom: rows.close() failed")
var result []types.EventStateKeyNID
var eventIDs []string
var eventID string
var senderUserNID int64
var eventJSON []byte
for rows.Next() {
if err := rows.Scan(&eventID, &senderUserNID); err != nil {
return nil, nil, err
if err := rows.Scan(&eventID, &senderUserNID, &eventJSON); err != nil {
return nil, nil, nil, err
}
result = append(result, types.EventStateKeyNID(senderUserNID))
eventIDs = append(eventIDs, eventID)
}
return result, eventIDs, nil
return result, eventIDs, eventJSON, nil
}

View file

@ -116,7 +116,7 @@ type Invites interface {
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string, roomNID types.RoomNID, targetUserNID, senderUserNID types.EventStateKeyNID, inviteEventJSON []byte) (bool, error)
UpdateInviteRetired(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) ([]string, error)
// SelectInviteActiveForUserInRoom returns a list of sender state key NIDs and invite event IDs matching those nids.
SelectInviteActiveForUserInRoom(ctx context.Context, txn *sql.Tx, targetUserNID types.EventStateKeyNID, roomNID types.RoomNID) ([]types.EventStateKeyNID, []string, error)
SelectInviteActiveForUserInRoom(ctx context.Context, txn *sql.Tx, targetUserNID types.EventStateKeyNID, roomNID types.RoomNID) ([]types.EventStateKeyNID, []string, []byte, error)
}
type MembershipState int64

View file

@ -4,6 +4,9 @@ import (
"context"
"testing"
"github.com/matrix-org/util"
"github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/postgres"
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3"
@ -11,8 +14,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/util"
"github.com/stretchr/testify/assert"
)
func mustCreateInviteTable(t *testing.T, dbType test.DBType) (tables.Invites, func()) {
@ -67,7 +68,7 @@ func TestInviteTable(t *testing.T) {
assert.NoError(t, err)
assert.True(t, newInvite)
stateKeyNIDs, eventIDs, err := tab.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
stateKeyNIDs, eventIDs, _, err := tab.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
assert.NoError(t, err)
assert.Equal(t, []string{eventID1, eventID2}, eventIDs)
assert.Equal(t, []types.EventStateKeyNID{2, 2}, stateKeyNIDs)
@ -78,13 +79,13 @@ func TestInviteTable(t *testing.T) {
assert.Equal(t, []string{eventID1, eventID2}, retiredEventIDs)
// This should now be empty
stateKeyNIDs, eventIDs, err = tab.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
stateKeyNIDs, eventIDs, _, err = tab.SelectInviteActiveForUserInRoom(ctx, nil, targetUserNID, roomNID)
assert.NoError(t, err)
assert.Empty(t, eventIDs)
assert.Empty(t, stateKeyNIDs)
// Non-existent targetUserNID
stateKeyNIDs, eventIDs, err = tab.SelectInviteActiveForUserInRoom(ctx, nil, types.EventStateKeyNID(10), roomNID)
stateKeyNIDs, eventIDs, _, err = tab.SelectInviteActiveForUserInRoom(ctx, nil, types.EventStateKeyNID(10), roomNID)
assert.NoError(t, err)
assert.Empty(t, stateKeyNIDs)
assert.Empty(t, eventIDs)

View file

@ -231,13 +231,14 @@ func loadConfig(
return nil, err
}
for i, oldPrivateKey := range c.Global.OldVerifyKeys {
for _, key := range c.Global.OldVerifyKeys {
switch {
case key.PrivateKeyPath != "":
var oldPrivateKeyData []byte
oldPrivateKeyPath := absPath(basePath, oldPrivateKey.PrivateKeyPath)
oldPrivateKeyPath := absPath(basePath, key.PrivateKeyPath)
oldPrivateKeyData, err = readFile(oldPrivateKeyPath)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to read %q: %w", oldPrivateKeyPath, err)
}
// NOTSPEC: Ordinarily we should enforce key ID formatting, but since there are
@ -245,10 +246,25 @@ func loadConfig(
// to lack of validation in Synapse, we won't enforce that for old verify keys.
keyID, privateKey, perr := readKeyPEM(oldPrivateKeyPath, oldPrivateKeyData, false)
if perr != nil {
return nil, perr
return nil, fmt.Errorf("failed to parse %q: %w", oldPrivateKeyPath, perr)
}
c.Global.OldVerifyKeys[i].KeyID, c.Global.OldVerifyKeys[i].PrivateKey = keyID, privateKey
key.KeyID = keyID
key.PrivateKey = privateKey
key.PublicKey = gomatrixserverlib.Base64Bytes(privateKey.Public().(ed25519.PublicKey))
case key.KeyID == "":
return nil, fmt.Errorf("'key_id' must be specified if 'public_key' is specified")
case len(key.PublicKey) == ed25519.PublicKeySize:
continue
case len(key.PublicKey) > 0:
return nil, fmt.Errorf("the supplied 'public_key' is the wrong length")
default:
return nil, fmt.Errorf("either specify a 'private_key' path or supply both 'public_key' and 'key_id'")
}
}
c.MediaAPI.AbsBasePath = Path(absPath(basePath, c.MediaAPI.BasePath))

View file

@ -27,7 +27,7 @@ type Global struct {
// Information about old private keys that used to be used to sign requests and
// events on this domain. They will not be used but will be advertised to other
// servers that ask for them to help verify old events.
OldVerifyKeys []OldVerifyKeys `yaml:"old_private_keys"`
OldVerifyKeys []*OldVerifyKeys `yaml:"old_private_keys"`
// How long a remote server can cache our server key for before requesting it again.
// Increasing this number will reduce the number of requests made by remote servers
@ -127,8 +127,11 @@ type OldVerifyKeys struct {
// The private key itself.
PrivateKey ed25519.PrivateKey `yaml:"-"`
// The public key, in case only that part is known.
PublicKey gomatrixserverlib.Base64Bytes `yaml:"public_key"`
// The key ID of the private key.
KeyID gomatrixserverlib.KeyID `yaml:"-"`
KeyID gomatrixserverlib.KeyID `yaml:"key_id"`
// When the private key was designed as "expired", as a UNIX timestamp
// in millisecond precision.

View file

@ -19,11 +19,13 @@ import (
"math"
"time"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
)
func init() {
@ -189,7 +191,7 @@ func visibilityForEvents(
UserID: userID,
}, membershipResp)
if err != nil {
return result, err
logrus.WithError(err).Error("visibilityForEvents: failed to fetch membership at event, defaulting to 'leave'")
}
// Create a map from eventID -> eventVisibility

View file

@ -48,6 +48,7 @@ type Notifier struct {
lastCleanUpTime time.Time
// This map is reused to prevent allocations and GC pressure in SharedUsers.
_sharedUserMap map[string]struct{}
_wakeupUserMap map[string]struct{}
}
// NewNotifier creates a new notifier set to the given sync position.
@ -61,6 +62,7 @@ func NewNotifier() *Notifier {
lock: &sync.RWMutex{},
lastCleanUpTime: time.Now(),
_sharedUserMap: map[string]struct{}{},
_wakeupUserMap: map[string]struct{}{},
}
}
@ -408,12 +410,16 @@ func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.P
// specified user IDs, and also the specified peekingDevices
func (n *Notifier) _wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
for _, userID := range userIDs {
n._wakeupUserMap[userID] = struct{}{}
}
for userID := range n._wakeupUserMap {
for _, stream := range n._fetchUserStreams(userID) {
if stream == nil {
continue
}
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
}
delete(n._wakeupUserMap, userID)
}
for _, peekingDevice := range peekingDevices {

102
syncapi/routing/getevent.go Normal file
View file

@ -0,0 +1,102 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
// GetEvent implements
//
// GET /_matrix/client/r0/rooms/{roomId}/event/{eventId}
//
// https://spec.matrix.org/v1.4/client-server-api/#get_matrixclientv3roomsroomideventeventid
func GetEvent(
req *http.Request,
device *userapi.Device,
roomID string,
eventID string,
cfg *config.SyncAPI,
syncDB storage.Database,
rsAPI api.SyncRoomserverAPI,
) util.JSONResponse {
ctx := req.Context()
db, err := syncDB.NewDatabaseTransaction(ctx)
logger := util.GetLogger(ctx).WithFields(logrus.Fields{
"event_id": eventID,
"room_id": roomID,
})
if err != nil {
logger.WithError(err).Error("GetEvent: syncDB.NewDatabaseTransaction failed")
return jsonerror.InternalServerError()
}
events, err := db.Events(ctx, []string{eventID})
if err != nil {
logger.WithError(err).Error("GetEvent: syncDB.Events failed")
return jsonerror.InternalServerError()
}
// The requested event does not exist in our database
if len(events) == 0 {
logger.Debugf("GetEvent: requested event doesn't exist locally")
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("The event was not found or you do not have permission to read this event"),
}
}
// If the request is coming from an appservice, get the user from the request
userID := device.UserID
if asUserID := req.FormValue("user_id"); device.AppserviceID != "" && asUserID != "" {
userID = asUserID
}
// Apply history visibility to determine if the user is allowed to view the event
events, err = internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, events, nil, userID, "event")
if err != nil {
logger.WithError(err).Error("GetEvent: internal.ApplyHistoryVisibilityFilter failed")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.InternalServerError(),
}
}
// We only ever expect there to be one event
if len(events) != 1 {
// 0 events -> not allowed to view event; > 1 events -> something that shouldn't happen
logger.WithField("event_count", len(events)).Debug("GetEvent: can't return the requested event")
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("The event was not found or you do not have permission to read this event"),
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: gomatrixserverlib.HeaderedToClientEvent(events[0], gomatrixserverlib.FormatAll),
}
}

View file

@ -60,6 +60,16 @@ func Setup(
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, rsAPI, cfg, srp, lazyLoadCache)
})).Methods(http.MethodGet, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/event/{eventID}",
httputil.MakeAuthAPI("rooms_get_event", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return GetEvent(req, device, vars["roomID"], vars["eventID"], cfg, syncDB, rsAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
v3mux.Handle("/user/{userId}/filter",
httputil.MakeAuthAPI("put_filter", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))

View file

@ -77,9 +77,9 @@ func (p *ReceiptStreamProvider) IncrementalSync(
continue
}
jr := types.NewJoinResponse()
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
jr = existing
jr, ok := req.Response.Rooms.Join[roomID]
if !ok {
jr = types.NewJoinResponse()
}
ev := gomatrixserverlib.ClientEvent{

View file

@ -407,7 +407,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.PDUStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.PDUPosition, currentPos.PDUPosition,
syncReq.Since.PDUPosition, rp.Notifier.CurrentPosition().PDUPosition,
)
},
),
@ -416,7 +416,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.TypingStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.TypingPosition, currentPos.TypingPosition,
syncReq.Since.TypingPosition, rp.Notifier.CurrentPosition().TypingPosition,
)
},
),
@ -425,7 +425,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.ReceiptStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
syncReq.Since.ReceiptPosition, rp.Notifier.CurrentPosition().ReceiptPosition,
)
},
),
@ -434,7 +434,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.InviteStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.InvitePosition, currentPos.InvitePosition,
syncReq.Since.InvitePosition, rp.Notifier.CurrentPosition().InvitePosition,
)
},
),
@ -443,7 +443,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.SendToDeviceStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
syncReq.Since.SendToDevicePosition, rp.Notifier.CurrentPosition().SendToDevicePosition,
)
},
),
@ -452,7 +452,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.AccountDataStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
syncReq.Since.AccountDataPosition, rp.Notifier.CurrentPosition().AccountDataPosition,
)
},
),
@ -461,7 +461,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.NotificationDataStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
syncReq.Since.NotificationDataPosition, rp.Notifier.CurrentPosition().NotificationDataPosition,
)
},
),
@ -470,7 +470,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
syncReq.Since.DeviceListPosition, rp.Notifier.CurrentPosition().DeviceListPosition,
)
},
),
@ -479,7 +479,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.PresenceStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.PresencePosition, currentPos.PresencePosition,
syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition,
)
},
),

View file

@ -838,6 +838,8 @@ func (a *UserInternalAPI) QueryAccountByPassword(ctx context.Context, req *api.Q
return nil
case bcrypt.ErrMismatchedHashAndPassword: // user exists, but password doesn't match
return nil
case bcrypt.ErrHashTooShort: // user exists, but probably a passwordless account
return nil
default:
res.Exists = true
res.Account = acc

View file

@ -75,6 +75,9 @@ func (d *Database) GetAccountByPassword(
if err != nil {
return nil, err
}
if len(hash) == 0 && len(plaintextPassword) > 0 {
return nil, bcrypt.ErrHashTooShort
}
if err := bcrypt.CompareHashAndPassword([]byte(hash), []byte(plaintextPassword)); err != nil {
return nil, err
}

View file

@ -151,6 +151,33 @@ func TestQueryProfile(t *testing.T) {
})
}
// TestPasswordlessLoginFails ensures that a passwordless account cannot
// be logged into using an arbitrary password (effectively a regression test
// for https://github.com/matrix-org/dendrite/issues/2780).
func TestPasswordlessLoginFails(t *testing.T) {
ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
userAPI, accountDB, close := MustMakeInternalAPI(t, apiTestOpts{}, dbType)
defer close()
_, err := accountDB.CreateAccount(ctx, "auser", "", "", api.AccountTypeAppService)
if err != nil {
t.Fatalf("failed to make account: %s", err)
}
userReq := &api.QueryAccountByPasswordRequest{
Localpart: "auser",
PlaintextPassword: "apassword",
}
userRes := &api.QueryAccountByPasswordResponse{}
if err := userAPI.QueryAccountByPassword(ctx, userReq, userRes); err != nil {
t.Fatal(err)
}
if userRes.Exists || userRes.Account != nil {
t.Fatal("QueryAccountByPassword should not return correctly for a passwordless account")
}
})
}
func TestLoginToken(t *testing.T) {
ctx := context.Background()