mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-31 10:43:10 -06:00
Remove internal.ContinualConsumer from federationapi
This commit is contained in:
parent
8310cb83c9
commit
94e34b702d
|
|
@ -17,94 +17,89 @@ package consumers
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
|
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// KeyChangeConsumer consumes events that originate in key server.
|
// KeyChangeConsumer consumes events that originate in key server.
|
||||||
type KeyChangeConsumer struct {
|
type KeyChangeConsumer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
consumer *internal.ContinualConsumer
|
jetstream nats.JetStreamContext
|
||||||
db storage.Database
|
db storage.Database
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
serverName gomatrixserverlib.ServerName
|
serverName gomatrixserverlib.ServerName
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||||
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
|
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
|
||||||
func NewKeyChangeConsumer(
|
func NewKeyChangeConsumer(
|
||||||
process *process.ProcessContext,
|
process *process.ProcessContext,
|
||||||
cfg *config.KeyServer,
|
cfg *config.KeyServer,
|
||||||
kafkaConsumer sarama.Consumer,
|
js nats.JetStreamContext,
|
||||||
queues *queue.OutgoingQueues,
|
queues *queue.OutgoingQueues,
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
) *KeyChangeConsumer {
|
) *KeyChangeConsumer {
|
||||||
c := &KeyChangeConsumer{
|
return &KeyChangeConsumer{
|
||||||
ctx: process.Context(),
|
ctx: process.Context(),
|
||||||
consumer: &internal.ContinualConsumer{
|
jetstream: js,
|
||||||
Process: process,
|
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
|
||||||
ComponentName: "federationapi/keychange",
|
|
||||||
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
|
|
||||||
Consumer: kafkaConsumer,
|
|
||||||
PartitionStore: store,
|
|
||||||
},
|
|
||||||
queues: queues,
|
queues: queues,
|
||||||
db: store,
|
db: store,
|
||||||
serverName: cfg.Matrix.ServerName,
|
serverName: cfg.Matrix.ServerName,
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
}
|
}
|
||||||
c.consumer.ProcessMessage = c.onMessage
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from key servers
|
// Start consuming from key servers
|
||||||
func (t *KeyChangeConsumer) Start() error {
|
func (t *KeyChangeConsumer) Start() error {
|
||||||
if err := t.consumer.Start(); err != nil {
|
_, err := t.jetstream.Subscribe(
|
||||||
return fmt.Errorf("t.consumer.Start: %w", err)
|
t.topic, t.onMessage,
|
||||||
}
|
nats.DeliverAll(),
|
||||||
return nil
|
)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called in response to a message received on the
|
// onMessage is called in response to a message received on the
|
||||||
// key change events topic from the key server.
|
// key change events topic from the key server.
|
||||||
func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
func (t *KeyChangeConsumer) onMessage(msg *nats.Msg) {
|
||||||
var m api.DeviceMessage
|
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
|
||||||
if err := json.Unmarshal(msg.Value, &m); err != nil {
|
var m api.DeviceMessage
|
||||||
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||||
return nil
|
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||||
}
|
return true
|
||||||
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
|
}
|
||||||
// This probably shouldn't happen but stops us from panicking if we come
|
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
|
||||||
// across an update that doesn't satisfy either types.
|
// This probably shouldn't happen but stops us from panicking if we come
|
||||||
return nil
|
// across an update that doesn't satisfy either types.
|
||||||
}
|
return true
|
||||||
switch m.Type {
|
}
|
||||||
case api.TypeCrossSigningUpdate:
|
switch m.Type {
|
||||||
return t.onCrossSigningMessage(m)
|
case api.TypeCrossSigningUpdate:
|
||||||
case api.TypeDeviceKeyUpdate:
|
return t.onCrossSigningMessage(m)
|
||||||
fallthrough
|
case api.TypeDeviceKeyUpdate:
|
||||||
default:
|
fallthrough
|
||||||
return t.onDeviceKeyMessage(m)
|
default:
|
||||||
}
|
return t.onDeviceKeyMessage(m)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
|
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
||||||
if m.DeviceKeys == nil {
|
if m.DeviceKeys == nil {
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
logger := logrus.WithField("user_id", m.UserID)
|
logger := logrus.WithField("user_id", m.UserID)
|
||||||
|
|
||||||
|
|
@ -112,10 +107,10 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
|
||||||
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
|
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("Failed to extract domain from key change event")
|
logger.WithError(err).Error("Failed to extract domain from key change event")
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
if originServerName != t.serverName {
|
if originServerName != t.serverName {
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
var queryRes roomserverAPI.QueryRoomsForUserResponse
|
var queryRes roomserverAPI.QueryRoomsForUserResponse
|
||||||
|
|
@ -125,13 +120,13 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("failed to calculate joined rooms for user")
|
logger.WithError(err).Error("failed to calculate joined rooms for user")
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
// send this key change to all servers who share rooms with this user.
|
// send this key change to all servers who share rooms with this user.
|
||||||
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in")
|
logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in")
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pack the EDU and marshal it
|
// Pack the EDU and marshal it
|
||||||
|
|
@ -149,24 +144,25 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
|
||||||
Keys: m.KeyJSON,
|
Keys: m.KeyJSON,
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(event); err != nil {
|
if edu.Content, err = json.Marshal(event); err != nil {
|
||||||
return err
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Infof("Sending device list update message to %q", destinations)
|
logrus.Infof("Sending device list update message to %q", destinations)
|
||||||
return t.queues.SendEDU(edu, t.serverName, destinations)
|
err = t.queues.SendEDU(edu, t.serverName, destinations)
|
||||||
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
|
func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
||||||
output := m.CrossSigningKeyUpdate
|
output := m.CrossSigningKeyUpdate
|
||||||
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
|
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
|
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
if host != gomatrixserverlib.ServerName(t.serverName) {
|
if host != gomatrixserverlib.ServerName(t.serverName) {
|
||||||
// Ignore any messages that didn't originate locally, otherwise we'll
|
// Ignore any messages that didn't originate locally, otherwise we'll
|
||||||
// end up parroting information we received from other servers.
|
// end up parroting information we received from other servers.
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
logger := logrus.WithField("user_id", output.UserID)
|
logger := logrus.WithField("user_id", output.UserID)
|
||||||
|
|
||||||
|
|
@ -177,13 +173,13 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user")
|
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user")
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
// send this key change to all servers who share rooms with this user.
|
// send this key change to all servers who share rooms with this user.
|
||||||
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
|
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pack the EDU and marshal it
|
// Pack the EDU and marshal it
|
||||||
|
|
@ -193,11 +189,12 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(output); err != nil {
|
if edu.Content, err = json.Marshal(output); err != nil {
|
||||||
logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping")
|
logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping")
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Infof("Sending cross-signing update message to %q", destinations)
|
logger.Infof("Sending cross-signing update message to %q", destinations)
|
||||||
return t.queues.SendEDU(edu, t.serverName, destinations)
|
err = t.queues.SendEDU(edu, t.serverName, destinations)
|
||||||
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func prevID(streamID int) []int {
|
func prevID(streamID int) []int {
|
||||||
|
|
|
||||||
|
|
@ -125,7 +125,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
}).Panicf("roomserver output log: remote peek event failure")
|
}).Panicf("roomserver output log: remote peek event failure")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
case api.OutputTypeNewInviteEvent:
|
||||||
|
log.WithField("type", output.Type).Debug(
|
||||||
|
"received new invite, send device keys",
|
||||||
|
)
|
||||||
default:
|
default:
|
||||||
log.WithField("type", output.Type).Debug(
|
log.WithField("type", output.Type).Debug(
|
||||||
"roomserver output log: ignoring unknown output type",
|
"roomserver output log: ignoring unknown output type",
|
||||||
|
|
|
||||||
|
|
@ -92,7 +92,7 @@ func NewInternalAPI(
|
||||||
FailuresUntilBlacklist: cfg.FederationMaxRetries,
|
FailuresUntilBlacklist: cfg.FederationMaxRetries,
|
||||||
}
|
}
|
||||||
|
|
||||||
js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
|
js := jetstream.Prepare(&cfg.Matrix.JetStream)
|
||||||
|
|
||||||
queues := queue.NewOutgoingQueues(
|
queues := queue.NewOutgoingQueues(
|
||||||
federationDB, base.ProcessContext,
|
federationDB, base.ProcessContext,
|
||||||
|
|
@ -120,7 +120,7 @@ func NewInternalAPI(
|
||||||
logrus.WithError(err).Panic("failed to start typing server consumer")
|
logrus.WithError(err).Panic("failed to start typing server consumer")
|
||||||
}
|
}
|
||||||
keyConsumer := consumers.NewKeyChangeConsumer(
|
keyConsumer := consumers.NewKeyChangeConsumer(
|
||||||
base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationDB, rsAPI,
|
base.ProcessContext, &base.Cfg.KeyServer, js, queues, federationDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err := keyConsumer.Start(); err != nil {
|
if err := keyConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start key server consumer")
|
logrus.WithError(err).Panic("failed to start key server consumer")
|
||||||
|
|
|
||||||
|
|
@ -19,12 +19,10 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage/shared"
|
"github.com/matrix-org/dendrite/federationapi/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/federationapi/types"
|
"github.com/matrix-org/dendrite/federationapi/types"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Database interface {
|
type Database interface {
|
||||||
internal.PartitionStorer
|
|
||||||
gomatrixserverlib.KeyDatabase
|
gomatrixserverlib.KeyDatabase
|
||||||
|
|
||||||
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
|
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue