mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-08 14:43:09 -06:00
Merge branch 'main' into takwaiw/3782-login-publickey
This commit is contained in:
commit
dd9186734d
1
.github/workflows/dendrite.yml
vendored
1
.github/workflows/dendrite.yml
vendored
|
|
@ -17,6 +17,7 @@ jobs:
|
||||||
name: WASM build test
|
name: WASM build test
|
||||||
timeout-minutes: 5
|
timeout-minutes: 5
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
if: ${{ false }} # disable for now
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
|
||||||
|
|
|
||||||
17
CHANGES.md
17
CHANGES.md
|
|
@ -1,5 +1,22 @@
|
||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## Dendrite 0.8.9 (2022-07-01)
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* Incoming device list updates over federation are now queued in JetStream for processing so that they will no longer block incoming federation transactions and should never end up dropped, which will hopefully help E2EE reliability
|
||||||
|
* The `/context` endpoint now returns `"start"` and `"end"` parameters to allow pagination from a context call
|
||||||
|
* The `/messages` endpoint will no longer return `"end"` when there are no more messages remaining
|
||||||
|
* Deactivated user accounts will now leave all rooms automatically
|
||||||
|
* New admin endpoint `/_dendrite/admin/evacuateUser/{userID}` has been added for forcing a local user to leave all joined rooms
|
||||||
|
* Dendrite will now automatically attempt to raise the file descriptor limit at startup if it is too low
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
* A rare crash when retrieving remote device lists has been fixed
|
||||||
|
* Fixes a bug where events were not redacted properly over federation
|
||||||
|
* The `/invite` endpoints will now return an error instead of silently proceeding if the user ID is obviously malformed
|
||||||
|
|
||||||
## Dendrite 0.8.8 (2022-06-09)
|
## Dendrite 0.8.8 (2022-06-09)
|
||||||
|
|
||||||
### Features
|
### Features
|
||||||
|
|
|
||||||
2
build.sh
2
build.sh
|
|
@ -21,4 +21,4 @@ mkdir -p bin
|
||||||
|
|
||||||
CGO_ENABLED=1 go build -trimpath -ldflags "$FLAGS" -v -o "bin/" ./cmd/...
|
CGO_ENABLED=1 go build -trimpath -ldflags "$FLAGS" -v -o "bin/" ./cmd/...
|
||||||
|
|
||||||
CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone
|
# CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone
|
||||||
|
|
|
||||||
2
go.mod
2
go.mod
|
|
@ -34,7 +34,7 @@ require (
|
||||||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48
|
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.13
|
github.com/mattn/go-sqlite3 v1.14.13
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -549,8 +549,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a h1:jOkrb6twViAGTHHadA51sQwdloHT0Vx1MCptk9InTHo=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f h1:XF2+J6sOq07yhK1I7ItwsgRwXorjj7gqiCvgZ4dn8W8=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE=
|
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
|
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
|
||||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ var build string
|
||||||
const (
|
const (
|
||||||
VersionMajor = 0
|
VersionMajor = 0
|
||||||
VersionMinor = 8
|
VersionMinor = 8
|
||||||
VersionPatch = 8
|
VersionPatch = 9
|
||||||
VersionTag = "" // example: "rc1"
|
VersionTag = "" // example: "rc1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -216,11 +216,10 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
|
err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,8 +12,10 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/perform"
|
"github.com/matrix-org/dendrite/roomserver/internal/perform"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/producers"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -49,17 +51,21 @@ type RoomserverInternalAPI struct {
|
||||||
JetStream nats.JetStreamContext
|
JetStream nats.JetStreamContext
|
||||||
Durable string
|
Durable string
|
||||||
InputRoomEventTopic string // JetStream topic for new input room events
|
InputRoomEventTopic string // JetStream topic for new input room events
|
||||||
OutputRoomEventTopic string // JetStream topic for new output room events
|
OutputProducer *producers.RoomEventProducer
|
||||||
PerspectiveServerNames []gomatrixserverlib.ServerName
|
PerspectiveServerNames []gomatrixserverlib.ServerName
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRoomserverAPI(
|
func NewRoomserverAPI(
|
||||||
processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
|
processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
|
||||||
consumer nats.JetStreamContext, nc *nats.Conn,
|
js nats.JetStreamContext, nc *nats.Conn, inputRoomEventTopic string,
|
||||||
inputRoomEventTopic, outputRoomEventTopic string,
|
|
||||||
caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
|
caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
|
||||||
) *RoomserverInternalAPI {
|
) *RoomserverInternalAPI {
|
||||||
serverACLs := acls.NewServerACLs(roomserverDB)
|
serverACLs := acls.NewServerACLs(roomserverDB)
|
||||||
|
producer := &producers.RoomEventProducer{
|
||||||
|
Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent)),
|
||||||
|
JetStream: js,
|
||||||
|
ACLs: serverACLs,
|
||||||
|
}
|
||||||
a := &RoomserverInternalAPI{
|
a := &RoomserverInternalAPI{
|
||||||
ProcessContext: processCtx,
|
ProcessContext: processCtx,
|
||||||
DB: roomserverDB,
|
DB: roomserverDB,
|
||||||
|
|
@ -68,8 +74,8 @@ func NewRoomserverAPI(
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
PerspectiveServerNames: perspectiveServerNames,
|
PerspectiveServerNames: perspectiveServerNames,
|
||||||
InputRoomEventTopic: inputRoomEventTopic,
|
InputRoomEventTopic: inputRoomEventTopic,
|
||||||
OutputRoomEventTopic: outputRoomEventTopic,
|
OutputProducer: producer,
|
||||||
JetStream: consumer,
|
JetStream: js,
|
||||||
NATSClient: nc,
|
NATSClient: nc,
|
||||||
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
|
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
|
||||||
ServerACLs: serverACLs,
|
ServerACLs: serverACLs,
|
||||||
|
|
@ -92,19 +98,19 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
|
||||||
r.KeyRing = keyRing
|
r.KeyRing = keyRing
|
||||||
|
|
||||||
r.Inputer = &input.Inputer{
|
r.Inputer = &input.Inputer{
|
||||||
Cfg: r.Cfg,
|
Cfg: r.Cfg,
|
||||||
ProcessContext: r.ProcessContext,
|
ProcessContext: r.ProcessContext,
|
||||||
DB: r.DB,
|
DB: r.DB,
|
||||||
InputRoomEventTopic: r.InputRoomEventTopic,
|
InputRoomEventTopic: r.InputRoomEventTopic,
|
||||||
OutputRoomEventTopic: r.OutputRoomEventTopic,
|
OutputProducer: r.OutputProducer,
|
||||||
JetStream: r.JetStream,
|
JetStream: r.JetStream,
|
||||||
NATSClient: r.NATSClient,
|
NATSClient: r.NATSClient,
|
||||||
Durable: nats.Durable(r.Durable),
|
Durable: nats.Durable(r.Durable),
|
||||||
ServerName: r.Cfg.Matrix.ServerName,
|
ServerName: r.Cfg.Matrix.ServerName,
|
||||||
FSAPI: fsAPI,
|
FSAPI: fsAPI,
|
||||||
KeyRing: keyRing,
|
KeyRing: keyRing,
|
||||||
ACLs: r.ServerACLs,
|
ACLs: r.ServerACLs,
|
||||||
Queryer: r.Queryer,
|
Queryer: r.Queryer,
|
||||||
}
|
}
|
||||||
r.Inviter = &perform.Inviter{
|
r.Inviter = &perform.Inviter{
|
||||||
DB: r.DB,
|
DB: r.DB,
|
||||||
|
|
@ -199,7 +205,7 @@ func (r *RoomserverInternalAPI) PerformInvite(
|
||||||
if len(outputEvents) == 0 {
|
if len(outputEvents) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return r.WriteOutputEvents(req.Event.RoomID(), outputEvents)
|
return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomserverInternalAPI) PerformLeave(
|
func (r *RoomserverInternalAPI) PerformLeave(
|
||||||
|
|
@ -215,7 +221,7 @@ func (r *RoomserverInternalAPI) PerformLeave(
|
||||||
if len(outputEvents) == 0 {
|
if len(outputEvents) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return r.WriteOutputEvents(req.RoomID, outputEvents)
|
return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomserverInternalAPI) PerformForget(
|
func (r *RoomserverInternalAPI) PerformForget(
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/acls"
|
"github.com/matrix-org/dendrite/roomserver/acls"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/producers"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
|
@ -37,16 +38,8 @@ import (
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/tidwall/gjson"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var keyContentFields = map[string]string{
|
|
||||||
"m.room.join_rules": "join_rule",
|
|
||||||
"m.room.history_visibility": "history_visibility",
|
|
||||||
"m.room.member": "membership",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Inputer is responsible for consuming from the roomserver input
|
// Inputer is responsible for consuming from the roomserver input
|
||||||
// streams and processing the events. All input events are queued
|
// streams and processing the events. All input events are queued
|
||||||
// into a single NATS stream and the order is preserved strictly.
|
// into a single NATS stream and the order is preserved strictly.
|
||||||
|
|
@ -75,19 +68,19 @@ var keyContentFields = map[string]string{
|
||||||
// up, so they will do nothing until a new event comes in for B
|
// up, so they will do nothing until a new event comes in for B
|
||||||
// or C.
|
// or C.
|
||||||
type Inputer struct {
|
type Inputer struct {
|
||||||
Cfg *config.RoomServer
|
Cfg *config.RoomServer
|
||||||
ProcessContext *process.ProcessContext
|
ProcessContext *process.ProcessContext
|
||||||
DB storage.Database
|
DB storage.Database
|
||||||
NATSClient *nats.Conn
|
NATSClient *nats.Conn
|
||||||
JetStream nats.JetStreamContext
|
JetStream nats.JetStreamContext
|
||||||
Durable nats.SubOpt
|
Durable nats.SubOpt
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
FSAPI fedapi.RoomserverFederationAPI
|
FSAPI fedapi.RoomserverFederationAPI
|
||||||
KeyRing gomatrixserverlib.JSONVerifier
|
KeyRing gomatrixserverlib.JSONVerifier
|
||||||
ACLs *acls.ServerACLs
|
ACLs *acls.ServerACLs
|
||||||
InputRoomEventTopic string
|
InputRoomEventTopic string
|
||||||
OutputRoomEventTopic string
|
OutputProducer *producers.RoomEventProducer
|
||||||
workers sync.Map // room ID -> *worker
|
workers sync.Map // room ID -> *worker
|
||||||
|
|
||||||
Queryer *query.Queryer
|
Queryer *query.Queryer
|
||||||
}
|
}
|
||||||
|
|
@ -370,58 +363,6 @@ func (r *Inputer) InputRoomEvents(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteOutputEvents implements OutputRoomEventWriter
|
|
||||||
func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
|
|
||||||
var err error
|
|
||||||
for _, update := range updates {
|
|
||||||
msg := &nats.Msg{
|
|
||||||
Subject: r.OutputRoomEventTopic,
|
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
msg.Header.Set(jetstream.RoomID, roomID)
|
|
||||||
msg.Data, err = json.Marshal(update)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
logger := log.WithFields(log.Fields{
|
|
||||||
"room_id": roomID,
|
|
||||||
"type": update.Type,
|
|
||||||
})
|
|
||||||
if update.NewRoomEvent != nil {
|
|
||||||
eventType := update.NewRoomEvent.Event.Type()
|
|
||||||
logger = logger.WithFields(log.Fields{
|
|
||||||
"event_type": eventType,
|
|
||||||
"event_id": update.NewRoomEvent.Event.EventID(),
|
|
||||||
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
|
|
||||||
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
|
|
||||||
"send_as_server": update.NewRoomEvent.SendAsServer,
|
|
||||||
"sender": update.NewRoomEvent.Event.Sender(),
|
|
||||||
})
|
|
||||||
if update.NewRoomEvent.Event.StateKey() != nil {
|
|
||||||
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
|
|
||||||
}
|
|
||||||
contentKey := keyContentFields[eventType]
|
|
||||||
if contentKey != "" {
|
|
||||||
value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
|
|
||||||
if value.Exists() {
|
|
||||||
logger = logger.WithField("content_value", value.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
|
|
||||||
ev := update.NewRoomEvent.Event.Unwrap()
|
|
||||||
defer r.ACLs.OnServerACLUpdate(ev)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic)
|
|
||||||
if _, err := r.JetStream.PublishMsg(msg); err != nil {
|
|
||||||
logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var roomserverInputBackpressure = prometheus.NewGaugeVec(
|
var roomserverInputBackpressure = prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Namespace: "dendrite",
|
Namespace: "dendrite",
|
||||||
|
|
|
||||||
|
|
@ -381,7 +381,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||||
}
|
}
|
||||||
case api.KindOld:
|
case api.KindOld:
|
||||||
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
|
err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeOldRoomEvent,
|
Type: api.OutputTypeOldRoomEvent,
|
||||||
OldRoomEvent: &api.OutputOldRoomEvent{
|
OldRoomEvent: &api.OutputOldRoomEvent{
|
||||||
|
|
@ -400,7 +400,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
// so notify downstream components to redact this event - they should have it if they've
|
// so notify downstream components to redact this event - they should have it if they've
|
||||||
// been tracking our output log.
|
// been tracking our output log.
|
||||||
if redactedEventID != "" {
|
if redactedEventID != "" {
|
||||||
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
|
err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeRedactedEvent,
|
Type: api.OutputTypeRedactedEvent,
|
||||||
RedactedEvent: &api.OutputRedactedEvent{
|
RedactedEvent: &api.OutputRedactedEvent{
|
||||||
|
|
|
||||||
|
|
@ -192,7 +192,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
||||||
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
||||||
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
||||||
if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
|
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil {
|
||||||
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -219,7 +219,7 @@ func (r *Admin) PerformAdminEvacuateUser(
|
||||||
if len(outputEvents) == 0 {
|
if len(outputEvents) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := r.Inputer.WriteOutputEvents(roomID, outputEvents); err != nil {
|
if err := r.Inputer.OutputProducer.ProduceRoomEvents(roomID, outputEvents); err != nil {
|
||||||
res.Error = &api.PerformError{
|
res.Error = &api.PerformError{
|
||||||
Code: api.PerformErrorBadRequest,
|
Code: api.PerformErrorBadRequest,
|
||||||
Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err),
|
Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err),
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/getsentry/sentry-go"
|
||||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
@ -206,8 +207,17 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom
|
||||||
}
|
}
|
||||||
logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result)
|
logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result)
|
||||||
for _, res := range result {
|
for _, res := range result {
|
||||||
if res.Error != nil {
|
switch err := res.Error.(type) {
|
||||||
logger.WithError(res.Error).Warn("event failed PDU checks")
|
case nil:
|
||||||
|
case gomatrixserverlib.SignatureErr:
|
||||||
|
// The signature of the event might not be valid anymore, for example if
|
||||||
|
// the key ID was reused with a different signature.
|
||||||
|
logger.WithError(err).Errorf("event failed PDU checks, storing anyway")
|
||||||
|
case gomatrixserverlib.AuthChainErr, gomatrixserverlib.AuthRulesErr:
|
||||||
|
logger.WithError(err).Warn("event failed PDU checks")
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
logger.WithError(err).Warn("event failed PDU checks")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
missingMap[id] = res.Event
|
missingMap[id] = res.Event
|
||||||
|
|
@ -306,6 +316,7 @@ FederationHit:
|
||||||
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
|
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit
|
||||||
return nil, lastErr
|
return nil, lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -366,19 +377,25 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
c := gomatrixserverlib.FederatedStateProvider{
|
var lastErr error
|
||||||
FedClient: b.fsAPI,
|
for _, srv := range b.servers {
|
||||||
RememberAuthEvents: false,
|
c := gomatrixserverlib.FederatedStateProvider{
|
||||||
Server: b.servers[0],
|
FedClient: b.fsAPI,
|
||||||
|
RememberAuthEvents: false,
|
||||||
|
Server: srv,
|
||||||
|
}
|
||||||
|
result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
|
||||||
|
if err != nil {
|
||||||
|
lastErr = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for eventID, ev := range result {
|
||||||
|
b.eventIDMap[eventID] = ev
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
|
sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit
|
||||||
if err != nil {
|
return nil, lastErr
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
for eventID, ev := range result {
|
|
||||||
b.eventIDMap[eventID] = ev
|
|
||||||
}
|
|
||||||
return result, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServersAtEvent is called when trying to determine which server to request from.
|
// ServersAtEvent is called when trying to determine which server to request from.
|
||||||
|
|
|
||||||
|
|
@ -113,7 +113,7 @@ func (r *InboundPeeker) PerformInboundPeek(
|
||||||
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
|
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
|
err = r.Inputer.OutputProducer.ProduceRoomEvents(request.RoomID, []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeNewInboundPeek,
|
Type: api.OutputTypeNewInboundPeek,
|
||||||
NewInboundPeek: &api.OutputNewInboundPeek{
|
NewInboundPeek: &api.OutputNewInboundPeek{
|
||||||
|
|
|
||||||
|
|
@ -207,7 +207,7 @@ func (r *Peeker) performPeekRoomByID(
|
||||||
|
|
||||||
// TODO: handle federated peeks
|
// TODO: handle federated peeks
|
||||||
|
|
||||||
err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{
|
err = r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeNewPeek,
|
Type: api.OutputTypeNewPeek,
|
||||||
NewPeek: &api.OutputNewPeek{
|
NewPeek: &api.OutputNewPeek{
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,7 @@ func (r *Unpeeker) performUnpeekRoomByID(
|
||||||
|
|
||||||
// TODO: handle federated peeks
|
// TODO: handle federated peeks
|
||||||
|
|
||||||
err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{
|
err = r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeRetirePeek,
|
Type: api.OutputTypeRetirePeek,
|
||||||
RetirePeek: &api.OutputRetirePeek{
|
RetirePeek: &api.OutputRetirePeek{
|
||||||
|
|
|
||||||
89
roomserver/producers/roomevent.go
Normal file
89
roomserver/producers/roomevent.go
Normal file
|
|
@ -0,0 +1,89 @@
|
||||||
|
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package producers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/acls"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
)
|
||||||
|
|
||||||
|
var keyContentFields = map[string]string{
|
||||||
|
"m.room.join_rules": "join_rule",
|
||||||
|
"m.room.history_visibility": "history_visibility",
|
||||||
|
"m.room.member": "membership",
|
||||||
|
}
|
||||||
|
|
||||||
|
type RoomEventProducer struct {
|
||||||
|
Topic string
|
||||||
|
ACLs *acls.ServerACLs
|
||||||
|
JetStream nats.JetStreamContext
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
|
||||||
|
var err error
|
||||||
|
for _, update := range updates {
|
||||||
|
msg := &nats.Msg{
|
||||||
|
Subject: r.Topic,
|
||||||
|
Header: nats.Header{},
|
||||||
|
}
|
||||||
|
msg.Header.Set(jetstream.RoomID, roomID)
|
||||||
|
msg.Data, err = json.Marshal(update)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logger := log.WithFields(log.Fields{
|
||||||
|
"room_id": roomID,
|
||||||
|
"type": update.Type,
|
||||||
|
})
|
||||||
|
if update.NewRoomEvent != nil {
|
||||||
|
eventType := update.NewRoomEvent.Event.Type()
|
||||||
|
logger = logger.WithFields(log.Fields{
|
||||||
|
"event_type": eventType,
|
||||||
|
"event_id": update.NewRoomEvent.Event.EventID(),
|
||||||
|
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
|
||||||
|
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
|
||||||
|
"send_as_server": update.NewRoomEvent.SendAsServer,
|
||||||
|
"sender": update.NewRoomEvent.Event.Sender(),
|
||||||
|
})
|
||||||
|
if update.NewRoomEvent.Event.StateKey() != nil {
|
||||||
|
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
|
||||||
|
}
|
||||||
|
contentKey := keyContentFields[eventType]
|
||||||
|
if contentKey != "" {
|
||||||
|
value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
|
||||||
|
if value.Exists() {
|
||||||
|
logger = logger.WithField("content_value", value.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
|
||||||
|
ev := update.NewRoomEvent.Event.Unwrap()
|
||||||
|
defer r.ACLs.OnServerACLUpdate(ev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.Tracef("Producing to topic '%s'", r.Topic)
|
||||||
|
if _, err := r.JetStream.PublishMsg(msg); err != nil {
|
||||||
|
logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.Topic, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
@ -55,7 +55,6 @@ func NewInternalAPI(
|
||||||
return internal.NewRoomserverAPI(
|
return internal.NewRoomserverAPI(
|
||||||
base.ProcessContext, cfg, roomserverDB, js, nc,
|
base.ProcessContext, cfg, roomserverDB, js, nc,
|
||||||
cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
|
cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
|
||||||
cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
|
|
||||||
base.Caches, perspectiveServerNames,
|
base.Caches, perspectiveServerNames,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@
|
||||||
package routing
|
package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -25,6 +26,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
roomserver "github.com/matrix-org/dendrite/roomserver/api"
|
roomserver "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
|
@ -149,13 +151,30 @@ func Context(
|
||||||
if len(response.State) > filter.Limit {
|
if len(response.State) > filter.Limit {
|
||||||
response.State = response.State[len(response.State)-filter.Limit:]
|
response.State = response.State[len(response.State)-filter.Limit:]
|
||||||
}
|
}
|
||||||
|
start, end, err := getStartEnd(ctx, syncDB, eventsBefore, eventsAfter)
|
||||||
|
if err == nil {
|
||||||
|
response.End = end.String()
|
||||||
|
response.Start = start.String()
|
||||||
|
}
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: response,
|
JSON: response,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
|
||||||
|
if len(startEvents) > 0 {
|
||||||
|
start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(endEvents) > 0 {
|
||||||
|
end, err = syncDB.EventPositionInTopology(ctx, endEvents[0].EventID())
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func applyLazyLoadMembers(
|
func applyLazyLoadMembers(
|
||||||
device *userapi.Device,
|
device *userapi.Device,
|
||||||
filter *gomatrixserverlib.RoomEventFilter,
|
filter *gomatrixserverlib.RoomEventFilter,
|
||||||
|
|
|
||||||
|
|
@ -48,3 +48,4 @@ Notifications can be viewed with GET /notifications
|
||||||
# More flakey
|
# More flakey
|
||||||
|
|
||||||
If remote user leaves room we no longer receive device updates
|
If remote user leaves room we no longer receive device updates
|
||||||
|
Guest users can join guest_access rooms
|
||||||
|
|
|
||||||
|
|
@ -241,7 +241,6 @@ Inbound federation can receive v2 /send_join
|
||||||
Message history can be paginated
|
Message history can be paginated
|
||||||
Backfill works correctly with history visibility set to joined
|
Backfill works correctly with history visibility set to joined
|
||||||
Guest user cannot call /events globally
|
Guest user cannot call /events globally
|
||||||
Guest users can join guest_access rooms
|
|
||||||
Guest user can set display names
|
Guest user can set display names
|
||||||
Guest user cannot upgrade other users
|
Guest user cannot upgrade other users
|
||||||
Guest non-joined user cannot call /events on shared room
|
Guest non-joined user cannot call /events on shared room
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue