Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/fts

This commit is contained in:
Till Faelligen 2022-09-07 18:17:08 +02:00
commit c6c6bca3a8
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
22 changed files with 270 additions and 119 deletions

View file

@ -5,9 +5,10 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/federationapi/types"
) )
// FederationInternalAPI is used to query information from the federation sender. // FederationInternalAPI is used to query information from the federation sender.
@ -108,6 +109,7 @@ type FederationClientError struct {
Err string Err string
RetryAfter time.Duration RetryAfter time.Duration
Blacklisted bool Blacklisted bool
Code int // HTTP Status code from the remote server
} }
func (e FederationClientError) Error() string { func (e FederationClientError) Error() string {

View file

@ -18,6 +18,8 @@ import (
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers" "github.com/matrix-org/dendrite/federationapi/consumers"
@ -33,10 +35,10 @@ import (
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/gomatrixserverlib"
) )
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions // AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
@ -66,6 +68,7 @@ func AddPublicRoutes(
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
TopicSigningKeyUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
UserAPI: userAPI, UserAPI: userAPI,
} }

View file

@ -6,10 +6,12 @@ import (
"net/http" "net/http"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/gomatrix"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/httputil"
) )
// AddRoutes adds the FederationInternalAPI handlers to the http.ServeMux. // AddRoutes adds the FederationInternalAPI handlers to the http.ServeMux.
@ -229,6 +231,10 @@ func federationClientError(err error) error {
return &ferr return &ferr
case *api.FederationClientError: case *api.FederationClientError:
return ferr return ferr
case gomatrix.HTTPError:
return &api.FederationClientError{
Code: ferr.Code,
}
default: default:
return &api.FederationClientError{ return &api.FederationClientError{
Err: err.Error(), Err: err.Error(),

View file

@ -21,12 +21,13 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
) )
// SyncAPIProducer produces events for the sync API server to consume // SyncAPIProducer produces events for the sync API server to consume
@ -36,6 +37,7 @@ type SyncAPIProducer struct {
TopicTypingEvent string TopicTypingEvent string
TopicPresenceEvent string TopicPresenceEvent string
TopicDeviceListUpdate string TopicDeviceListUpdate string
TopicSigningKeyUpdate string
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI UserAPI userapi.UserInternalAPI
@ -165,16 +167,24 @@ func (p *SyncAPIProducer) SendPresence(
} }
func (p *SyncAPIProducer) SendDeviceListUpdate( func (p *SyncAPIProducer) SendDeviceListUpdate(
ctx context.Context, deviceListUpdate *gomatrixserverlib.DeviceListUpdateEvent, ctx context.Context, deviceListUpdate gomatrixserverlib.RawJSON, origin string,
) (err error) { ) (err error) {
m := nats.NewMsg(p.TopicDeviceListUpdate) m := nats.NewMsg(p.TopicDeviceListUpdate)
m.Header.Set(jetstream.UserID, deviceListUpdate.UserID) m.Header.Set("origin", origin)
m.Data, err = json.Marshal(deviceListUpdate) m.Data = deviceListUpdate
if err != nil {
return fmt.Errorf("json.Marshal: %w", err)
}
log.Debugf("Sending device list update: %+v", m.Header) log.Debugf("Sending device list update: %+v", m.Header)
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx)) _, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
return err return err
} }
func (p *SyncAPIProducer) SendSigningKeyUpdate(
ctx context.Context, data gomatrixserverlib.RawJSON, origin gomatrixserverlib.ServerName,
) (err error) {
m := nats.NewMsg(p.TopicSigningKeyUpdate)
m.Header.Set("origin", string(origin))
m.Data = data
log.Debugf("Sending signing key update")
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}

View file

