Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/userprofile

This commit is contained in:
Till Faelligen 2022-06-27 09:05:45 +02:00
commit 4f997608c4
28 changed files with 559 additions and 103 deletions

View file

@ -1,5 +1,22 @@
# Changelog
## Dendrite 0.8.8 (2022-06-09)
### Features
* The performance of state resolution has been increased significantly for larger rooms
* A number of changes have been made to rate limiting:
* Logged in users will now be rate-limited on a per-session basis rather than by remote IP
* Rate limiting no longer applies to admin or appservice users
* It is now possible to configure additional users that are exempt from rate limiting using the `exempt_user_ids` option in the `rate_limiting` section of the Dendrite config
* Setting state is now idempotent via the client API state endpoints
### Fixes
* Room upgrades now properly propagate tombstone events to remote servers
* Room upgrades will no longer send tombstone events if creating the upgraded room fails
* A crash has been fixed when evaluating restricted room joins
## Dendrite 0.8.7 (2022-06-01)
### Features

View file

@ -44,7 +44,7 @@ func GetAliases(
return util.ErrorResponse(fmt.Errorf("rsAPI.QueryCurrentState: %w", err))
}
visibility := "invite"
visibility := gomatrixserverlib.HistoryVisibilityInvited
if historyVisEvent, ok := stateRes.StateEvents[stateTuple]; ok {
var err error
visibility, err = historyVisEvent.HistoryVisibility()

View file

@ -29,9 +29,10 @@ import (
"sync"
"time"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/tidwall/gjson"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/tokens"
@ -68,9 +69,10 @@ const (
// It shouldn't be passed by value because it contains a mutex.
type sessionsDict struct {
sync.RWMutex
sessions map[string][]authtypes.LoginType
params map[string]registerRequest
timer map[string]*time.Timer
sessions map[string][]authtypes.LoginType
sessionCompletedResult map[string]registerResponse
params map[string]registerRequest
timer map[string]*time.Timer
// deleteSessionToDeviceID protects requests to DELETE /devices/{deviceID} from being abused.
// If a UIA session is started by trying to delete device1, and then UIA is completed by deleting device2,
// the delete request will fail for device2 since the UIA was initiated by trying to delete device1.
@ -115,6 +117,7 @@ func (d *sessionsDict) deleteSession(sessionID string) {
delete(d.params, sessionID)
delete(d.sessions, sessionID)
delete(d.deleteSessionToDeviceID, sessionID)
delete(d.sessionCompletedResult, sessionID)
// stop the timer, e.g. because the registration was completed
if t, ok := d.timer[sessionID]; ok {
if !t.Stop() {
@ -130,6 +133,7 @@ func (d *sessionsDict) deleteSession(sessionID string) {
func newSessionsDict() *sessionsDict {
return &sessionsDict{
sessions: make(map[string][]authtypes.LoginType),
sessionCompletedResult: make(map[string]registerResponse),
params: make(map[string]registerRequest),
timer: make(map[string]*time.Timer),
deleteSessionToDeviceID: make(map[string]string),
@ -173,6 +177,19 @@ func (d *sessionsDict) addDeviceToDelete(sessionID, deviceID string) {
d.deleteSessionToDeviceID[sessionID] = deviceID
}
func (d *sessionsDict) addCompletedRegistration(sessionID string, response registerResponse) {
d.Lock()
defer d.Unlock()
d.sessionCompletedResult[sessionID] = response
}
func (d *sessionsDict) getCompletedRegistration(sessionID string) (registerResponse, bool) {
d.RLock()
defer d.RUnlock()
result, ok := d.sessionCompletedResult[sessionID]
return result, ok
}
func (d *sessionsDict) getDeviceToDelete(sessionID string) (string, bool) {
d.RLock()
defer d.RUnlock()
@ -544,6 +561,14 @@ func Register(
r.DeviceID = data.DeviceID
r.InitialDisplayName = data.InitialDisplayName
r.InhibitLogin = data.InhibitLogin
// Check if the user already registered using this session, if so, return that result
if response, ok := sessions.getCompletedRegistration(sessionID); ok {
return util.JSONResponse{
Code: http.StatusOK,
JSON: response,
}
}
}
if resErr := httputil.UnmarshalJSON(reqBody, &r); resErr != nil {
return *resErr
@ -839,13 +864,6 @@ func completeRegistration(
displayName, deviceID *string,
accType userapi.AccountType,
) util.JSONResponse {
var registrationOK bool
defer func() {
if registrationOK {
sessions.deleteSession(sessionID)
}
}()
if username == "" {
return util.JSONResponse{
Code: http.StatusBadRequest,
@ -886,7 +904,6 @@ func completeRegistration(
// Check whether inhibit_login option is set. If so, don't create an access
// token or a device for this user
if inhibitLogin {
registrationOK = true
return util.JSONResponse{
Code: http.StatusOK,
JSON: registerResponse{
@ -920,15 +937,17 @@ func completeRegistration(
}
}
registrationOK = true
result := registerResponse{
UserID: devRes.Device.UserID,
AccessToken: devRes.Device.AccessToken,
HomeServer: accRes.Account.ServerName,
DeviceID: devRes.Device.ID,
}
sessions.addCompletedRegistration(sessionID, result)
return util.JSONResponse{
Code: http.StatusOK,
JSON: registerResponse{
UserID: devRes.Device.UserID,
AccessToken: devRes.Device.AccessToken,
HomeServer: accRes.Account.ServerName,
DeviceID: devRes.Device.ID,
},
JSON: result,
}
}

View file

@ -28,7 +28,9 @@ import (
var roomVersion = flag.String("roomversion", "5", "the room version to parse events as")
var filterType = flag.String("filtertype", "", "the event types to filter on")
var difference = flag.Bool("difference", false, "whether to calculate the difference between snapshots")
// nolint:gocyclo
func main() {
ctx := context.Background()
cfg := setup.ParseFlags(true)
@ -36,6 +38,7 @@ func main() {
Type: "std",
Level: "error",
})
cfg.ClientAPI.RegistrationDisabled = true
base := base.NewBaseDendrite(cfg, "ResolveState", base.DisableMetrics)
args := flag.Args()
@ -64,6 +67,59 @@ func main() {
RoomVersion: gomatrixserverlib.RoomVersion(*roomVersion),
})
if *difference {
if len(snapshotNIDs) != 2 {
panic("need exactly two state snapshot NIDs to calculate difference")
}
var removed, added []types.StateEntry
removed, added, err = stateres.DifferenceBetweeenStateSnapshots(ctx, snapshotNIDs[0], snapshotNIDs[1])
if err != nil {
panic(err)
}
var eventNIDs []types.EventNID
for _, entry := range append(removed, added...) {
eventNIDs = append(eventNIDs, entry.EventNID)
}
var eventEntries []types.Event
eventEntries, err = roomserverDB.Events(ctx, eventNIDs)
if err != nil {
panic(err)
}
events := make(map[types.EventNID]*gomatrixserverlib.Event, len(eventEntries))
for _, entry := range eventEntries {
events[entry.EventNID] = entry.Event
}
if len(removed) > 0 {
fmt.Println("Removed:")
for _, r := range removed {
event := events[r.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
if len(removed) > 0 && len(added) > 0 {
fmt.Println()
}
if len(added) > 0 {
fmt.Println("Added:")
for _, a := range added {
event := events[a.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
return
}
var stateEntries []types.StateEntry
for _, snapshotNID := range snapshotNIDs {
var entries []types.StateEntry

View file

@ -0,0 +1,68 @@
{
# debug
admin off
email example@example.com
default_sni example.com
# Debug endpoint
# acme_ca https://acme-staging-v02.api.letsencrypt.org/directory
}
#######################################################################
# Snippets
#______________________________________________________________________
(handle_errors_maintenance) {
handle_errors {
@maintenance expression {http.error.status_code} == 502
rewrite @maintenance maintenance.html
root * "/path/to/service/pages"
file_server
}
}
(matrix-well-known-header) {
# Headers
header Access-Control-Allow-Origin "*"
header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, OPTIONS"
header Access-Control-Allow-Headers "Origin, X-Requested-With, Content-Type, Accept, Authorization"
header Content-Type "application/json"
}
#######################################################################
example.com {
# ...
handle /.well-known/matrix/server {
import matrix-well-known-header
respond `{ "m.server": "matrix.example.com:443" }` 200
}
handle /.well-known/matrix/client {
import matrix-well-known-header
respond `{ "m.homeserver": { "base_url": "https://matrix.example.com" } }` 200
}
import handle_errors_maintenance
}
example.com:8448 {
# server<->server HTTPS traffic
reverse_proxy http://dendrite-host:8008
}
matrix.example.com {
handle /_matrix/* {
# client<->server HTTPS traffic
reverse_proxy http://dendrite-host:8008
}
handle_path /* {
# Client webapp (Element SPA or ...)
file_server {
root /path/to/www/example.com/matrix-web-client/
}
}
}

View file

@ -0,0 +1,71 @@
---
title: Optimise your installation
parent: Installation
has_toc: true
nav_order: 10
permalink: /installation/start/optimisation
---
# Optimise your installation
Now that you have Dendrite running, the following tweaks will improve the reliability
and performance of your installation.
## File descriptor limit
Most platforms have a limit on how many file descriptors a single process can open. All
connections made by Dendrite consume file descriptors — this includes database connections
and network requests to remote homeservers. When participating in large federated rooms
where Dendrite must talk to many remote servers, it is often very easy to exhaust default
limits which are quite low.
We currently recommend setting the file descriptor limit to 65535 to avoid such
issues. Dendrite will log immediately after startup if the file descriptor limit is too low:
```
level=warning msg="IMPORTANT: Process file descriptor limit is currently 1024, it is recommended to raise the limit for Dendrite to at least 65535 to avoid issues"
```
UNIX systems have two limits: a hard limit and a soft limit. You can view the soft limit
by running `ulimit -Sn` and the hard limit with `ulimit -Hn`:
```bash
$ ulimit -Hn
1048576
$ ulimit -Sn
1024
```
Increase the soft limit before starting Dendrite:
```bash
ulimit -Sn 65535
```
The log line at startup should no longer appear if the limit is sufficient.
If you are running under a systemd service, you can instead add `LimitNOFILE=65535` option
to the `[Service]` section of your service unit file.
## DNS caching
Dendrite has a built-in DNS cache which significantly reduces the load that Dendrite will
place on your DNS resolver. This may also speed up outbound federation.
Consider enabling the DNS cache by modifying the `global` section of your configuration file:
```yaml
dns_cache:
enabled: true
cache_size: 4096
cache_lifetime: 600s
```
## Time synchronisation
Matrix relies heavily on TLS which requires the system time to be correct. If the clock
drifts then you may find that federation no works reliably (or at all) and clients may
struggle to connect to your Dendrite server.
Ensure that the time is synchronised on your system by enabling NTP sync.

View file

@ -63,6 +63,7 @@ func AddPublicRoutes(
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
ServerName: cfg.Matrix.ServerName,
UserAPI: userAPI,
}

View file

@ -17,6 +17,7 @@ package producers
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
@ -34,6 +35,7 @@ type SyncAPIProducer struct {
TopicSendToDeviceEvent string
TopicTypingEvent string
TopicPresenceEvent string
TopicDeviceListUpdate string
JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI
@ -161,3 +163,18 @@ func (p *SyncAPIProducer) SendPresence(
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}
func (p *SyncAPIProducer) SendDeviceListUpdate(
ctx context.Context, deviceListUpdate *gomatrixserverlib.DeviceListUpdateEvent,
) (err error) {
m := nats.NewMsg(p.TopicDeviceListUpdate)
m.Header.Set(jetstream.UserID, deviceListUpdate.UserID)
m.Data, err = json.Marshal(deviceListUpdate)
if err != nil {
return fmt.Errorf("json.Marshal: %w", err)
}
log.Debugf("Sending device list update: %+v", m.Header)
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}

View file

@ -85,6 +85,9 @@ func GetUserDevices(
if targetKey, ok := targetUser[gomatrixserverlib.KeyID(dev.DeviceID)]; ok {
for sourceUserID, forSourceUser := range targetKey {
for sourceKeyID, sourceKey := range forSourceUser {
if device.Keys.Signatures == nil {
device.Keys.Signatures = map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{}
}
if _, ok := device.Keys.Signatures[sourceUserID]; !ok {
device.Keys.Signatures[sourceUserID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{}
}

View file

@ -512,11 +512,7 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
} else if serverName != t.Origin {
return
}
var inputRes keyapi.InputDeviceListUpdateResponse
t.keyAPI.InputDeviceListUpdate(context.Background(), &keyapi.InputDeviceListUpdateRequest{
Event: payload,
}, &inputRes)
if inputRes.Error != nil {
util.GetLogger(ctx).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
if err := t.producer.SendDeviceListUpdate(ctx, &payload); err != nil {
util.GetLogger(ctx).WithError(err).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
}
}

2
go.mod
View file

@ -34,7 +34,7 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220607143425-e55d796fd0b3
github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13

4
go.sum
View file

@ -418,8 +418,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220607143425-e55d796fd0b3 h1:2eYcBt8Kg+nW/xIJY5x8Uo2dQLjUF+oxLap00uFC5l8=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220607143425-e55d796fd0b3/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a h1:jOkrb6twViAGTHHadA51sQwdloHT0Vx1MCptk9InTHo=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE=
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 8
VersionPatch = 7
VersionPatch = 8
VersionTag = "" // example: "rc1"
)

View file

@ -62,8 +62,6 @@ type FederationKeyAPI interface {
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse)
QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse)
// InputDeviceListUpdate from a federated server EDU
InputDeviceListUpdate(ctx context.Context, req *InputDeviceListUpdateRequest, res *InputDeviceListUpdateResponse)
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
}
@ -337,11 +335,3 @@ type QuerySignaturesResponse struct {
// The request error, if any
Error *KeyError
}
type InputDeviceListUpdateRequest struct {
Event gomatrixserverlib.DeviceListUpdateEvent
}
type InputDeviceListUpdateResponse struct {
Error *KeyError
}

View file

@ -0,0 +1,82 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// DeviceListUpdateConsumer consumes device list updates that came in over federation.
type DeviceListUpdateConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
updater *internal.DeviceListUpdater
}
// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers.
func NewDeviceListUpdateConsumer(
process *process.ProcessContext,
cfg *config.KeyServer,
js nats.JetStreamContext,
updater *internal.DeviceListUpdater,
) *DeviceListUpdateConsumer {
return &DeviceListUpdateConsumer{
ctx: process.Context(),
jetstream: js,
durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
updater: updater,
}
}
// Start consuming from key servers
func (t *DeviceListUpdateConsumer) Start() error {
return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m gomatrixserverlib.DeviceListUpdateEvent
if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("Failed to read from device list update input topic")
return true
}
err := t.updater.Update(ctx, m)
if err != nil {
logrus.WithFields(logrus.Fields{
"user_id": m.UserID,
"device_id": m.DeviceID,
"stream_id": m.StreamID,
"prev_id": m.PrevID,
}).WithError(err).Errorf("Failed to update device list")
return false
}
return true
}

View file

@ -47,17 +47,6 @@ func (a *KeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
a.UserAPI = i
}
func (a *KeyInternalAPI) InputDeviceListUpdate(
ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
) {
err := a.Updater.Update(ctx, req.Event)
if err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("failed to update device list: %s", err),
}
}
}
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset)
if err != nil {

View file

@ -63,20 +63,6 @@ type httpKeyInternalAPI struct {
func (h *httpKeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
// no-op: doesn't need it
}
func (h *httpKeyInternalAPI) InputDeviceListUpdate(
ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputDeviceListUpdate")
defer span.Finish()
apiURL := h.apiURL + InputDeviceListUpdatePath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
if err != nil {
res.Error = &api.KeyError{
Err: err.Error(),
}
}
}
func (h *httpKeyInternalAPI) PerformClaimKeys(
ctx context.Context,

View file

@ -25,17 +25,6 @@ import (
)
func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
internalAPIMux.Handle(InputDeviceListUpdatePath,
httputil.MakeInternalAPI("inputDeviceListUpdate", func(req *http.Request) util.JSONResponse {
request := api.InputDeviceListUpdateRequest{}
response := api.InputDeviceListUpdateResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
s.InputDeviceListUpdate(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(PerformClaimKeysPath,
httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse {
request := api.PerformClaimKeysRequest{}

View file

@ -18,6 +18,7 @@ import (
"github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/consumers"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/producers"
@ -59,10 +60,17 @@ func NewInternalAPI(
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
ap.Updater = updater
go func() {
if err := updater.Start(); err != nil {
if err = updater.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start device list updater")
}
}()
dlConsumer := consumers.NewDeviceListUpdateConsumer(
base.ProcessContext, cfg, js, updater,
)
if err = dlConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start device list consumer")
}
return ap
}

View file

@ -161,6 +161,8 @@ type OutputNewRoomEvent struct {
// The transaction ID of the send request if sent by a local user and one
// was specified
TransactionID *TransactionID `json:"transaction_id,omitempty"`
// The history visibility of the event.
HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"`
}
func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) {
@ -187,7 +189,8 @@ func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.Headere
// should build their current room state up from OutputNewRoomEvents only.
type OutputOldRoomEvent struct {
// The Event.
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"`
}
// An OutputNewInviteEvent is written whenever an invite becomes active.

View file

@ -295,6 +295,22 @@ func (r *Inputer) processRoomEvent(
}
}
// Get the state before the event so that we can work out if the event was
// allowed at the time, and also to get the history visibility. We won't
// bother doing this if the event was already rejected as it just ends up
// burning CPU time.
historyVisibility := gomatrixserverlib.HistoryVisibilityJoined // Default to restrictive.
if rejectionErr == nil && !isRejected && !softfail {
var err error
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev)
if err != nil {
return fmt.Errorf("r.processStateBefore: %w", err)
}
if rejectionErr != nil {
isRejected = true
}
}
// Store the event.
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected || softfail)
if err != nil {
@ -360,6 +376,7 @@ func (r *Inputer) processRoomEvent(
input.SendAsServer, // send as server
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
historyVisibility, // the history visibility before the event
); err != nil {
return fmt.Errorf("r.updateLatestEvents: %w", err)
}
@ -368,7 +385,8 @@ func (r *Inputer) processRoomEvent(
{
Type: api.OutputTypeOldRoomEvent,
OldRoomEvent: &api.OutputOldRoomEvent{
Event: headered,
Event: headered,
HistoryVisibility: historyVisibility,
},
},
})
@ -402,6 +420,100 @@ func (r *Inputer) processRoomEvent(
return nil
}
// processStateBefore works out what the state is before the event and
// then checks the event auths against the state at the time. It also
// tries to determine what the history visibility was of the event at
// the time, so that it can be sent in the output event to downstream
// components.
// nolint:nakedret
func (r *Inputer) processStateBefore(
ctx context.Context,
input *api.InputRoomEvent,
missingPrev bool,
) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) {
historyVisibility = gomatrixserverlib.HistoryVisibilityJoined // Default to restrictive.
event := input.Event.Unwrap()
isCreateEvent := event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("")
var stateBeforeEvent []*gomatrixserverlib.Event
switch {
case isCreateEvent:
// There's no state before a create event so there is nothing
// else to do.
return
case input.HasState:
// If we're overriding the state then we need to go and retrieve
// them from the database. It's a hard error if they are missing.
stateEvents, err := r.DB.EventsFromIDs(ctx, input.StateEventIDs)
if err != nil {
return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err)
}
stateBeforeEvent = make([]*gomatrixserverlib.Event, 0, len(stateEvents))
for _, entry := range stateEvents {
stateBeforeEvent = append(stateBeforeEvent, entry.Event)
}
case missingPrev:
// We don't know all of the prev events, so we can't work out
// the state before the event. Reject it in that case.
rejectionErr = fmt.Errorf("event %q has missing prev events", event.EventID())
return
case len(event.PrevEventIDs()) == 0:
// There should be prev events since it's not a create event.
// A non-create event that claims to have no prev events is
// invalid, so reject it.
rejectionErr = fmt.Errorf("event %q must have prev events", event.EventID())
return
default:
// For all non-create events, there must be prev events, so we'll
// ask the query API for the relevant tuples needed for auth. We
// will include the history visibility here even though we don't
// actually need it for auth, because we want to send it in the
// output events.
tuplesNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event}).Tuples()
tuplesNeeded = append(tuplesNeeded, gomatrixserverlib.StateKeyTuple{
EventType: gomatrixserverlib.MRoomHistoryVisibility,
StateKey: "",
})
stateBeforeReq := &api.QueryStateAfterEventsRequest{
RoomID: event.RoomID(),
PrevEventIDs: event.PrevEventIDs(),
StateToFetch: tuplesNeeded,
}
stateBeforeRes := &api.QueryStateAfterEventsResponse{}
if err := r.Queryer.QueryStateAfterEvents(ctx, stateBeforeReq, stateBeforeRes); err != nil {
return "", nil, fmt.Errorf("r.Queryer.QueryStateAfterEvents: %w", err)
}
switch {
case !stateBeforeRes.RoomExists:
rejectionErr = fmt.Errorf("room %q does not exist", event.RoomID())
return
case !stateBeforeRes.PrevEventsExist:
rejectionErr = fmt.Errorf("prev events of %q are not known", event.EventID())
return
default:
stateBeforeEvent = gomatrixserverlib.UnwrapEventHeaders(stateBeforeRes.StateEvents)
}
}
// At this point, stateBeforeEvent should be populated either by
// the supplied state in the input request, or from the prev events.
// Check whether the event is allowed or not.
stateBeforeAuth := gomatrixserverlib.NewAuthEvents(stateBeforeEvent)
if rejectionErr = gomatrixserverlib.Allowed(event, &stateBeforeAuth); rejectionErr != nil {
return
}
// Work out what the history visibility was at the time of the
// event.
for _, event := range stateBeforeEvent {
if event.Type() != gomatrixserverlib.MRoomHistoryVisibility || !event.StateKeyEquals("") {
continue
}
if hisVis, err := event.HistoryVisibility(); err == nil {
historyVisibility = hisVis
break
}
}
return
}
// fetchAuthEvents will check to see if any of the
// auth events specified by the given event are unknown. If they are
// then we will go off and request them from the federation and then

View file

@ -56,6 +56,7 @@ func (r *Inputer) updateLatestEvents(
sendAsServer string,
transactionID *api.TransactionID,
rewritesState bool,
historyVisibility gomatrixserverlib.HistoryVisibility,
) (err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "updateLatestEvents")
defer span.Finish()
@ -69,15 +70,16 @@ func (r *Inputer) updateLatestEvents(
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
u := latestEventsUpdater{
ctx: ctx,
api: r,
updater: updater,
roomInfo: roomInfo,
stateAtEvent: stateAtEvent,
event: event,
sendAsServer: sendAsServer,
transactionID: transactionID,
rewritesState: rewritesState,
ctx: ctx,
api: r,
updater: updater,
roomInfo: roomInfo,
stateAtEvent: stateAtEvent,
event: event,
sendAsServer: sendAsServer,
transactionID: transactionID,
rewritesState: rewritesState,
historyVisibility: historyVisibility,
}
if err = u.doUpdateLatestEvents(); err != nil {
@ -119,6 +121,8 @@ type latestEventsUpdater struct {
// The snapshots of current state before and after processing this event
oldStateNID types.StateSnapshotNID
newStateNID types.StateSnapshotNID
// The history visibility of the event itself (from the state before the event).
historyVisibility gomatrixserverlib.HistoryVisibility
}
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
@ -365,12 +369,13 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
}
ore := api.OutputNewRoomEvent{
Event: u.event.Headered(u.roomInfo.RoomVersion),
RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
SendAsServer: u.sendAsServer,
Event: u.event.Headered(u.roomInfo.RoomVersion),
RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
SendAsServer: u.sendAsServer,
HistoryVisibility: u.historyVisibility,
}
eventIDMap, err := u.stateEventMap()

View file

@ -823,13 +823,39 @@ func (d *Database) handleRedactions(
return nil, "", nil
}
// Get the power level from the database, so we can verify the user is allowed to redact the event
powerLevels, err := d.GetStateEvent(ctx, event.RoomID(), gomatrixserverlib.MRoomPowerLevels, "")
if err != nil {
return nil, "", fmt.Errorf("d.GetStateEvent: %w", err)
}
pl, err := powerLevels.PowerLevels()
if err != nil {
return nil, "", fmt.Errorf("unable to get powerlevels for room: %w", err)
}
redactUser := pl.UserLevel(redactionEvent.Sender())
switch {
case redactUser >= pl.Redact:
// The power level of the redaction events sender is greater than or equal to the redact level.
case redactedEvent.Origin() == redactionEvent.Origin() && redactedEvent.Sender() == redactionEvent.Sender():
// The domain of the redaction events sender matches that of the original events sender.
default:
return nil, "", nil
}
// mark the event as redacted
if redactionsArePermanent {
redactedEvent.Event = redactedEvent.Redact()
}
err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent)
if err != nil {
return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err)
}
if redactionsArePermanent {
redactedEvent.Event = redactedEvent.Redact()
// NOTSPEC: sytest relies on this unspecced field existing :(
err = redactedEvent.SetUnsignedField("redacted_by", redactionEvent.EventID())
if err != nil {
return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err)
}
// overwrite the eventJSON table
err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON())

View file

@ -15,8 +15,21 @@ func platformSanityChecks() {
// If we run out of file descriptors, we might run into problems accessing
// PostgreSQL amongst other things. Complain at startup if we think the
// number of file descriptors is too low.
var rLimit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil && rLimit.Cur < 65535 {
warn := func(rLimit *syscall.Rlimit) {
logrus.Warnf("IMPORTANT: Process file descriptor limit is currently %d, it is recommended to raise the limit for Dendrite to at least 65535 to avoid issues", rLimit.Cur)
}
var rLimit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil && rLimit.Cur < 65535 {
// The file descriptor count is too low. Let's try to raise it.
rLimit.Cur = 65535
if err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
// We failed to raise it, so log an error.
logrus.WithError(err).Warn("IMPORTANT: Failed to raise the file descriptor limit")
warn(&rLimit)
} else if err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil && rLimit.Cur < 65535 {
// We think we successfully raised the limit, but a second call to
// get the limit told us that we didn't succeed. Log an error.
warn(&rLimit)
}
}
}

View file

@ -16,6 +16,7 @@ const (
var (
InputRoomEvent = "InputRoomEvent"
InputDeviceListUpdate = "InputDeviceListUpdate"
OutputRoomEvent = "OutputRoomEvent"
OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
OutputKeyChangeEvent = "OutputKeyChangeEvent"
@ -45,6 +46,11 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: InputDeviceListUpdate,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputRoomEvent,
Retention: nats.InterestPolicy,

View file

@ -43,9 +43,6 @@ func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneT
}
func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) {
}
func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.InputDeviceListUpdateRequest, res *keyapi.InputDeviceListUpdateResponse) {
}
func (k *mockKeyAPI) QuerySignatures(ctx context.Context, req *keyapi.QuerySignaturesRequest, res *keyapi.QuerySignaturesResponse) {
}

View file

@ -97,7 +97,7 @@ func Context(
state, _ := syncDB.CurrentState(ctx, roomID, &stateFilter, nil)
// verify the user is allowed to see the context for this room/event
for _, x := range state {
var hisVis string
var hisVis gomatrixserverlib.HistoryVisibility
hisVis, err = x.HistoryVisibility()
if err != nil {
continue

View file

@ -716,6 +716,8 @@ PUT /rooms/:room_id/redact/:event_id/:txn_id is idempotent
Unnamed room comes with a name summary
Named room comes with just joined member count summary
Room summary only has 5 heroes
registration is idempotent, with username specified
Setting state twice is idempotent
Joining room twice is idempotent
Inbound federation can return missing events for shared visibility
Inbound federation can return missing events for shared visibility
Inbound federation ignores redactions from invalid servers room > v3