mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-27 00:31:55 -06:00
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/consent-tracking
This commit is contained in:
commit
79e1c9e4bd
34
CHANGES.md
34
CHANGES.md
|
@ -1,5 +1,39 @@
|
||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## Dendrite 0.6.4 (2022-02-21)
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* All Client-Server API endpoints are now available under the `/v3` namespace
|
||||||
|
* The `/whoami` response format now matches the latest Matrix spec version
|
||||||
|
* Support added for the `/context` endpoint, which should help clients to render quote-replies correctly
|
||||||
|
* Accounts now have an optional account type field, allowing admin accounts to be created
|
||||||
|
* Server notices are now supported
|
||||||
|
* Refactored the user API storage to deduplicate a significant amount of code, as well as merging both user API databases into a single database
|
||||||
|
* The account database is now used for all user API storage and the device database is now obsolete
|
||||||
|
* For some installations that have separate account and device databases, this may result in access tokens being revoked and client sessions being logged out — users may need to log in again
|
||||||
|
* The above can be avoided by moving the `device_devices` table into the account database manually
|
||||||
|
* Guest registration can now be separately disabled with the new `client_api.guests_disabled` configuration option
|
||||||
|
* Outbound connections now obey proxy settings from the environment, deprecating the `federation_api.proxy_outbound` configuration options
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
* The roomserver input API will now strictly consume only one database transaction per room, which should prevent situations where the roomserver can deadlock waiting for database connections to become available
|
||||||
|
* Room joins will now fall back to federation if the local room state is insufficient to create a membership event
|
||||||
|
* Create events are now correctly filtered from federation `/send` transactions
|
||||||
|
* Excessive logging when federation is disabled should now be fixed
|
||||||
|
* Dendrite will no longer panic if trying to retire an invite event that has not been seen yet
|
||||||
|
* The device list updater will now wait for longer after a connection issue, rather than flooding the logs with errors
|
||||||
|
* The device list updater will no longer produce unnecessary output events for federated key updates with no changes, which should help to reduce CPU usage
|
||||||
|
* Local device name changes will now generate key change events correctly
|
||||||
|
* The sync API will now try to share device list update notifications even if all state key NIDs cannot be fetched
|
||||||
|
* An off-by-one error in the sync stream token handling which could result in a crash has been fixed
|
||||||
|
* State events will no longer be re-sent unnecessary by the roomserver to other components if they have already been sent, which should help to reduce the NATS message sizes on the roomserver output topic in some cases
|
||||||
|
* The roomserver input API now uses the process context and should handle graceful shutdowns better
|
||||||
|
* Guest registration is now correctly disabled when the `client_api.registration_disabled` configuration option is set
|
||||||
|
* One-time encryption keys are now cleaned up correctly when a device is logged out or removed
|
||||||
|
* Invalid state snapshots in the state storage refactoring migration are now reset rather than causing a panic at startup
|
||||||
|
|
||||||
## Dendrite 0.6.3 (2022-02-10)
|
## Dendrite 0.6.3 (2022-02-10)
|
||||||
|
|
||||||
### Features
|
### Features
|
||||||
|
|
|
@ -235,7 +235,7 @@ func OnIncomingStateTypeRequest(
|
||||||
}
|
}
|
||||||
// If the user has never been in the room then stop at this point.
|
// If the user has never been in the room then stop at this point.
|
||||||
// We won't tell the user about a room they have never joined.
|
// We won't tell the user about a room they have never joined.
|
||||||
if !membershipRes.HasBeenInRoom {
|
if !membershipRes.HasBeenInRoom || membershipRes.Membership == gomatrixserverlib.Ban {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusForbidden,
|
Code: http.StatusForbidden,
|
||||||
JSON: jsonerror.Forbidden(fmt.Sprintf("Unknown room %q or user %q has never joined this room", roomID, device.UserID)),
|
JSON: jsonerror.Forbidden(fmt.Sprintf("Unknown room %q or user %q has never joined this room", roomID, device.UserID)),
|
||||||
|
|
|
@ -554,7 +554,6 @@ func (r *FederationInternalAPI) PerformInvite(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("r.federation.SendInviteV2: failed to send invite: %w", err)
|
return fmt.Errorf("r.federation.SendInviteV2: failed to send invite: %w", err)
|
||||||
}
|
}
|
||||||
logrus.Infof("GOT INVITE RESPONSE %s", string(inviteRes.Event))
|
|
||||||
|
|
||||||
inviteEvent, err := inviteRes.Event.UntrustedEvent(request.RoomVersion)
|
inviteEvent, err := inviteRes.Event.UntrustedEvent(request.RoomVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -40,7 +40,7 @@ require (
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220214133635-20632dd262ed
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220214133635-20632dd262ed
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf
|
github.com/matrix-org/pinecone v0.0.0-20220223104432-0f0afd1a46aa
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.10
|
github.com/mattn/go-sqlite3 v1.14.10
|
||||||
github.com/morikuni/aec v1.0.0 // indirect
|
github.com/morikuni/aec v1.0.0 // indirect
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -985,8 +985,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220214133635-20632dd262ed h1:R8EiLWArq7KT96DrUq1xq9scPh8vLwKKeCTnORPyjhU=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220214133635-20632dd262ed h1:R8EiLWArq7KT96DrUq1xq9scPh8vLwKKeCTnORPyjhU=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220214133635-20632dd262ed/go.mod h1:qFvhfbQ5orQxlH9vCiFnP4dW27xxnWHdNUBKyj/fbiY=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220214133635-20632dd262ed/go.mod h1:qFvhfbQ5orQxlH9vCiFnP4dW27xxnWHdNUBKyj/fbiY=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf h1:/nqfHUdQHr3WVdbZieaYFvHF1rin5pvDTa/NOZ/qCyE=
|
github.com/matrix-org/pinecone v0.0.0-20220223104432-0f0afd1a46aa h1:rMYFNVto66gp+eWS8XAUzgp4m0qmUBid6l1HX3mHstk=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
|
github.com/matrix-org/pinecone v0.0.0-20220223104432-0f0afd1a46aa/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
|
||||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||||
|
|
|
@ -17,7 +17,7 @@ var build string
|
||||||
const (
|
const (
|
||||||
VersionMajor = 0
|
VersionMajor = 0
|
||||||
VersionMinor = 6
|
VersionMinor = 6
|
||||||
VersionPatch = 3
|
VersionPatch = 4
|
||||||
VersionTag = "" // example: "rc1"
|
VersionTag = "" // example: "rc1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -73,6 +74,26 @@ type DeviceMessage struct {
|
||||||
DeviceChangeID int64
|
DeviceChangeID int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeviceKeysEqual returns true if the device keys updates contain the
|
||||||
|
// same display name and key JSON. This will return false if either of
|
||||||
|
// the updates is not a device keys update, or if the user ID/device ID
|
||||||
|
// differ between the two.
|
||||||
|
func (m1 *DeviceMessage) DeviceKeysEqual(m2 *DeviceMessage) bool {
|
||||||
|
if m1.DeviceKeys == nil || m2.DeviceKeys == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if m1.UserID != m2.UserID || m1.DeviceID != m2.DeviceID {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if m1.DisplayName != m2.DisplayName {
|
||||||
|
return false // different display names
|
||||||
|
}
|
||||||
|
if len(m1.KeyJSON) == 0 || len(m2.KeyJSON) == 0 {
|
||||||
|
return false // either is empty
|
||||||
|
}
|
||||||
|
return bytes.Equal(m1.KeyJSON, m2.KeyJSON)
|
||||||
|
}
|
||||||
|
|
||||||
// DeviceKeys represents a set of device keys for a single device
|
// DeviceKeys represents a set of device keys for a single device
|
||||||
// https://matrix.org/docs/spec/client_server/r0.6.1#post-matrix-client-r0-keys-upload
|
// https://matrix.org/docs/spec/client_server/r0.6.1#post-matrix-client-r0-keys-upload
|
||||||
type DeviceKeys struct {
|
type DeviceKeys struct {
|
||||||
|
|
|
@ -224,7 +224,7 @@ func (u *DeviceListUpdater) update(ctx context.Context, event gomatrixserverlib.
|
||||||
}).Info("DeviceListUpdater.Update")
|
}).Info("DeviceListUpdater.Update")
|
||||||
|
|
||||||
// if we haven't missed anything update the database and notify users
|
// if we haven't missed anything update the database and notify users
|
||||||
if exists {
|
if exists || event.Deleted {
|
||||||
k := event.Keys
|
k := event.Keys
|
||||||
if event.Deleted {
|
if event.Deleted {
|
||||||
k = nil
|
k = nil
|
||||||
|
@ -241,14 +241,33 @@ func (u *DeviceListUpdater) update(ctx context.Context, event gomatrixserverlib.
|
||||||
StreamID: event.StreamID,
|
StreamID: event.StreamID,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeviceKeysJSON will side-effect modify this, so it needs
|
||||||
|
// to be a copy, not sharing any pointers with the above.
|
||||||
|
deviceKeysCopy := *keys[0].DeviceKeys
|
||||||
|
deviceKeysCopy.KeyJSON = nil
|
||||||
|
existingKeys := []api.DeviceMessage{
|
||||||
|
{
|
||||||
|
Type: keys[0].Type,
|
||||||
|
DeviceKeys: &deviceKeysCopy,
|
||||||
|
StreamID: keys[0].StreamID,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch what keys we had already and only emit changes
|
||||||
|
if err = u.db.DeviceKeysJSON(ctx, existingKeys); err != nil {
|
||||||
|
// non-fatal, log and continue
|
||||||
|
util.GetLogger(ctx).WithError(err).WithField("user_id", event.UserID).Errorf(
|
||||||
|
"failed to query device keys json for calculating diffs",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
err = u.db.StoreRemoteDeviceKeys(ctx, keys, nil)
|
err = u.db.StoreRemoteDeviceKeys(ctx, keys, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("failed to store remote device keys for %s (%s): %w", event.UserID, event.DeviceID, err)
|
return false, fmt.Errorf("failed to store remote device keys for %s (%s): %w", event.UserID, event.DeviceID, err)
|
||||||
}
|
}
|
||||||
// ALWAYS emit key changes when we've been poked over federation even if there's no change
|
|
||||||
// just in case this poke is important for something.
|
if err = emitDeviceKeyChanges(u.producer, existingKeys, keys, false); err != nil {
|
||||||
err = u.producer.ProduceKeyChanges(keys)
|
|
||||||
if err != nil {
|
|
||||||
return false, fmt.Errorf("failed to produce device key changes for %s (%s): %w", event.UserID, event.DeviceID, err)
|
return false, fmt.Errorf("failed to produce device key changes for %s (%s): %w", event.UserID, event.DeviceID, err)
|
||||||
}
|
}
|
||||||
return false, nil
|
return false, nil
|
||||||
|
@ -454,7 +473,7 @@ func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevi
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to mark device list as fresh: %w", err)
|
return fmt.Errorf("failed to mark device list as fresh: %w", err)
|
||||||
}
|
}
|
||||||
err = emitDeviceKeyChanges(u.producer, existingKeys, keys)
|
err = emitDeviceKeyChanges(u.producer, existingKeys, keys, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to emit key changes for fresh device list: %w", err)
|
return fmt.Errorf("failed to emit key changes for fresh device list: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -648,7 +648,7 @@ func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.Per
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = emitDeviceKeyChanges(a.Producer, existingKeys, keysToStore)
|
err = emitDeviceKeyChanges(a.Producer, existingKeys, keysToStore, req.OnlyDisplayNameUpdates)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(ctx).Errorf("Failed to emitDeviceKeyChanges: %s", err)
|
util.GetLogger(ctx).Errorf("Failed to emitDeviceKeyChanges: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -710,7 +710,11 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func emitDeviceKeyChanges(producer KeyChangeProducer, existing, new []api.DeviceMessage) error {
|
func emitDeviceKeyChanges(producer KeyChangeProducer, existing, new []api.DeviceMessage, onlyUpdateDisplayName bool) error {
|
||||||
|
// if we only want to update the display names, we can skip the checks below
|
||||||
|
if onlyUpdateDisplayName {
|
||||||
|
return producer.ProduceKeyChanges(new)
|
||||||
|
}
|
||||||
// find keys in new that are not in existing
|
// find keys in new that are not in existing
|
||||||
var keysAdded []api.DeviceMessage
|
var keysAdded []api.DeviceMessage
|
||||||
for _, newKey := range new {
|
for _, newKey := range new {
|
||||||
|
@ -718,7 +722,7 @@ func emitDeviceKeyChanges(producer KeyChangeProducer, existing, new []api.Device
|
||||||
for _, existingKey := range existing {
|
for _, existingKey := range existing {
|
||||||
// Do not treat the absence of keys as equal, or else we will not emit key changes
|
// Do not treat the absence of keys as equal, or else we will not emit key changes
|
||||||
// when users delete devices which never had a key to begin with as both KeyJSONs are nil.
|
// when users delete devices which never had a key to begin with as both KeyJSONs are nil.
|
||||||
if bytes.Equal(existingKey.KeyJSON, newKey.KeyJSON) && len(existingKey.KeyJSON) > 0 {
|
if existingKey.DeviceKeysEqual(&newKey) {
|
||||||
exists = true
|
exists = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,22 +20,17 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/state"
|
"github.com/matrix-org/dendrite/roomserver/state"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
type checkForAuthAndSoftFailStorage interface {
|
|
||||||
state.StateResolutionStorage
|
|
||||||
StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
|
|
||||||
RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// CheckForSoftFail returns true if the event should be soft-failed
|
// CheckForSoftFail returns true if the event should be soft-failed
|
||||||
// and false otherwise. The return error value should be checked before
|
// and false otherwise. The return error value should be checked before
|
||||||
// the soft-fail bool.
|
// the soft-fail bool.
|
||||||
func CheckForSoftFail(
|
func CheckForSoftFail(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
db checkForAuthAndSoftFailStorage,
|
db storage.Database,
|
||||||
event *gomatrixserverlib.HeaderedEvent,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
stateEventIDs []string,
|
stateEventIDs []string,
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
|
@ -97,7 +92,7 @@ func CheckForSoftFail(
|
||||||
// Returns the numeric IDs for the auth events.
|
// Returns the numeric IDs for the auth events.
|
||||||
func CheckAuthEvents(
|
func CheckAuthEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
db checkForAuthAndSoftFailStorage,
|
db storage.Database,
|
||||||
event *gomatrixserverlib.HeaderedEvent,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
authEventIDs []string,
|
authEventIDs []string,
|
||||||
) ([]types.EventNID, error) {
|
) ([]types.EventNID, error) {
|
||||||
|
|
|
@ -19,7 +19,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -40,19 +39,6 @@ import (
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
type retryAction int
|
|
||||||
type commitAction int
|
|
||||||
|
|
||||||
const (
|
|
||||||
doNotRetry retryAction = iota
|
|
||||||
retryLater
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
commitTransaction commitAction = iota
|
|
||||||
rollbackTransaction
|
|
||||||
)
|
|
||||||
|
|
||||||
var keyContentFields = map[string]string{
|
var keyContentFields = map[string]string{
|
||||||
"m.room.join_rules": "join_rule",
|
"m.room.join_rules": "join_rule",
|
||||||
"m.room.history_visibility": "history_visibility",
|
"m.room.history_visibility": "history_visibility",
|
||||||
|
@ -117,8 +103,7 @@ func (r *Inputer) Start() error {
|
||||||
_ = msg.InProgress() // resets the acknowledgement wait timer
|
_ = msg.InProgress() // resets the acknowledgement wait timer
|
||||||
defer eventsInProgress.Delete(index)
|
defer eventsInProgress.Delete(index)
|
||||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||||
action, err := r.processRoomEventUsingUpdater(r.ProcessContext.Context(), roomID, &inputRoomEvent)
|
if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil {
|
||||||
if err != nil {
|
|
||||||
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
|
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
}
|
}
|
||||||
|
@ -127,11 +112,8 @@ func (r *Inputer) Start() error {
|
||||||
"event_id": inputRoomEvent.Event.EventID(),
|
"event_id": inputRoomEvent.Event.EventID(),
|
||||||
"type": inputRoomEvent.Event.Type(),
|
"type": inputRoomEvent.Event.Type(),
|
||||||
}).Warn("Roomserver failed to process async event")
|
}).Warn("Roomserver failed to process async event")
|
||||||
}
|
_ = msg.Term()
|
||||||
switch action {
|
} else {
|
||||||
case retryLater:
|
|
||||||
_ = msg.Nak()
|
|
||||||
case doNotRetry:
|
|
||||||
_ = msg.Ack()
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -153,37 +135,6 @@ func (r *Inputer) Start() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// processRoomEventUsingUpdater opens up a room updater and tries to
|
|
||||||
// process the event. It returns whether or not we should positively
|
|
||||||
// or negatively acknowledge the event (i.e. for NATS) and an error
|
|
||||||
// if it occurred.
|
|
||||||
func (r *Inputer) processRoomEventUsingUpdater(
|
|
||||||
ctx context.Context,
|
|
||||||
roomID string,
|
|
||||||
inputRoomEvent *api.InputRoomEvent,
|
|
||||||
) (retryAction, error) {
|
|
||||||
roomInfo, err := r.DB.RoomInfo(ctx, roomID)
|
|
||||||
if err != nil {
|
|
||||||
return doNotRetry, fmt.Errorf("r.DB.RoomInfo: %w", err)
|
|
||||||
}
|
|
||||||
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
|
|
||||||
if err != nil {
|
|
||||||
return retryLater, fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
|
|
||||||
}
|
|
||||||
action, err := r.processRoomEvent(ctx, updater, inputRoomEvent)
|
|
||||||
switch action {
|
|
||||||
case commitTransaction:
|
|
||||||
if cerr := updater.Commit(); cerr != nil {
|
|
||||||
return retryLater, fmt.Errorf("updater.Commit: %w", cerr)
|
|
||||||
}
|
|
||||||
case rollbackTransaction:
|
|
||||||
if rerr := updater.Rollback(); rerr != nil {
|
|
||||||
return retryLater, fmt.Errorf("updater.Rollback: %w", rerr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return doNotRetry, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// InputRoomEvents implements api.RoomserverInternalAPI
|
// InputRoomEvents implements api.RoomserverInternalAPI
|
||||||
func (r *Inputer) InputRoomEvents(
|
func (r *Inputer) InputRoomEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
@ -230,7 +181,7 @@ func (r *Inputer) InputRoomEvents(
|
||||||
worker.Act(nil, func() {
|
worker.Act(nil, func() {
|
||||||
defer eventsInProgress.Delete(index)
|
defer eventsInProgress.Delete(index)
|
||||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||||
_, err := r.processRoomEventUsingUpdater(ctx, roomID, &inputRoomEvent)
|
err := r.processRoomEvent(ctx, &inputRoomEvent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
|
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
|
|
@ -26,10 +26,10 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/internal/hooks"
|
"github.com/matrix-org/dendrite/internal/hooks"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||||
"github.com/matrix-org/dendrite/roomserver/state"
|
"github.com/matrix-org/dendrite/roomserver/state"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
@ -68,15 +68,14 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (r *Inputer) processRoomEvent(
|
func (r *Inputer) processRoomEvent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
updater *shared.RoomUpdater,
|
|
||||||
input *api.InputRoomEvent,
|
input *api.InputRoomEvent,
|
||||||
) (commitAction, error) {
|
) error {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// Before we do anything, make sure the context hasn't expired for this pending task.
|
// Before we do anything, make sure the context hasn't expired for this pending task.
|
||||||
// If it has then we'll give up straight away — it's probably a synchronous input
|
// If it has then we'll give up straight away — it's probably a synchronous input
|
||||||
// request and the caller has already given up, but the inbox task was still queued.
|
// request and the caller has already given up, but the inbox task was still queued.
|
||||||
return rollbackTransaction, context.DeadlineExceeded
|
return context.DeadlineExceeded
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,7 +108,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
// if we have already got this event then do not process it again, if the input kind is an outlier.
|
// if we have already got this event then do not process it again, if the input kind is an outlier.
|
||||||
// Outliers contain no extra information which may warrant a re-processing.
|
// Outliers contain no extra information which may warrant a re-processing.
|
||||||
if input.Kind == api.KindOutlier {
|
if input.Kind == api.KindOutlier {
|
||||||
evs, err2 := updater.EventsFromIDs(ctx, []string{event.EventID()})
|
evs, err2 := r.DB.EventsFromIDs(ctx, []string{event.EventID()})
|
||||||
if err2 == nil && len(evs) == 1 {
|
if err2 == nil && len(evs) == 1 {
|
||||||
// check hash matches if we're on early room versions where the event ID was a random string
|
// check hash matches if we're on early room versions where the event ID was a random string
|
||||||
idFormat, err2 := headered.RoomVersion.EventIDFormat()
|
idFormat, err2 := headered.RoomVersion.EventIDFormat()
|
||||||
|
@ -118,11 +117,11 @@ func (r *Inputer) processRoomEvent(
|
||||||
case gomatrixserverlib.EventIDFormatV1:
|
case gomatrixserverlib.EventIDFormatV1:
|
||||||
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
|
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
|
||||||
logger.Debugf("Already processed event; ignoring")
|
logger.Debugf("Already processed event; ignoring")
|
||||||
return rollbackTransaction, nil
|
return nil
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
logger.Debugf("Already processed event; ignoring")
|
logger.Debugf("Already processed event; ignoring")
|
||||||
return rollbackTransaction, nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,17 +130,21 @@ func (r *Inputer) processRoomEvent(
|
||||||
// Don't waste time processing the event if the room doesn't exist.
|
// Don't waste time processing the event if the room doesn't exist.
|
||||||
// A room entry locally will only be created in response to a create
|
// A room entry locally will only be created in response to a create
|
||||||
// event.
|
// event.
|
||||||
|
roomInfo, rerr := r.DB.RoomInfo(ctx, event.RoomID())
|
||||||
|
if rerr != nil {
|
||||||
|
return fmt.Errorf("r.DB.RoomInfo: %w", rerr)
|
||||||
|
}
|
||||||
isCreateEvent := event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("")
|
isCreateEvent := event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("")
|
||||||
if !updater.RoomExists() && !isCreateEvent {
|
if roomInfo == nil && !isCreateEvent {
|
||||||
return rollbackTransaction, fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID())
|
return fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID())
|
||||||
}
|
}
|
||||||
|
|
||||||
var missingAuth, missingPrev bool
|
var missingAuth, missingPrev bool
|
||||||
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
|
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
|
||||||
if !isCreateEvent {
|
if !isCreateEvent {
|
||||||
missingAuthIDs, missingPrevIDs, err := updater.MissingAuthPrevEvents(ctx, event)
|
missingAuthIDs, missingPrevIDs, err := r.DB.MissingAuthPrevEvents(ctx, event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("updater.MissingAuthPrevEvents: %w", err)
|
return fmt.Errorf("updater.MissingAuthPrevEvents: %w", err)
|
||||||
}
|
}
|
||||||
missingAuth = len(missingAuthIDs) > 0
|
missingAuth = len(missingAuthIDs) > 0
|
||||||
missingPrev = !input.HasState && len(missingPrevIDs) > 0
|
missingPrev = !input.HasState && len(missingPrevIDs) > 0
|
||||||
|
@ -153,7 +156,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
ExcludeSelf: true,
|
ExcludeSelf: true,
|
||||||
}
|
}
|
||||||
if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||||
}
|
}
|
||||||
// Sort all of the servers into a map so that we can randomise
|
// Sort all of the servers into a map so that we can randomise
|
||||||
// their order. Then make sure that the input origin and the
|
// their order. Then make sure that the input origin and the
|
||||||
|
@ -182,8 +185,8 @@ func (r *Inputer) processRoomEvent(
|
||||||
isRejected := false
|
isRejected := false
|
||||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||||
knownEvents := map[string]*types.Event{}
|
knownEvents := map[string]*types.Event{}
|
||||||
if err := r.fetchAuthEvents(ctx, updater, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
|
if err := r.fetchAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("r.fetchAuthEvents: %w", err)
|
return fmt.Errorf("r.fetchAuthEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the event is allowed by its auth events. If it isn't then
|
// Check if the event is allowed by its auth events. If it isn't then
|
||||||
|
@ -205,12 +208,12 @@ func (r *Inputer) processRoomEvent(
|
||||||
// but weren't found.
|
// but weren't found.
|
||||||
if isRejected {
|
if isRejected {
|
||||||
if event.StateKey() != nil {
|
if event.StateKey() != nil {
|
||||||
return commitTransaction, fmt.Errorf(
|
return fmt.Errorf(
|
||||||
"missing auth event %s for state event %s (type %q, state key %q)",
|
"missing auth event %s for state event %s (type %q, state key %q)",
|
||||||
authEventID, event.EventID(), event.Type(), *event.StateKey(),
|
authEventID, event.EventID(), event.Type(), *event.StateKey(),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
return commitTransaction, fmt.Errorf(
|
return fmt.Errorf(
|
||||||
"missing auth event %s for timeline event %s (type %q)",
|
"missing auth event %s for timeline event %s (type %q)",
|
||||||
authEventID, event.EventID(), event.Type(),
|
authEventID, event.EventID(), event.Type(),
|
||||||
)
|
)
|
||||||
|
@ -226,7 +229,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
// Check that the event passes authentication checks based on the
|
// Check that the event passes authentication checks based on the
|
||||||
// current room state.
|
// current room state.
|
||||||
var err error
|
var err error
|
||||||
softfail, err = helpers.CheckForSoftFail(ctx, updater, headered, input.StateEventIDs)
|
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Warn("Error authing soft-failed event")
|
logger.WithError(err).Warn("Error authing soft-failed event")
|
||||||
}
|
}
|
||||||
|
@ -250,7 +253,8 @@ func (r *Inputer) processRoomEvent(
|
||||||
missingState := missingStateReq{
|
missingState := missingStateReq{
|
||||||
origin: input.Origin,
|
origin: input.Origin,
|
||||||
inputer: r,
|
inputer: r,
|
||||||
db: updater,
|
db: r.DB,
|
||||||
|
roomInfo: roomInfo,
|
||||||
federation: r.FSAPI,
|
federation: r.FSAPI,
|
||||||
keys: r.KeyRing,
|
keys: r.KeyRing,
|
||||||
roomsMu: internal.NewMutexByRoom(),
|
roomsMu: internal.NewMutexByRoom(),
|
||||||
|
@ -290,16 +294,16 @@ func (r *Inputer) processRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the event.
|
// Store the event.
|
||||||
_, _, stateAtEvent, redactionEvent, redactedEventID, err := updater.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("updater.StoreEvent: %w", err)
|
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if storing this event results in it being redacted then do so.
|
// if storing this event results in it being redacted then do so.
|
||||||
if !isRejected && redactedEventID == event.EventID() {
|
if !isRejected && redactedEventID == event.EventID() {
|
||||||
r, rerr := eventutil.RedactEvent(redactionEvent, event)
|
r, rerr := eventutil.RedactEvent(redactionEvent, event)
|
||||||
if rerr != nil {
|
if rerr != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||||
}
|
}
|
||||||
event = r
|
event = r
|
||||||
}
|
}
|
||||||
|
@ -310,23 +314,25 @@ func (r *Inputer) processRoomEvent(
|
||||||
if input.Kind == api.KindOutlier {
|
if input.Kind == api.KindOutlier {
|
||||||
logger.Debug("Stored outlier")
|
logger.Debug("Stored outlier")
|
||||||
hooks.Run(hooks.KindNewEventPersisted, headered)
|
hooks.Run(hooks.KindNewEventPersisted, headered)
|
||||||
return commitTransaction, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
roomInfo, err := updater.RoomInfo(ctx, event.RoomID())
|
// Request the room info again — it's possible that the room has been
|
||||||
|
// created by now if it didn't exist already.
|
||||||
|
roomInfo, err = r.DB.RoomInfo(ctx, event.RoomID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("updater.RoomInfo: %w", err)
|
return fmt.Errorf("updater.RoomInfo: %w", err)
|
||||||
}
|
}
|
||||||
if roomInfo == nil {
|
if roomInfo == nil {
|
||||||
return rollbackTransaction, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID())
|
return fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID())
|
||||||
}
|
}
|
||||||
|
|
||||||
if input.HasState || (!missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0) {
|
if input.HasState || (!missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0) {
|
||||||
// We haven't calculated a state for this event yet.
|
// We haven't calculated a state for this event yet.
|
||||||
// Lets calculate one.
|
// Lets calculate one.
|
||||||
err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected)
|
err = r.calculateAndSetState(ctx, input, roomInfo, &stateAtEvent, event, isRejected)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("r.calculateAndSetState: %w", err)
|
return fmt.Errorf("r.calculateAndSetState: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -337,16 +343,15 @@ func (r *Inputer) processRoomEvent(
|
||||||
"missing_prev": missingPrev,
|
"missing_prev": missingPrev,
|
||||||
}).Warn("Stored rejected event")
|
}).Warn("Stored rejected event")
|
||||||
if rejectionErr != nil {
|
if rejectionErr != nil {
|
||||||
return commitTransaction, types.RejectedError(rejectionErr.Error())
|
return types.RejectedError(rejectionErr.Error())
|
||||||
}
|
}
|
||||||
return commitTransaction, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
switch input.Kind {
|
switch input.Kind {
|
||||||
case api.KindNew:
|
case api.KindNew:
|
||||||
if err = r.updateLatestEvents(
|
if err = r.updateLatestEvents(
|
||||||
ctx, // context
|
ctx, // context
|
||||||
updater, // room updater
|
|
||||||
roomInfo, // room info for the room being updated
|
roomInfo, // room info for the room being updated
|
||||||
stateAtEvent, // state at event (below)
|
stateAtEvent, // state at event (below)
|
||||||
event, // event
|
event, // event
|
||||||
|
@ -354,7 +359,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
input.TransactionID, // transaction ID
|
input.TransactionID, // transaction ID
|
||||||
input.HasState, // rewrites state?
|
input.HasState, // rewrites state?
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("r.updateLatestEvents: %w", err)
|
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||||
}
|
}
|
||||||
case api.KindOld:
|
case api.KindOld:
|
||||||
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
|
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
|
||||||
|
@ -366,7 +371,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (old): %w", err)
|
return fmt.Errorf("r.WriteOutputEvents (old): %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,14 +390,14 @@ func (r *Inputer) processRoomEvent(
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
|
return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Everything was OK — the latest events updater didn't error and
|
// Everything was OK — the latest events updater didn't error and
|
||||||
// we've sent output events. Finally, generate a hook call.
|
// we've sent output events. Finally, generate a hook call.
|
||||||
hooks.Run(hooks.KindNewEventPersisted, headered)
|
hooks.Run(hooks.KindNewEventPersisted, headered)
|
||||||
return commitTransaction, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchAuthEvents will check to see if any of the
|
// fetchAuthEvents will check to see if any of the
|
||||||
|
@ -404,7 +409,6 @@ func (r *Inputer) processRoomEvent(
|
||||||
// they are now in the database.
|
// they are now in the database.
|
||||||
func (r *Inputer) fetchAuthEvents(
|
func (r *Inputer) fetchAuthEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
updater *shared.RoomUpdater,
|
|
||||||
logger *logrus.Entry,
|
logger *logrus.Entry,
|
||||||
event *gomatrixserverlib.HeaderedEvent,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
auth *gomatrixserverlib.AuthEvents,
|
auth *gomatrixserverlib.AuthEvents,
|
||||||
|
@ -418,7 +422,7 @@ func (r *Inputer) fetchAuthEvents(
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, authEventID := range authEventIDs {
|
for _, authEventID := range authEventIDs {
|
||||||
authEvents, err := updater.EventsFromIDs(ctx, []string{authEventID})
|
authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID})
|
||||||
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
|
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
|
||||||
unknown[authEventID] = struct{}{}
|
unknown[authEventID] = struct{}{}
|
||||||
continue
|
continue
|
||||||
|
@ -495,7 +499,7 @@ nextAuthEvent:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, store the event in the database.
|
// Finally, store the event in the database.
|
||||||
eventNID, _, _, _, _, err := updater.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
|
eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("updater.StoreEvent: %w", err)
|
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -520,14 +524,18 @@ nextAuthEvent:
|
||||||
|
|
||||||
func (r *Inputer) calculateAndSetState(
|
func (r *Inputer) calculateAndSetState(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
updater *shared.RoomUpdater,
|
|
||||||
input *api.InputRoomEvent,
|
input *api.InputRoomEvent,
|
||||||
roomInfo *types.RoomInfo,
|
roomInfo *types.RoomInfo,
|
||||||
stateAtEvent *types.StateAtEvent,
|
stateAtEvent *types.StateAtEvent,
|
||||||
event *gomatrixserverlib.Event,
|
event *gomatrixserverlib.Event,
|
||||||
isRejected bool,
|
isRejected bool,
|
||||||
) error {
|
) error {
|
||||||
var err error
|
var succeeded bool
|
||||||
|
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
|
||||||
|
}
|
||||||
|
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
||||||
roomState := state.NewStateResolution(updater, roomInfo)
|
roomState := state.NewStateResolution(updater, roomInfo)
|
||||||
|
|
||||||
if input.HasState {
|
if input.HasState {
|
||||||
|
@ -536,7 +544,7 @@ func (r *Inputer) calculateAndSetState(
|
||||||
// We've been told what the state at the event is so we don't need to calculate it.
|
// We've been told what the state at the event is so we don't need to calculate it.
|
||||||
// Check that those state events are in the database and store the state.
|
// Check that those state events are in the database and store the state.
|
||||||
var entries []types.StateEntry
|
var entries []types.StateEntry
|
||||||
if entries, err = updater.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
|
if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
|
||||||
return fmt.Errorf("updater.StateEntriesForEventIDs: %w", err)
|
return fmt.Errorf("updater.StateEntriesForEventIDs: %w", err)
|
||||||
}
|
}
|
||||||
entries = types.DeduplicateStateEntries(entries)
|
entries = types.DeduplicateStateEntries(entries)
|
||||||
|
@ -557,5 +565,6 @@ func (r *Inputer) calculateAndSetState(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("r.DB.SetState: %w", err)
|
return fmt.Errorf("r.DB.SetState: %w", err)
|
||||||
}
|
}
|
||||||
|
succeeded = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/state"
|
"github.com/matrix-org/dendrite/roomserver/state"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||||
|
@ -47,7 +48,6 @@ import (
|
||||||
// Can only be called once at a time
|
// Can only be called once at a time
|
||||||
func (r *Inputer) updateLatestEvents(
|
func (r *Inputer) updateLatestEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
updater *shared.RoomUpdater,
|
|
||||||
roomInfo *types.RoomInfo,
|
roomInfo *types.RoomInfo,
|
||||||
stateAtEvent types.StateAtEvent,
|
stateAtEvent types.StateAtEvent,
|
||||||
event *gomatrixserverlib.Event,
|
event *gomatrixserverlib.Event,
|
||||||
|
@ -55,6 +55,14 @@ func (r *Inputer) updateLatestEvents(
|
||||||
transactionID *api.TransactionID,
|
transactionID *api.TransactionID,
|
||||||
rewritesState bool,
|
rewritesState bool,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
|
var succeeded bool
|
||||||
|
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
||||||
|
|
||||||
u := latestEventsUpdater{
|
u := latestEventsUpdater{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
api: r,
|
api: r,
|
||||||
|
@ -71,6 +79,7 @@ func (r *Inputer) updateLatestEvents(
|
||||||
return fmt.Errorf("u.doUpdateLatestEvents: %w", err)
|
return fmt.Errorf("u.doUpdateLatestEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
succeeded = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/state"
|
"github.com/matrix-org/dendrite/roomserver/state"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
@ -25,7 +25,8 @@ type parsedRespState struct {
|
||||||
|
|
||||||
type missingStateReq struct {
|
type missingStateReq struct {
|
||||||
origin gomatrixserverlib.ServerName
|
origin gomatrixserverlib.ServerName
|
||||||
db *shared.RoomUpdater
|
db storage.Database
|
||||||
|
roomInfo *types.RoomInfo
|
||||||
inputer *Inputer
|
inputer *Inputer
|
||||||
keys gomatrixserverlib.JSONVerifier
|
keys gomatrixserverlib.JSONVerifier
|
||||||
federation fedapi.FederationInternalAPI
|
federation fedapi.FederationInternalAPI
|
||||||
|
@ -80,7 +81,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
|
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
|
||||||
// in the gap in the DAG
|
// in the gap in the DAG
|
||||||
for _, newEvent := range newEvents {
|
for _, newEvent := range newEvents {
|
||||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||||
Kind: api.KindOld,
|
Kind: api.KindOld,
|
||||||
Event: newEvent.Headered(roomVersion),
|
Event: newEvent.Headered(roomVersion),
|
||||||
Origin: t.origin,
|
Origin: t.origin,
|
||||||
|
@ -139,8 +140,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
for _, ire := range outlierRoomEvents {
|
for _, ire := range outlierRoomEvents {
|
||||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &ire)
|
if err = t.inputer.processRoomEvent(ctx, &ire); err != nil {
|
||||||
if err != nil {
|
|
||||||
if _, ok := err.(types.RejectedError); !ok {
|
if _, ok := err.(types.RejectedError); !ok {
|
||||||
return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err)
|
return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err)
|
||||||
}
|
}
|
||||||
|
@ -163,7 +163,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
stateIDs = append(stateIDs, event.EventID())
|
stateIDs = append(stateIDs, event.EventID())
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||||
Kind: api.KindOld,
|
Kind: api.KindOld,
|
||||||
Event: backwardsExtremity.Headered(roomVersion),
|
Event: backwardsExtremity.Headered(roomVersion),
|
||||||
Origin: t.origin,
|
Origin: t.origin,
|
||||||
|
@ -182,7 +182,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
// they will automatically fast-forward based on the room state at the
|
// they will automatically fast-forward based on the room state at the
|
||||||
// extremity in the last step.
|
// extremity in the last step.
|
||||||
for _, newEvent := range newEvents {
|
for _, newEvent := range newEvents {
|
||||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||||
Kind: api.KindOld,
|
Kind: api.KindOld,
|
||||||
Event: newEvent.Headered(roomVersion),
|
Event: newEvent.Headered(roomVersion),
|
||||||
Origin: t.origin,
|
Origin: t.origin,
|
||||||
|
@ -473,8 +473,10 @@ retryAllowedState:
|
||||||
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
|
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
|
||||||
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
|
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
|
||||||
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
||||||
|
latest, _, _, err := t.db.LatestEventIDs(ctx, t.roomInfo.RoomNID)
|
||||||
latest := t.db.LatestEvents()
|
if err != nil {
|
||||||
|
return nil, false, false, fmt.Errorf("t.DB.LatestEventIDs: %w", err)
|
||||||
|
}
|
||||||
latestEvents := make([]string, len(latest))
|
latestEvents := make([]string, len(latest))
|
||||||
for i, ev := range latest {
|
for i, ev := range latest {
|
||||||
latestEvents[i] = ev.EventID
|
latestEvents[i] = ev.EventID
|
||||||
|
|
|
@ -35,6 +35,11 @@ type Database interface {
|
||||||
stateBlockNIDs []types.StateBlockNID,
|
stateBlockNIDs []types.StateBlockNID,
|
||||||
state []types.StateEntry,
|
state []types.StateEntry,
|
||||||
) (types.StateSnapshotNID, error)
|
) (types.StateSnapshotNID, error)
|
||||||
|
|
||||||
|
MissingAuthPrevEvents(
|
||||||
|
ctx context.Context, e *gomatrixserverlib.Event,
|
||||||
|
) (missingAuth, missingPrev []string, err error)
|
||||||
|
|
||||||
// Look up the state of a room at each event for a list of string event IDs.
|
// Look up the state of a room at each event for a list of string event IDs.
|
||||||
// Returns an error if there is an error talking to the database.
|
// Returns an error if there is an error talking to the database.
|
||||||
// The length of []types.StateAtEvent is guaranteed to equal the length of eventIDs if no error is returned.
|
// The length of []types.StateAtEvent is guaranteed to equal the length of eventIDs if no error is returned.
|
||||||
|
|
|
@ -103,25 +103,6 @@ func (u *RoomUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID {
|
||||||
return u.currentStateSnapshotNID
|
return u.currentStateSnapshotNID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *RoomUpdater) MissingAuthPrevEvents(
|
|
||||||
ctx context.Context, e *gomatrixserverlib.Event,
|
|
||||||
) (missingAuth, missingPrev []string, err error) {
|
|
||||||
for _, authEventID := range e.AuthEventIDs() {
|
|
||||||
if nids, err := u.EventNIDs(ctx, []string{authEventID}); err != nil || len(nids) == 0 {
|
|
||||||
missingAuth = append(missingAuth, authEventID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, prevEventID := range e.PrevEventIDs() {
|
|
||||||
state, err := u.StateAtEventIDs(ctx, []string{prevEventID})
|
|
||||||
if err != nil || len(state) == 0 || (!state[0].IsCreate() && state[0].BeforeStateSnapshotNID == 0) {
|
|
||||||
missingPrev = append(missingPrev, prevEventID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer
|
// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer
|
||||||
func (u *RoomUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
|
func (u *RoomUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
|
||||||
return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||||
|
@ -146,13 +127,6 @@ func (u *RoomUpdater) SnapshotNIDFromEventID(
|
||||||
return u.d.snapshotNIDFromEventID(ctx, u.txn, eventID)
|
return u.d.snapshotNIDFromEventID(ctx, u.txn, eventID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *RoomUpdater) StoreEvent(
|
|
||||||
ctx context.Context, event *gomatrixserverlib.Event,
|
|
||||||
authEventNIDs []types.EventNID, isRejected bool,
|
|
||||||
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
|
||||||
return u.d.storeEvent(ctx, u, event, authEventNIDs, isRejected)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *RoomUpdater) StateBlockNIDs(
|
func (u *RoomUpdater) StateBlockNIDs(
|
||||||
ctx context.Context, stateNIDs []types.StateSnapshotNID,
|
ctx context.Context, stateNIDs []types.StateSnapshotNID,
|
||||||
) ([]types.StateBlockNIDList, error) {
|
) ([]types.StateBlockNIDList, error) {
|
||||||
|
@ -212,44 +186,16 @@ func (u *RoomUpdater) EventIDs(
|
||||||
return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs)
|
return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *RoomUpdater) EventNIDs(
|
|
||||||
ctx context.Context, eventIDs []string,
|
|
||||||
) (map[string]types.EventNID, error) {
|
|
||||||
return u.d.eventNIDs(ctx, u.txn, eventIDs, NoFilter)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *RoomUpdater) UnsentEventNIDs(
|
|
||||||
ctx context.Context, eventIDs []string,
|
|
||||||
) (map[string]types.EventNID, error) {
|
|
||||||
return u.d.eventNIDs(ctx, u.txn, eventIDs, FilterUnsentOnly)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *RoomUpdater) StateAtEventIDs(
|
func (u *RoomUpdater) StateAtEventIDs(
|
||||||
ctx context.Context, eventIDs []string,
|
ctx context.Context, eventIDs []string,
|
||||||
) ([]types.StateAtEvent, error) {
|
) ([]types.StateAtEvent, error) {
|
||||||
return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs)
|
return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *RoomUpdater) StateEntriesForEventIDs(
|
|
||||||
ctx context.Context, eventIDs []string,
|
|
||||||
) ([]types.StateEntry, error) {
|
|
||||||
return u.d.EventsTable.BulkSelectStateEventByID(ctx, u.txn, eventIDs)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *RoomUpdater) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
|
||||||
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *RoomUpdater) UnsentEventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
func (u *RoomUpdater) UnsentEventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
||||||
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, true)
|
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *RoomUpdater) GetMembershipEventNIDsForRoom(
|
|
||||||
ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool,
|
|
||||||
) ([]types.EventNID, error) {
|
|
||||||
return u.d.getMembershipEventNIDsForRoom(ctx, u.txn, roomNID, joinOnly, localOnly)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsReferenced implements types.RoomRecentEventsUpdater
|
// IsReferenced implements types.RoomRecentEventsUpdater
|
||||||
func (u *RoomUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) {
|
func (u *RoomUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) {
|
||||||
err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256)
|
err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256)
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -673,6 +674,29 @@ func (d *Database) GetPublishedRooms(ctx context.Context) ([]string, error) {
|
||||||
return d.PublishedTable.SelectAllPublishedRooms(ctx, nil, true)
|
return d.PublishedTable.SelectAllPublishedRooms(ctx, nil, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) MissingAuthPrevEvents(
|
||||||
|
ctx context.Context, e *gomatrixserverlib.Event,
|
||||||
|
) (missingAuth, missingPrev []string, err error) {
|
||||||
|
authEventNIDs, err := d.EventNIDs(ctx, e.AuthEventIDs())
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("d.EventNIDs: %w", err)
|
||||||
|
}
|
||||||
|
for _, authEventID := range e.AuthEventIDs() {
|
||||||
|
if _, ok := authEventNIDs[authEventID]; !ok {
|
||||||
|
missingAuth = append(missingAuth, authEventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, prevEventID := range e.PrevEventIDs() {
|
||||||
|
state, err := d.StateAtEventIDs(ctx, []string{prevEventID})
|
||||||
|
if err != nil || len(state) == 0 || (!state[0].IsCreate() && state[0].BeforeStateSnapshotNID == 0) {
|
||||||
|
missingPrev = append(missingPrev, prevEventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (d *Database) assignRoomNID(
|
func (d *Database) assignRoomNID(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
roomID string, roomVersion gomatrixserverlib.RoomVersion,
|
roomID string, roomVersion gomatrixserverlib.RoomVersion,
|
||||||
|
@ -1103,7 +1127,7 @@ func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs []string)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(nidToUserID) != len(userNIDToCount) {
|
if len(nidToUserID) != len(userNIDToCount) {
|
||||||
return nil, fmt.Errorf("found %d users but only have state key nids for %d of them", len(userNIDToCount), len(nidToUserID))
|
logrus.Warnf("SelectJoinedUsersSetForRooms found %d users but BulkSelectEventStateKey only returned state key NIDs for %d of them", len(userNIDToCount), len(nidToUserID))
|
||||||
}
|
}
|
||||||
result := make(map[string]int, len(userNIDToCount))
|
result := make(map[string]int, len(userNIDToCount))
|
||||||
for nid, count := range userNIDToCount {
|
for nid, count := range userNIDToCount {
|
||||||
|
|
|
@ -597,3 +597,7 @@ Device list doesn't change if remote server is down
|
||||||
/context/ on non world readable room does not work
|
/context/ on non world readable room does not work
|
||||||
/context/ returns correct number of events
|
/context/ returns correct number of events
|
||||||
/context/ with lazy_load_members filter works
|
/context/ with lazy_load_members filter works
|
||||||
|
Can query remote device keys using POST after notification
|
||||||
|
Device deletion propagates over federation
|
||||||
|
Get left notifs in sync and /keys/changes when other user leaves
|
||||||
|
Remote banned user is kicked and may not rejoin until unbanned
|
||||||
|
|
Loading…
Reference in a new issue