@ -22,6 +22,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/federationapi/producers"
@ -31,10 +36,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
syncTypes "github.com/matrix-org/dendrite/syncapi/types" syncTypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
) )
const ( const (
@ -358,7 +359,9 @@ func (t *txnReq) processEDUs(ctx context.Context) {
} }
} }
case gomatrixserverlib.MDeviceListUpdate: case gomatrixserverlib.MDeviceListUpdate:
t.processDeviceListUpdate(ctx, e) if err := t.producer.SendDeviceListUpdate(ctx, e.Content, e.Origin); err != nil {
util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
}
case gomatrixserverlib.MReceipt: case gomatrixserverlib.MReceipt:
// https://matrix.org/docs/spec/server_server/r0.1.4#receipts // https://matrix.org/docs/spec/server_server/r0.1.4#receipts
payload := map[string]types.FederationReceiptMRead{} payload := map[string]types.FederationReceiptMRead{}
@ -391,7 +394,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
} }
} }
case types.MSigningKeyUpdate: case types.MSigningKeyUpdate:
if err := t.processSigningKeyUpdate(ctx, e); err != nil { if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil {
logrus.WithError(err).Errorf("Failed to process signing key update") logrus.WithError(err).Errorf("Failed to process signing key update")
} }
case gomatrixserverlib.MPresence: case gomatrixserverlib.MPresence:
@ -431,42 +434,6 @@ func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) e
return nil return nil
} }
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
var updatePayload keyapi.CrossSigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"user_id": updatePayload.UserID,
}).Debug("Failed to unmarshal signing key update")
return err
}
if _, serverName, err := gomatrixserverlib.SplitID('@', updatePayload.UserID); err != nil {
return nil
} else if serverName == t.ourServerName {
return nil
} else if serverName != t.Origin {
return nil
}
keys := gomatrixserverlib.CrossSigningKeys{}
if updatePayload.MasterKey != nil {
keys.MasterKey = *updatePayload.MasterKey
}
if updatePayload.SelfSigningKey != nil {
keys.SelfSigningKey = *updatePayload.SelfSigningKey
}
uploadReq := &keyapi.PerformUploadDeviceKeysRequest{
CrossSigningKeys: keys,
UserID: updatePayload.UserID,
}
uploadRes := &keyapi.PerformUploadDeviceKeysResponse{}
if err := t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes); err != nil {
return err
}
if uploadRes.Error != nil {
return uploadRes.Error
}
return nil
}
// processReceiptEvent sends receipt events to JetStream // processReceiptEvent sends receipt events to JetStream
func (t *txnReq) processReceiptEvent(ctx context.Context, func (t *txnReq) processReceiptEvent(ctx context.Context,
userID, roomID, receiptType string, userID, roomID, receiptType string,
@ -489,21 +456,3 @@ func (t *txnReq) processReceiptEvent(ctx context.Context,
return nil return nil
} }
func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverlib.EDU) {
var payload gomatrixserverlib.DeviceListUpdateEvent
if err := json.Unmarshal(e.Content, &payload); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal device list update event")
return
}
if _, serverName, err := gomatrixserverlib.SplitID('@', payload.UserID); err != nil {
return
} else if serverName == t.ourServerName {
return
} else if serverName != t.Origin {
return
}
if err := t.producer.SendDeviceListUpdate(ctx, &payload); err != nil {
util.GetLogger(ctx).WithError(err).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
}
}

View file

