Add a SigningKeyUpdate producer (#2697)
This adds a new stream for signing key updates, this should ensure we don't lose any updates over federation.
This commit is contained in:
parent
440eb0f3a2
commit
2cfcfddecc
|
@ -18,6 +18,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/api"
|
"github.com/matrix-org/dendrite/federationapi/api"
|
||||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/federationapi/consumers"
|
"github.com/matrix-org/dendrite/federationapi/consumers"
|
||||||
|
@ -33,10 +35,10 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/base"
|
"github.com/matrix-org/dendrite/setup/base"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/routing"
|
"github.com/matrix-org/dendrite/federationapi/routing"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
|
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
|
||||||
|
@ -66,6 +68,7 @@ func AddPublicRoutes(
|
||||||
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
||||||
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
||||||
TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
|
TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
|
||||||
|
TopicSigningKeyUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
UserAPI: userAPI,
|
UserAPI: userAPI,
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,12 +21,13 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SyncAPIProducer produces events for the sync API server to consume
|
// SyncAPIProducer produces events for the sync API server to consume
|
||||||
|
@ -36,6 +37,7 @@ type SyncAPIProducer struct {
|
||||||
TopicTypingEvent string
|
TopicTypingEvent string
|
||||||
TopicPresenceEvent string
|
TopicPresenceEvent string
|
||||||
TopicDeviceListUpdate string
|
TopicDeviceListUpdate string
|
||||||
|
TopicSigningKeyUpdate string
|
||||||
JetStream nats.JetStreamContext
|
JetStream nats.JetStreamContext
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
UserAPI userapi.UserInternalAPI
|
UserAPI userapi.UserInternalAPI
|
||||||
|
@ -178,3 +180,15 @@ func (p *SyncAPIProducer) SendDeviceListUpdate(
|
||||||
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
|
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *SyncAPIProducer) SendSigningKeyUpdate(
|
||||||
|
ctx context.Context, data gomatrixserverlib.RawJSON, origin gomatrixserverlib.ServerName,
|
||||||
|
) (err error) {
|
||||||
|
m := nats.NewMsg(p.TopicSigningKeyUpdate)
|
||||||
|
m.Header.Set("origin", string(origin))
|
||||||
|
m.Data = data
|
||||||
|
|
||||||
|
log.Debugf("Sending signing key update")
|
||||||
|
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -22,6 +22,11 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/federationapi/producers"
|
"github.com/matrix-org/dendrite/federationapi/producers"
|
||||||
|
@ -31,10 +36,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
|
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"github.com/matrix-org/util"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -391,7 +392,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case types.MSigningKeyUpdate:
|
case types.MSigningKeyUpdate:
|
||||||
if err := t.processSigningKeyUpdate(ctx, e); err != nil {
|
if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to process signing key update")
|
logrus.WithError(err).Errorf("Failed to process signing key update")
|
||||||
}
|
}
|
||||||
case gomatrixserverlib.MPresence:
|
case gomatrixserverlib.MPresence:
|
||||||
|
@ -431,42 +432,6 @@ func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) e
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
|
|
||||||
var updatePayload keyapi.CrossSigningKeyUpdate
|
|
||||||
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
|
|
||||||
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
|
||||||
"user_id": updatePayload.UserID,
|
|
||||||
}).Debug("Failed to unmarshal signing key update")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, serverName, err := gomatrixserverlib.SplitID('@', updatePayload.UserID); err != nil {
|
|
||||||
return nil
|
|
||||||
} else if serverName == t.ourServerName {
|
|
||||||
return nil
|
|
||||||
} else if serverName != t.Origin {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
keys := gomatrixserverlib.CrossSigningKeys{}
|
|
||||||
if updatePayload.MasterKey != nil {
|
|
||||||
keys.MasterKey = *updatePayload.MasterKey
|
|
||||||
}
|
|
||||||
if updatePayload.SelfSigningKey != nil {
|
|
||||||
keys.SelfSigningKey = *updatePayload.SelfSigningKey
|
|
||||||
}
|
|
||||||
uploadReq := &keyapi.PerformUploadDeviceKeysRequest{
|
|
||||||
CrossSigningKeys: keys,
|
|
||||||
UserID: updatePayload.UserID,
|
|
||||||
}
|
|
||||||
uploadRes := &keyapi.PerformUploadDeviceKeysResponse{}
|
|
||||||
if err := t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if uploadRes.Error != nil {
|
|
||||||
return uploadRes.Error
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// processReceiptEvent sends receipt events to JetStream
|
// processReceiptEvent sends receipt events to JetStream
|
||||||
func (t *txnReq) processReceiptEvent(ctx context.Context,
|
func (t *txnReq) processReceiptEvent(ctx context.Context,
|
||||||
userID, roomID, receiptType string,
|
userID, roomID, receiptType string,
|
||||||
|
|
110
keyserver/consumers/signingkeyupdate.go
Normal file
110
keyserver/consumers/signingkeyupdate.go
Normal file
|
@ -0,0 +1,110 @@
|
||||||
|
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package consumers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SigningKeyUpdateConsumer consumes signing key updates that came in over federation.
|
||||||
|
type SigningKeyUpdateConsumer struct {
|
||||||
|
ctx context.Context
|
||||||
|
jetstream nats.JetStreamContext
|
||||||
|
durable string
|
||||||
|
topic string
|
||||||
|
keyAPI *internal.KeyInternalAPI
|
||||||
|
cfg *config.KeyServer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSigningKeyUpdateConsumer creates a new SigningKeyUpdateConsumer. Call Start() to begin consuming from key servers.
|
||||||
|
func NewSigningKeyUpdateConsumer(
|
||||||
|
process *process.ProcessContext,
|
||||||
|
cfg *config.KeyServer,
|
||||||
|
js nats.JetStreamContext,
|
||||||
|
keyAPI *internal.KeyInternalAPI,
|
||||||
|
) *SigningKeyUpdateConsumer {
|
||||||
|
return &SigningKeyUpdateConsumer{
|
||||||
|
ctx: process.Context(),
|
||||||
|
jetstream: js,
|
||||||
|
durable: cfg.Matrix.JetStream.Prefixed("KeyServerSigningKeyConsumer"),
|
||||||
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
|
||||||
|
keyAPI: keyAPI,
|
||||||
|
cfg: cfg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start consuming from key servers
|
||||||
|
func (t *SigningKeyUpdateConsumer) Start() error {
|
||||||
|
return jetstream.JetStreamConsumer(
|
||||||
|
t.ctx, t.jetstream, t.topic, t.durable, 1,
|
||||||
|
t.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// onMessage is called in response to a message received on the
|
||||||
|
// signing key update events topic from the key server.
|
||||||
|
func (t *SigningKeyUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
|
var updatePayload keyapi.CrossSigningKeyUpdate
|
||||||
|
if err := json.Unmarshal(msg.Data, &updatePayload); err != nil {
|
||||||
|
logrus.WithError(err).Errorf("Failed to read from signing key update input topic")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
origin := gomatrixserverlib.ServerName(msg.Header.Get("origin"))
|
||||||
|
if _, serverName, err := gomatrixserverlib.SplitID('@', updatePayload.UserID); err != nil {
|
||||||
|
logrus.WithError(err).Error("failed to split user id")
|
||||||
|
return true
|
||||||
|
} else if serverName == t.cfg.Matrix.ServerName {
|
||||||
|
logrus.Warn("dropping device key update from ourself")
|
||||||
|
return true
|
||||||
|
} else if serverName != origin {
|
||||||
|
logrus.Warnf("dropping device key update, %s != %s", serverName, origin)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
keys := gomatrixserverlib.CrossSigningKeys{}
|
||||||
|
if updatePayload.MasterKey != nil {
|
||||||
|
keys.MasterKey = *updatePayload.MasterKey
|
||||||
|
}
|
||||||
|
if updatePayload.SelfSigningKey != nil {
|
||||||
|
keys.SelfSigningKey = *updatePayload.SelfSigningKey
|
||||||
|
}
|
||||||
|
uploadReq := &keyapi.PerformUploadDeviceKeysRequest{
|
||||||
|
CrossSigningKeys: keys,
|
||||||
|
UserID: updatePayload.UserID,
|
||||||
|
}
|
||||||
|
uploadRes := &keyapi.PerformUploadDeviceKeysResponse{}
|
||||||
|
if err := t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes); err != nil {
|
||||||
|
logrus.WithError(err).Error("failed to upload device keys")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if uploadRes.Error != nil {
|
||||||
|
logrus.WithError(uploadRes.Error).Error("failed to upload device keys")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
|
@ -16,6 +16,8 @@ package keyserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/consumers"
|
"github.com/matrix-org/dendrite/keyserver/consumers"
|
||||||
|
@ -26,7 +28,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/base"
|
"github.com/matrix-org/dendrite/setup/base"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
|
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
|
||||||
|
@ -72,5 +73,12 @@ func NewInternalAPI(
|
||||||
logrus.WithError(err).Panic("failed to start device list consumer")
|
logrus.WithError(err).Panic("failed to start device list consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sigConsumer := consumers.NewSigningKeyUpdateConsumer(
|
||||||
|
base.ProcessContext, cfg, js, ap,
|
||||||
|
)
|
||||||
|
if err := sigConsumer.Start(); err != nil {
|
||||||
|
logrus.WithError(err).Panic("failed to start signing key consumer")
|
||||||
|
}
|
||||||
|
|
||||||
return ap
|
return ap
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ const (
|
||||||
var (
|
var (
|
||||||
InputRoomEvent = "InputRoomEvent"
|
InputRoomEvent = "InputRoomEvent"
|
||||||
InputDeviceListUpdate = "InputDeviceListUpdate"
|
InputDeviceListUpdate = "InputDeviceListUpdate"
|
||||||
|
InputSigningKeyUpdate = "InputSigningKeyUpdate"
|
||||||
OutputRoomEvent = "OutputRoomEvent"
|
OutputRoomEvent = "OutputRoomEvent"
|
||||||
OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
|
OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
|
||||||
OutputKeyChangeEvent = "OutputKeyChangeEvent"
|
OutputKeyChangeEvent = "OutputKeyChangeEvent"
|
||||||
|
@ -51,6 +52,11 @@ var streams = []*nats.StreamConfig{
|
||||||
Retention: nats.InterestPolicy,
|
Retention: nats.InterestPolicy,
|
||||||
Storage: nats.FileStorage,
|
Storage: nats.FileStorage,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: InputSigningKeyUpdate,
|
||||||
|
Retention: nats.InterestPolicy,
|
||||||
|
Storage: nats.FileStorage,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: OutputRoomEvent,
|
Name: OutputRoomEvent,
|
||||||
Retention: nats.InterestPolicy,
|
Retention: nats.InterestPolicy,
|
||||||
|
|
Loading…
Reference in a new issue