Merge branch 'main' into neilalexander/downloadstate
This commit is contained in:
commit
c52f4b95f1
2
.github/workflows/dendrite.yml
vendored
2
.github/workflows/dendrite.yml
vendored
|
@ -342,7 +342,7 @@ jobs:
|
|||
# See https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-Readme.md specifically GOROOT_1_17_X64
|
||||
run: |
|
||||
sudo apt-get update && sudo apt-get install -y libolm3 libolm-dev
|
||||
go get -v github.com/haveyoudebuggedit/gotestfmt/v2/cmd/gotestfmt@latest
|
||||
go get -v github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
|
||||
|
||||
- name: Run actions/checkout@v2 for dendrite
|
||||
uses: actions/checkout@v2
|
||||
|
|
|
@ -18,6 +18,11 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||
"github.com/matrix-org/dendrite/federationapi/types"
|
||||
|
@ -26,9 +31,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// KeyChangeConsumer consumes events that originate in key server.
|
||||
|
@ -78,6 +80,7 @@ func (t *KeyChangeConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) boo
|
|||
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||
var m api.DeviceMessage
|
||||
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||
return true
|
||||
}
|
||||
|
@ -105,6 +108,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
|||
// only send key change events which originated from us
|
||||
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logger.WithError(err).Error("Failed to extract domain from key change event")
|
||||
return true
|
||||
}
|
||||
|
@ -118,6 +122,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
|||
WantMembership: "join",
|
||||
}, &queryRes)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logger.WithError(err).Error("failed to calculate joined rooms for user")
|
||||
return true
|
||||
}
|
||||
|
@ -125,6 +130,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
|||
// send this key change to all servers who share rooms with this user.
|
||||
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in")
|
||||
return true
|
||||
}
|
||||
|
@ -147,6 +153,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
|||
Keys: m.KeyJSON,
|
||||
}
|
||||
if edu.Content, err = json.Marshal(event); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logger.WithError(err).Error("failed to marshal EDU JSON")
|
||||
return true
|
||||
}
|
||||
|
@ -160,6 +167,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
|||
output := m.CrossSigningKeyUpdate
|
||||
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
|
||||
return true
|
||||
}
|
||||
|
@ -176,12 +184,14 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
|||
WantMembership: "join",
|
||||
}, &queryRes)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user")
|
||||
return true
|
||||
}
|
||||
// send this key change to all servers who share rooms with this user.
|
||||
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
|
||||
return true
|
||||
}
|
||||
|
@ -196,6 +206,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
|||
Origin: string(t.serverName),
|
||||
}
|
||||
if edu.Content, err = json.Marshal(output); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping")
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -18,16 +18,18 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// OutputSendToDeviceConsumer consumes events that originate in the clientapi.
|
||||
|
@ -76,6 +78,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
|
|||
sender := msg.Header.Get("sender")
|
||||
_, originServerName, err := gomatrixserverlib.SplitID('@', sender)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender")
|
||||
return true
|
||||
}
|
||||
|
@ -85,12 +88,14 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
|
|||
// Extract the send-to-device event from msg.
|
||||
var ote syncTypes.OutputSendToDeviceEvent
|
||||
if err = json.Unmarshal(msg.Data, &ote); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).Errorf("output log: message parse failed (expected send-to-device)")
|
||||
return true
|
||||
}
|
||||
|
||||
_, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
|
||||
return true
|
||||
}
|
||||
|
@ -116,6 +121,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
|
|||
},
|
||||
}
|
||||
if edu.Content, err = json.Marshal(tdm); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
@ -307,11 +308,13 @@ func (oqs *OutgoingQueues) SendEDU(
|
|||
|
||||
ephemeralJSON, err := json.Marshal(e)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
return fmt.Errorf("json.Marshal: %w", err)
|
||||
}
|
||||
|
||||
nid, err := oqs.db.StoreJSON(oqs.process.Context(), string(ephemeralJSON))
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -350,6 +351,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
|||
for deviceID, message := range byUser {
|
||||
// TODO: check that the user and the device actually exist here
|
||||
if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
||||
"sender": directPayload.Sender,
|
||||
"user_id": userID,
|
||||
|
@ -360,6 +362,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
|||
}
|
||||
case gomatrixserverlib.MDeviceListUpdate:
|
||||
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
|
||||
}
|
||||
case gomatrixserverlib.MReceipt:
|
||||
|
@ -395,6 +398,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
|||
}
|
||||
case types.MSigningKeyUpdate:
|
||||
if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
logrus.WithError(err).Errorf("Failed to process signing key update")
|
||||
}
|
||||
case gomatrixserverlib.MPresence:
|
||||
|
|
|
@ -71,27 +71,27 @@ func (d *Database) UpdateRoom(
|
|||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
var joinedHosts []types.JoinedHost
|
||||
if purgeRoomFirst {
|
||||
// If the event is a create event then we'll delete all of the existing
|
||||
// data for the room. The only reason that a create event would be replayed
|
||||
// to us in this way is if we're about to receive the entire room state.
|
||||
if err = d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil {
|
||||
return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err)
|
||||
}
|
||||
for _, add := range addHosts {
|
||||
if err = d.FederationJoinedHosts.InsertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName); err != nil {
|
||||
return err
|
||||
}
|
||||
joinedHosts = append(joinedHosts, add)
|
||||
}
|
||||
} else {
|
||||
if joinedHosts, err = d.FederationJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, add := range addHosts {
|
||||
err = d.FederationJoinedHosts.InsertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
|
||||
if err != nil {
|
||||
for _, add := range addHosts {
|
||||
if err = d.FederationJoinedHosts.InsertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = d.FederationJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil {
|
||||
return err
|
||||
}
|
||||
joinedHosts = append(joinedHosts, add)
|
||||
}
|
||||
if err = d.FederationJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
|
|
@ -47,7 +47,6 @@ Notifications can be viewed with GET /notifications
|
|||
|
||||
# More flakey
|
||||
|
||||
If remote user leaves room we no longer receive device updates
|
||||
Guest users can join guest_access rooms
|
||||
|
||||
# This will fail in HTTP API mode, so blacklisted for now
|
||||
|
|
|
@ -742,3 +742,4 @@ User in private room doesn't appear in user directory
|
|||
User joining then leaving public room appears and dissappears from directory
|
||||
User in remote room doesn't appear in user directory after server left room
|
||||
User in shared private room does appear in user directory until leave
|
||||
Existing members see new member's presence
|
Loading…
Reference in a new issue