@ -53,9 +53,9 @@ func (d *Database) AssociateEDUWithDestination(
// Keep EDUs for at least x minutes before deleting them // Keep EDUs for at least x minutes before deleting them
expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration)) expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration))
} }
// We forcibly set m.direct_to_device events to 0, as we always want them // We forcibly set m.direct_to_device and m.device_list_update events
// to be delivered. (required for E2EE) // to 0, as we always want them to be delivered. (required for E2EE)
if eduType == gomatrixserverlib.MDirectToDevice { if eduType == gomatrixserverlib.MDirectToDevice || eduType == gomatrixserverlib.MDeviceListUpdate {
expiresAt = 0 expiresAt = 0
} }
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {

4
go.mod
View file

@ -17,12 +17,11 @@ require (
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/kardianos/minwinsvc v1.0.0
github.com/lib/pq v1.10.5 github.com/lib/pq v1.10.5
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220905160024-206bfc07ea29 github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13 github.com/mattn/go-sqlite3 v1.14.13
@ -93,6 +92,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 // indirect github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 // indirect
github.com/juju/testing v0.0.0-20220203020004-a0ff61f03494 // indirect github.com/juju/testing v0.0.0-20220203020004-a0ff61f03494 // indirect
github.com/kardianos/minwinsvc v1.0.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect github.com/klauspost/compress v1.15.9 // indirect
github.com/lucas-clemente/quic-go v0.28.1 // indirect github.com/lucas-clemente/quic-go v0.28.1 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect

4
go.sum
View file

@ -388,8 +388,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220905160024-206bfc07ea29 h1:55lVTCUNuq/WvTH6usVwrliqnxbziOjOCHeQs1tcSeg= github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661 h1:dww9rH0HVfAO9JOBD1nxq26GHKbEw07thAJTu1DrAQs=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220905160024-206bfc07ea29/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed h1:YMcCnrmTbT5M1LtTiagiFFaj9vEgvC6iVEzWsIb0tQQ= github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed h1:YMcCnrmTbT5M1LtTiagiFFaj9vEgvC6iVEzWsIb0tQQ=
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -53,6 +53,9 @@ func InitialPowerLevelsContent(roomCreator string) (c gomatrixserverlib.PowerLev
"m.room.history_visibility": 100, "m.room.history_visibility": 100,
"m.room.canonical_alias": 50, "m.room.canonical_alias": 50,
"m.room.avatar": 50, "m.room.avatar": 50,
"m.room.tombstone": 100,
"m.room.encryption": 100,
"m.room.server_acl": 100,
} }
c.Users = map[string]int64{roomCreator: 100} c.Users = map[string]int64{roomCreator: 100}
return c return c

View file

@ -28,7 +28,10 @@ import (
func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search { func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search {
t.Helper() t.Helper()
cfg := config.Fulltext{} cfg := config.Fulltext{}
cfg.Defaults(config.DefaultOpts{Generate: true, Monolithic: true}) cfg.Defaults(config.DefaultOpts{
Generate: true,
Monolithic: true,
})
if tempDir != "" { if tempDir != "" {
cfg.IndexPath = config.Path(tempDir) cfg.IndexPath = config.Path(tempDir)
cfg.InMemory = false cfg.InMemory = false

View file

@ -24,7 +24,7 @@ import (
"net/url" "net/url"
"strings" "strings"
opentracing "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext" "github.com/opentracing/opentracing-go/ext"
) )
@ -81,8 +81,8 @@ func PostJSON[reqtype, restype any, errtype error](
return fmt.Errorf("HTTP %d from %s (no response body)", res.StatusCode, apiURL) return fmt.Errorf("HTTP %d from %s (no response body)", res.StatusCode, apiURL)
} }
var reserr errtype var reserr errtype
if err = json.Unmarshal(body, reserr); err != nil { if err = json.Unmarshal(body, &reserr); err != nil {
return fmt.Errorf("HTTP %d from %s", res.StatusCode, apiURL) return fmt.Errorf("HTTP %d from %s - %w", res.StatusCode, apiURL, err)
} }
return reserr return reserr
} }

View file

@ -18,22 +18,24 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
) )
// DeviceListUpdateConsumer consumes device list updates that came in over federation. // DeviceListUpdateConsumer consumes device list updates that came in over federation.
type DeviceListUpdateConsumer struct { type DeviceListUpdateConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable string durable string
topic string topic string
updater *internal.DeviceListUpdater updater *internal.DeviceListUpdater
serverName gomatrixserverlib.ServerName
} }
// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers. // NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers.
@ -44,11 +46,12 @@ func NewDeviceListUpdateConsumer(
updater *internal.DeviceListUpdater, updater *internal.DeviceListUpdater,
) *DeviceListUpdateConsumer { ) *DeviceListUpdateConsumer {
return &DeviceListUpdateConsumer{ return &DeviceListUpdateConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"), durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
updater: updater, updater: updater,
serverName: cfg.Matrix.ServerName,
} }
} }
@ -69,6 +72,15 @@ func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.M
logrus.WithError(err).Errorf("Failed to read from device list update input topic") logrus.WithError(err).Errorf("Failed to read from device list update input topic")
return true return true
} }
origin := gomatrixserverlib.ServerName(msg.Header.Get("origin"))
if _, serverName, err := gomatrixserverlib.SplitID('@', m.UserID); err != nil {
return true
} else if serverName == t.serverName {
return true
} else if serverName != origin {
return true
}
err := t.updater.Update(ctx, m) err := t.updater.Update(ctx, m)
if err != nil { if err != nil {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{

View file

@ -0,0 +1,110 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
)
// SigningKeyUpdateConsumer consumes signing key updates that came in over federation.
type SigningKeyUpdateConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
keyAPI *internal.KeyInternalAPI
cfg *config.KeyServer
}
// NewSigningKeyUpdateConsumer creates a new SigningKeyUpdateConsumer. Call Start() to begin consuming from key servers.
func NewSigningKeyUpdateConsumer(
process *process.ProcessContext,
cfg *config.KeyServer,
js nats.JetStreamContext,
keyAPI *internal.KeyInternalAPI,
) *SigningKeyUpdateConsumer {
return &SigningKeyUpdateConsumer{
ctx: process.Context(),
jetstream: js,
durable: cfg.Matrix.JetStream.Prefixed("KeyServerSigningKeyConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
keyAPI: keyAPI,
cfg: cfg,
}
}
// Start consuming from key servers
func (t *SigningKeyUpdateConsumer) Start() error {
return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, 1,
t.onMessage, nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called in response to a message received on the
// signing key update events topic from the key server.
func (t *SigningKeyUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
var updatePayload keyapi.CrossSigningKeyUpdate
if err := json.Unmarshal(msg.Data, &updatePayload); err != nil {
logrus.WithError(err).Errorf("Failed to read from signing key update input topic")
return true
}
origin := gomatrixserverlib.ServerName(msg.Header.Get("origin"))
if _, serverName, err := gomatrixserverlib.SplitID('@', updatePayload.UserID); err != nil {
logrus.WithError(err).Error("failed to split user id")
return true
} else if serverName == t.cfg.Matrix.ServerName {
logrus.Warn("dropping device key update from ourself")
return true
} else if serverName != origin {
logrus.Warnf("dropping device key update, %s != %s", serverName, origin)
return true
}
keys := gomatrixserverlib.CrossSigningKeys{}
if updatePayload.MasterKey != nil {
keys.MasterKey = *updatePayload.MasterKey
}
if updatePayload.SelfSigningKey != nil {
keys.SelfSigningKey = *updatePayload.SelfSigningKey
}
uploadReq := &keyapi.PerformUploadDeviceKeysRequest{
CrossSigningKeys: keys,
UserID: updatePayload.UserID,
}
uploadRes := &keyapi.PerformUploadDeviceKeysResponse{}
if err := t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes); err != nil {
logrus.WithError(err).Error("failed to upload device keys")
return false
}
if uploadRes.Error != nil {
logrus.WithError(uploadRes.Error).Error("failed to upload device keys")
return true
}
return true
}

View file

@ -19,9 +19,11 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"net"
"sync" "sync"
"time" "time"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -388,6 +390,8 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
return waitTime, true return waitTime, true
} }
failCount := 0 failCount := 0
userLoop:
for _, userID := range userIDs { for _, userID := range userIDs {
if ctx.Err() != nil { if ctx.Err() != nil {
// we've timed out, give up and go to the back of the queue to let another server be processed. // we've timed out, give up and go to the back of the queue to let another server be processed.
@ -397,19 +401,40 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
if err != nil { if err != nil {
failCount += 1 failCount += 1
fcerr, ok := err.(*fedsenderapi.FederationClientError) switch e := err.(type) {
if ok { case *fedsenderapi.FederationClientError:
if fcerr.RetryAfter > 0 { if e.RetryAfter > 0 {
waitTime = fcerr.RetryAfter waitTime = e.RetryAfter
} else if fcerr.Blacklisted { } else if e.Blacklisted {
waitTime = time.Hour * 8 waitTime = time.Hour * 8
} else { break userLoop
// For all other errors (DNS resolution, network etc.) wait 1 hour. } else if e.Code >= 300 {
// We didn't get a real FederationClientError (e.g. in polylith mode, where gomatrix.HTTPError
// are "converted" to FederationClientError), but we probably shouldn't hit them every $waitTime seconds.
waitTime = time.Hour waitTime = time.Hour
break userLoop
} }
} else { case net.Error:
waitTime = time.Hour // Use the default waitTime, if it's a timeout.
logger.WithError(err).WithField("user_id", userID).Debug("GetUserDevices returned unknown error type") // It probably doesn't make sense to try further users.
if !e.Timeout() {
waitTime = time.Minute * 10
logrus.WithError(e).Error("GetUserDevices returned net.Error")
break userLoop
}
case gomatrix.HTTPError:
// The remote server returned an error, give it some time to recover.
// This is to avoid spamming remote servers, which may not be Matrix servers anymore.
if e.Code >= 300 {
waitTime = time.Hour
logrus.WithError(e).Error("GetUserDevices returned gomatrix.HTTPError")
break userLoop
}
default:
// Something else failed
waitTime = time.Minute * 10
logger.WithError(err).WithField("user_id", userID).Debugf("GetUserDevices returned unknown error type: %T", err)
break userLoop
} }
continue continue
} }
@ -437,7 +462,12 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
} }
} }
if failCount > 0 { if failCount > 0 {
logger.WithField("total", len(userIDs)).WithField("failed", failCount).WithField("wait", waitTime).Warn("Failed to query device keys for some users") logger.WithFields(logrus.Fields{
"total": len(userIDs),
"failed": failCount,
"skipped": len(userIDs) - failCount,
"waittime": waitTime,
}).Warn("Failed to query device keys for some users")
} }
for _, userID := range userIDs { for _, userID := range userIDs {
// always clear the channel to unblock Update calls regardless of success/failure // always clear the channel to unblock Update calls regardless of success/failure

View file

@ -16,6 +16,8 @@ package keyserver
import ( import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/sirupsen/logrus"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/consumers" "github.com/matrix-org/dendrite/keyserver/consumers"
@ -26,7 +28,6 @@ import (
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/sirupsen/logrus"
) )
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions // AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
@ -72,5 +73,12 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start device list consumer") logrus.WithError(err).Panic("failed to start device list consumer")
} }
sigConsumer := consumers.NewSigningKeyUpdateConsumer(
base.ProcessContext, cfg, js, ap,
)
if err := sigConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start signing key consumer")
}
return ap return ap
} }

