Fix consumers in sync API

This commit is contained in:
Neil Alexander 2022-11-14 14:08:33 +00:00
parent 52ff26a608
commit ad7b93ef81
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 52 additions and 63 deletions

View file

@ -28,22 +28,20 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
db storage.Database
notifier *notifier.Notifier
stream streams.StreamProvider
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.SyncRoomserverAPI
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
db storage.Database
notifier *notifier.Notifier
stream streams.StreamProvider
rsAPI roomserverAPI.SyncRoomserverAPI
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@ -59,15 +57,14 @@ func NewOutputKeyChangeEventConsumer(
stream streams.StreamProvider,
) *OutputKeyChangeEventConsumer {
s := &OutputKeyChangeEventConsumer{
ctx: process.Context(),
jetstream: js,
durable: cfg.Matrix.JetStream.Durable("SyncAPIKeyChangeConsumer"),
topic: topic,
db: store,
serverName: cfg.Matrix.ServerName,
rsAPI: rsAPI,
notifier: notifier,
stream: stream,
ctx: process.Context(),
jetstream: js,
durable: cfg.Matrix.JetStream.Durable("SyncAPIKeyChangeConsumer"),
topic: topic,
db: store,
rsAPI: rsAPI,
notifier: notifier,
stream: stream,
}
return s

View file

@ -34,14 +34,13 @@ import (
// OutputReceiptEventConsumer consumes events that originated in the EDU server.
type OutputReceiptEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
db storage.Database
stream streams.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
db storage.Database
stream streams.StreamProvider
notifier *notifier.Notifier
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@ -55,14 +54,13 @@ func NewOutputReceiptEventConsumer(
stream streams.StreamProvider,
) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"),
db: store,
notifier: notifier,
stream: stream,
serverName: cfg.Matrix.ServerName,
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"),
db: store,
notifier: notifier,
stream: stream,
}
}

View file

@ -364,13 +364,9 @@ func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gom
// TODO: check that it's a join and not a profile change (means unmarshalling prev_content)
if membership == gomatrixserverlib.Join {
// check it's a local join
_, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
if err != nil {
if _, _, err := s.cfg.Matrix.SplitLocalID('@', *ev.StateKey()); err != nil {
return sp, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
if domain != s.cfg.Matrix.ServerName {
return sp, nil
}
// cancel any peeks for it
peekSP, peekErr := s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey())
@ -390,9 +386,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
if msg.Event.StateKey() == nil {
return
}
if _, serverName, err := gomatrixserverlib.SplitID('@', *msg.Event.StateKey()); err != nil {
return
} else if serverName != s.cfg.Matrix.ServerName {
if _, _, err := s.cfg.Matrix.SplitLocalID('@', *msg.Event.StateKey()); err != nil {
return
}
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)

View file

@ -37,15 +37,15 @@ import (
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
type OutputSendToDeviceEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
db storage.Database
keyAPI keyapi.SyncKeyAPI
serverName gomatrixserverlib.ServerName // our server name
stream streams.StreamProvider
notifier *notifier.Notifier
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
db storage.Database
keyAPI keyapi.SyncKeyAPI
isLocalServerName func(gomatrixserverlib.ServerName) bool
stream streams.StreamProvider
notifier *notifier.Notifier
}
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
@ -60,15 +60,15 @@ func NewOutputSendToDeviceEventConsumer(
stream streams.StreamProvider,
) *OutputSendToDeviceEventConsumer {
return &OutputSendToDeviceEventConsumer{
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"),
db: store,
keyAPI: keyAPI,
serverName: cfg.Matrix.ServerName,
notifier: notifier,
stream: stream,
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"),
db: store,
keyAPI: keyAPI,
isLocalServerName: cfg.Matrix.IsLocalServerName,
notifier: notifier,
stream: stream,
}
}
@ -89,7 +89,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
return true
}
if domain != s.serverName {
if !s.isLocalServerName(domain) {
log.Tracef("ignoring send-to-device event with destination %s", domain)
return true
}
@ -114,7 +114,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
if output.Type == "m.room_key_request" {
requestingDeviceID := gjson.GetBytes(output.SendToDeviceEvent.Content, "requesting_device_id").Str
_, senderDomain, _ := gomatrixserverlib.SplitID('@', output.Sender)
if requestingDeviceID != "" && senderDomain != s.serverName {
if requestingDeviceID != "" && !s.isLocalServerName(senderDomain) {
// Mark the requesting device as stale, if we don't know about it.
if err = s.keyAPI.PerformMarkAsStaleIfNeeded(ctx, &keyapi.PerformMarkAsStaleRequest{
UserID: output.Sender, Domain: senderDomain, DeviceID: requestingDeviceID,