diff --git a/userapi/consumers/clientapi.go b/userapi/consumers/clientapi.go index a4ae09d13..62098fb37 100644 --- a/userapi/consumers/clientapi.go +++ b/userapi/consumers/clientapi.go @@ -16,14 +16,11 @@ package consumers import ( "context" - "encoding/json" - "github.com/getsentry/sentry-go" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" - "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/userapi/storage" @@ -34,122 +31,7 @@ import ( "github.com/matrix-org/dendrite/userapi/util" ) -// OutputReceiptEventConsumer consumes events that originated in the EDU server. -type OutputClientDataConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - topic string - db storage.Database - serverName gomatrixserverlib.ServerName - syncProducer *producers.SyncAPI - pgClient pushgateway.Client -} - -// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer. -// Call Start() to begin consuming from the EDU server. -func NewOutputClientDataConsumer( - process *process.ProcessContext, - cfg *config.UserAPI, - js nats.JetStreamContext, - store storage.Database, - syncProducer *producers.SyncAPI, - pgClient pushgateway.Client, -) *OutputReceiptEventConsumer { - return &OutputReceiptEventConsumer{ - ctx: process.Context(), - jetstream: js, - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), - durable: cfg.Matrix.JetStream.Durable("UserAPIAccountDataConsumer"), - db: store, - serverName: cfg.Matrix.ServerName, - syncProducer: syncProducer, - pgClient: pgClient, - } -} - -// Start consuming receipts events. -func (s *OutputClientDataConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, 1, - s.onMessage, nats.DeliverAll(), nats.ManualAck(), - ) -} - -func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { - msg := msgs[0] // Guaranteed to exist if onMessage is called - - userID := msg.Header.Get(jetstream.UserID) - var output eventutil.AccountData - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("client API server output log: message parse failure") - sentry.CaptureException(err) - return true - } - - if output.Type != "m.fully_read" || output.ReadMarker == nil { - return true - } - - localpart, domain, err := gomatrixserverlib.SplitID('@', userID) - if err != nil { - log.WithError(err).Error("userapi clientapi consumer: SplitID failure") - return true - } - if domain != s.serverName { - return true - } - - log := log.WithFields(log.Fields{ - "room_id": output.RoomID, - "user_id": userID, - }) - - _, serverName, err := gomatrixserverlib.SplitID('@', userID) - if err != nil { - return true - } - if serverName != s.serverName { - return true - } - - var notifyUsers bool - if output.ReadMarker.Read != "" { - _, err = s.db.SetNotificationsRead(ctx, localpart, output.RoomID, output.ReadMarker.Read, true) - if err != nil { - log.WithError(err).Error("userapi EDU consumer") - return false - } - notifyUsers = true - } - - if output.ReadMarker.FullyRead != "" { - _, err := s.db.DeleteNotificationsUpTo(ctx, localpart, output.RoomID, output.ReadMarker.FullyRead) - if err != nil { - log.WithError(err).Errorf("userapi clientapi consumer: DeleteNotificationsUpTo failed") - return false - } - notifyUsers = true - } - - if !notifyUsers { - return true - } - - if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, output.RoomID); err != nil { - log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed") - return false - } - if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { - log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed") - return false - } - - return true -} - -// OutputReceiptEventConsumer consumes events that originated in the EDU server. +// OutputReceiptEventConsumer consumes events that originated in the clientAPI. type OutputReceiptEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext @@ -203,6 +85,11 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats return true } + log := log.WithFields(log.Fields{ + "room_id": roomID, + "user_id": userID, + }) + localpart, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { log.WithError(err).Error("userapi clientapi consumer: SplitID failure") @@ -212,17 +99,16 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats return true } - log := log.WithFields(log.Fields{ - "room_id": roomID, - "user_id": userID, - }) - - _, err = s.db.SetNotificationsRead(ctx, localpart, roomID, readPos, true) + updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, readPos, true) if err != nil { log.WithError(err).Error("userapi EDU consumer") return false } + if !updated { + return true + } + if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil { log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed") return false diff --git a/userapi/internal/api.go b/userapi/internal/api.go index dcbb73614..98f24ff6e 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/sqlutil" keyapi "github.com/matrix-org/dendrite/keyserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" @@ -39,6 +40,7 @@ import ( "github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage/tables" + userapiUtil "github.com/matrix-org/dendrite/userapi/util" ) type UserInternalAPI struct { @@ -51,6 +53,7 @@ type UserInternalAPI struct { AppServices []config.ApplicationService KeyAPI keyapi.UserKeyAPI RSAPI rsapi.UserRoomserverAPI + PgClient pushgateway.Client } func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error { @@ -73,6 +76,11 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc ignoredUsers = &synctypes.IgnoredUsers{} _ = json.Unmarshal(req.AccountData, ignoredUsers) } + if req.DataType == "m.fully_read" { + if err := a.setFullyRead(ctx, req); err != nil { + return err + } + } if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{ RoomID: req.RoomID, Type: req.DataType, @@ -84,6 +92,42 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc return nil } +func (a *UserInternalAPI) setFullyRead(ctx context.Context, req *api.InputAccountDataRequest) error { + var output eventutil.ReadMarkerJSON + + if err := json.Unmarshal(req.AccountData, &output); err != nil { + return err + } + localpart, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + logrus.WithError(err).Error("UserInternalAPI.setFullyRead: SplitID failure") + return nil + } + if domain != a.ServerName { + return nil + } + + deleted, err := a.DB.DeleteNotificationsUpTo(ctx, localpart, req.RoomID, output.FullyRead) + if err != nil { + logrus.WithError(err).Errorf("UserInternalAPI.setFullyRead: DeleteNotificationsUpTo failed") + return err + } + // nothing changed, no need to send notification data/notify push gateway + if !deleted { + return nil + } + + if err = a.SyncProducer.GetAndSendNotificationData(ctx, req.UserID, req.RoomID); err != nil { + logrus.WithError(err).Error("UserInternalAPI.setFullyRead: GetAndSendNotificationData failed") + return err + } + if err = userapiUtil.NotifyUserCountsAsync(ctx, a.PgClient, localpart, a.DB); err != nil { + logrus.WithError(err).Error("UserInternalAPI.setFullyRead: NotifyUserCounts failed") + return err + } + return nil +} + func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error { acc, err := a.DB.CreateAccount(ctx, req.Localpart, req.Password, req.AppServiceID, req.AccountType) if err != nil { diff --git a/userapi/producers/syncapi.go b/userapi/producers/syncapi.go index 27cfc2848..f556ea352 100644 --- a/userapi/producers/syncapi.go +++ b/userapi/producers/syncapi.go @@ -4,12 +4,13 @@ import ( "context" "encoding/json" - "github.com/matrix-org/dendrite/internal/eventutil" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/userapi/storage" ) type JetStreamPublisher interface { diff --git a/userapi/userapi.go b/userapi/userapi.go index 6cd6b1899..d26b4e19a 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -81,6 +81,7 @@ func NewInternalAPI( KeyAPI: keyAPI, RSAPI: rsAPI, DisableTLSValidation: cfg.PushGatewayDisableTLSValidation, + PgClient: pgClient, } receiptConsumer := consumers.NewOutputReceiptEventConsumer( @@ -90,13 +91,6 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start user API receipt consumer") } - readMarkerConsumer := consumers.NewOutputClientDataConsumer( - base.ProcessContext, cfg, js, db, syncProducer, pgClient, - ) - if err := readMarkerConsumer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start user API read marker consumer") - } - eventConsumer := consumers.NewOutputRoomEventConsumer( base.ProcessContext, cfg, js, db, pgClient, rsAPI, syncProducer, )