View file

@ -287,11 +287,14 @@ func (u *latestEventsUpdater) latestState() error {
}).Warnf("State reset detected (removing %d events)", removed) }).Warnf("State reset detected (removing %d events)", removed)
sentry.WithScope(func(scope *sentry.Scope) { sentry.WithScope(func(scope *sentry.Scope) {
scope.SetLevel("warning") scope.SetLevel("warning")
scope.SetTag("event_id", u.event.EventID()) scope.SetContext("State reset", map[string]interface{}{
scope.SetTag("old_state_nid", fmt.Sprintf("%d", u.oldStateNID)) "Event ID": u.event.EventID(),
scope.SetTag("new_state_nid", fmt.Sprintf("%d", u.newStateNID)) "Old state NID": fmt.Sprintf("%d", u.oldStateNID),
scope.SetTag("old_latest", u.oldLatest.EventIDs()) "New state NID": fmt.Sprintf("%d", u.newStateNID),
scope.SetTag("new_latest", u.latest.EventIDs()) "Old latest": u.oldLatest.EventIDs(),
"New latest": u.latest.EventIDs(),
"State removed": removed,
})
sentry.CaptureMessage("State reset detected") sentry.CaptureMessage("State reset detected")
}) })
} }

View file

@ -38,6 +38,7 @@ import (
"golang.org/x/net/http2/h2c" "golang.org/x/net/http2/h2c"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -91,6 +92,7 @@ type BaseDendrite struct {
Database *sql.DB Database *sql.DB
DatabaseWriter sqlutil.Writer DatabaseWriter sqlutil.Writer
EnableMetrics bool EnableMetrics bool
Fulltext *fulltext.Search
startupLock sync.Mutex startupLock sync.Mutex
Fulltext *fulltext.Search Fulltext *fulltext.Search
} }

