Remove internal.ContinualConsumer from syncapi

This commit is contained in:
Till Faelligen 2022-02-01 19:07:19 +01:00
parent 94e34b702d
commit dac2055579
4 changed files with 59 additions and 65 deletions

View file

@ -18,23 +18,24 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// OutputKeyChangeEventConsumer consumes events that originated in the key server. // OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct { type OutputKeyChangeEventConsumer struct {
ctx context.Context ctx context.Context
keyChangeConsumer *internal.ContinualConsumer jetstream nats.JetStreamContext
topic string
db storage.Database db storage.Database
notifier *notifier.Notifier notifier *notifier.Notifier
stream types.StreamProvider stream types.StreamProvider
@ -49,25 +50,17 @@ func NewOutputKeyChangeEventConsumer(
process *process.ProcessContext, process *process.ProcessContext,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
topic string, topic string,
kafkaConsumer sarama.Consumer, js nats.JetStreamContext,
keyAPI api.KeyInternalAPI, keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, stream types.StreamProvider,
) *OutputKeyChangeEventConsumer { ) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/keychange",
Topic: topic,
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputKeyChangeEventConsumer{ s := &OutputKeyChangeEventConsumer{
ctx: process.Context(), ctx: process.Context(),
keyChangeConsumer: &consumer, jetstream: js,
topic: topic,
db: store, db: store,
serverName: serverName, serverName: serverName,
keyAPI: keyAPI, keyAPI: keyAPI,
@ -76,26 +69,28 @@ func NewOutputKeyChangeEventConsumer(
stream: stream, stream: stream,
} }
consumer.ProcessMessage = s.onMessage
return s return s
} }
// Start consuming from the key server // Start consuming from the key server
func (s *OutputKeyChangeEventConsumer) Start() error { func (s *OutputKeyChangeEventConsumer) Start() error {
return s.keyChangeConsumer.Start() _, err := s.jetstream.Subscribe(
s.topic, s.onMessage,
)
return err
} }
func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputKeyChangeEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
var m api.DeviceMessage var m api.DeviceMessage
if err := json.Unmarshal(msg.Value, &m); err != nil { if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic") logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil return true
} }
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil { if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come // This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types. // across an update that doesn't satisfy either types.
return nil return true
} }
switch m.Type { switch m.Type {
case api.TypeCrossSigningUpdate: case api.TypeCrossSigningUpdate:
@ -105,11 +100,13 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
default: default:
return s.onDeviceKeyMessage(m, m.DeviceChangeID) return s.onDeviceKeyMessage(m, m.DeviceChangeID)
} }
})
} }
func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) error { func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) bool {
if m.DeviceKeys == nil { if m.DeviceKeys == nil {
return nil return true
} }
output := m.DeviceKeys output := m.DeviceKeys
// work out who we need to notify about the new key // work out who we need to notify about the new key
@ -120,7 +117,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d
if err != nil { if err != nil {
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err) sentry.CaptureException(err)
return err return true
} }
// make sure we get our own key updates too! // make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1 queryRes.UserIDsToCount[output.UserID] = 1
@ -131,10 +128,10 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
} }
return nil return true
} }
func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) error { func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) bool {
output := m.CrossSigningKeyUpdate output := m.CrossSigningKeyUpdate
// work out who we need to notify about the new key // work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse var queryRes roomserverAPI.QuerySharedUsersResponse
@ -144,7 +141,7 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
if err != nil { if err != nil {
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err) sentry.CaptureException(err)
return err return true
} }
// make sure we get our own key updates too! // make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1 queryRes.UserIDsToCount[output.UserID] = 1
@ -155,5 +152,5 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
} }
return nil return true
} }

View file

@ -18,9 +18,9 @@ import (
"context" "context"
"strings" "strings"
"github.com/Shopify/sarama"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -64,8 +64,8 @@ func DeviceListCatchup(
} }
// now also track users who we already share rooms with but who have updated their devices between the two tokens // now also track users who we already share rooms with but who have updated their devices between the two tokens
offset := sarama.OffsetOldest offset := jetstream.OffsetOldest
toOffset := sarama.OffsetNewest toOffset := jetstream.OffsetNewest
if to > 0 && to > from { if to > 0 && to > from {
toOffset = int64(to) toOffset = int64(to)
} }

View file

@ -19,7 +19,6 @@ import (
eduAPI "github.com/matrix-org/dendrite/eduserver/api" eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
@ -27,8 +26,6 @@ import (
) )
type Database interface { type Database interface {
internal.PartitionStorer
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)

View file

@ -48,7 +48,7 @@ func AddPublicRoutes(
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI, cfg *config.SyncAPI,
) { ) {
js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream) js := jetstream.Prepare(&cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil { if err != nil {
@ -66,7 +66,7 @@ func AddPublicRoutes(
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg.Matrix.ServerName, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), process, cfg.Matrix.ServerName, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
consumer, keyAPI, rsAPI, syncDB, notifier, js, keyAPI, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider, streams.DeviceListStreamProvider,
) )
if err = keyChangeConsumer.Start(); err != nil { if err = keyChangeConsumer.Start(); err != nil {