Fix keyserver consumer maybe

This commit is contained in:
Neil Alexander 2022-11-15 09:57:40 +00:00
parent 2c92b29d22
commit 8f54d33f1d
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 28 additions and 26 deletions

View file

@ -30,12 +30,12 @@ import (
// DeviceListUpdateConsumer consumes device list updates that came in over federation. // DeviceListUpdateConsumer consumes device list updates that came in over federation.
type DeviceListUpdateConsumer struct { type DeviceListUpdateConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable string durable string
topic string topic string
updater *internal.DeviceListUpdater updater *internal.DeviceListUpdater
serverName gomatrixserverlib.ServerName isLocalServerName func(gomatrixserverlib.ServerName) bool
} }
// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers. // NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers.
@ -46,12 +46,12 @@ func NewDeviceListUpdateConsumer(
updater *internal.DeviceListUpdater, updater *internal.DeviceListUpdater,
) *DeviceListUpdateConsumer { ) *DeviceListUpdateConsumer {
return &DeviceListUpdateConsumer{ return &DeviceListUpdateConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"), durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate), topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
updater: updater, updater: updater,
serverName: cfg.Matrix.ServerName, isLocalServerName: cfg.Matrix.IsLocalServerName,
} }
} }
@ -75,7 +75,7 @@ func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.M
origin := gomatrixserverlib.ServerName(msg.Header.Get("origin")) origin := gomatrixserverlib.ServerName(msg.Header.Get("origin"))
if _, serverName, err := gomatrixserverlib.SplitID('@', m.UserID); err != nil { if _, serverName, err := gomatrixserverlib.SplitID('@', m.UserID); err != nil {
return true return true
} else if serverName == t.serverName { } else if t.isLocalServerName(serverName) {
return true return true
} else if serverName != origin { } else if serverName != origin {
return true return true

View file

@ -31,12 +31,13 @@ import (
// SigningKeyUpdateConsumer consumes signing key updates that came in over federation. // SigningKeyUpdateConsumer consumes signing key updates that came in over federation.
type SigningKeyUpdateConsumer struct { type SigningKeyUpdateConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable string durable string
topic string topic string
keyAPI *internal.KeyInternalAPI keyAPI *internal.KeyInternalAPI
cfg *config.KeyServer cfg *config.KeyServer
isLocalServerName func(gomatrixserverlib.ServerName) bool
} }
// NewSigningKeyUpdateConsumer creates a new SigningKeyUpdateConsumer. Call Start() to begin consuming from key servers. // NewSigningKeyUpdateConsumer creates a new SigningKeyUpdateConsumer. Call Start() to begin consuming from key servers.
@ -47,12 +48,13 @@ func NewSigningKeyUpdateConsumer(
keyAPI *internal.KeyInternalAPI, keyAPI *internal.KeyInternalAPI,
) *SigningKeyUpdateConsumer { ) *SigningKeyUpdateConsumer {
return &SigningKeyUpdateConsumer{ return &SigningKeyUpdateConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: cfg.Matrix.JetStream.Prefixed("KeyServerSigningKeyConsumer"), durable: cfg.Matrix.JetStream.Prefixed("KeyServerSigningKeyConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate), topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
keyAPI: keyAPI, keyAPI: keyAPI,
cfg: cfg, cfg: cfg,
isLocalServerName: cfg.Matrix.IsLocalServerName,
} }
} }
@ -77,7 +79,7 @@ func (t *SigningKeyUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.M
if _, serverName, err := gomatrixserverlib.SplitID('@', updatePayload.UserID); err != nil { if _, serverName, err := gomatrixserverlib.SplitID('@', updatePayload.UserID); err != nil {
logrus.WithError(err).Error("failed to split user id") logrus.WithError(err).Error("failed to split user id")
return true return true
} else if serverName == t.cfg.Matrix.ServerName { } else if t.isLocalServerName(serverName) {
logrus.Warn("dropping device key update from ourself") logrus.Warn("dropping device key update from ourself")
return true return true
} else if serverName != origin { } else if serverName != origin {