View file

@ -29,6 +29,7 @@ func (c *SyncAPI) Defaults(opts DefaultOpts) {
} }
func (c *SyncAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { func (c *SyncAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
c.Fulltext.Verify(configErrs, isMonolith)
if isMonolith { // polylith required configs below if isMonolith { // polylith required configs below
return return
} }

View file

@ -17,6 +17,7 @@ const (
var ( var (
InputRoomEvent = "InputRoomEvent" InputRoomEvent = "InputRoomEvent"
InputDeviceListUpdate = "InputDeviceListUpdate" InputDeviceListUpdate = "InputDeviceListUpdate"
InputSigningKeyUpdate = "InputSigningKeyUpdate"
OutputRoomEvent = "OutputRoomEvent" OutputRoomEvent = "OutputRoomEvent"
OutputSendToDeviceEvent = "OutputSendToDeviceEvent" OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
OutputKeyChangeEvent = "OutputKeyChangeEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent"
@ -52,6 +53,11 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy, Retention: nats.InterestPolicy,
Storage: nats.FileStorage, Storage: nats.FileStorage,
}, },
{
Name: InputSigningKeyUpdate,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{ {
Name: OutputRoomEvent, Name: OutputRoomEvent,
Retention: nats.InterestPolicy, Retention: nats.InterestPolicy,

View file

@ -49,3 +49,6 @@ Notifications can be viewed with GET /notifications
If remote user leaves room we no longer receive device updates If remote user leaves room we no longer receive device updates
Guest users can join guest_access rooms Guest users can join guest_access rooms
# This will fail in HTTP API mode, so blacklisted for now
If a device list update goes missing, the server resyncs on the next one

View file

@ -741,4 +741,4 @@ Newly joined room includes presence in incremental sync
User in private room doesn't appear in user directory User in private room doesn't appear in user directory
User joining then leaving public room appears and dissappears from directory User joining then leaving public room appears and dissappears from directory
User in remote room doesn't appear in user directory after server left room User in remote room doesn't appear in user directory after server left room
User in shared private room does appear in user directory until leave User in shared private room does appear in user directory until leave

View file

@ -46,10 +46,10 @@ func CreateBaseDendrite(t *testing.T, dbType test.DBType) (*base.BaseDendrite, f
Generate: true, Generate: true,
Monolithic: true, Monolithic: true,
}) })
cfg.SyncAPI.Fulltext.Defaults(config.DefaultOpts{ cfg.SyncAPI.Fulltext.Defaults(config.DefaultOpts{ // use in memory fts
Generate: true, Generate: true,
Monolithic: true, Monolithic: true,
}) // use in memory fts })
cfg.Global.ServerName = "test" cfg.Global.ServerName = "test"
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
// the file system event with InMemory=true :( // the file system event with InMemory=true :(