Set readmarker from the internal userAPI

This commit is contained in:
Till Faelligen 2022-09-26 09:41:53 +02:00
parent 2d9767585f
commit bb741f14df
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
4 changed files with 60 additions and 135 deletions

View file

@ -16,14 +16,11 @@ package consumers
import (
"context"
"encoding/json"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/userapi/storage"
@ -34,122 +31,7 @@ import (
"github.com/matrix-org/dendrite/userapi/util"
)
// OutputReceiptEventConsumer consumes events that originated in the EDU server.
type OutputClientDataConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
db storage.Database
serverName gomatrixserverlib.ServerName
syncProducer *producers.SyncAPI
pgClient pushgateway.Client
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
// Call Start() to begin consuming from the EDU server.
func NewOutputClientDataConsumer(
process *process.ProcessContext,
cfg *config.UserAPI,
js nats.JetStreamContext,
store storage.Database,
syncProducer *producers.SyncAPI,
pgClient pushgateway.Client,
) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
durable: cfg.Matrix.JetStream.Durable("UserAPIAccountDataConsumer"),
db: store,
serverName: cfg.Matrix.ServerName,
syncProducer: syncProducer,
pgClient: pgClient,
}
}
// Start consuming receipts events.
func (s *OutputClientDataConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, 1,
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
)
}
func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
userID := msg.Header.Get(jetstream.UserID)
var output eventutil.AccountData
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("client API server output log: message parse failure")
sentry.CaptureException(err)
return true
}
if output.Type != "m.fully_read" || output.ReadMarker == nil {
return true
}
localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
log.WithError(err).Error("userapi clientapi consumer: SplitID failure")
return true
}
if domain != s.serverName {
return true
}
log := log.WithFields(log.Fields{
"room_id": output.RoomID,
"user_id": userID,
})
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return true
}
if serverName != s.serverName {
return true
}
var notifyUsers bool
if output.ReadMarker.Read != "" {
_, err = s.db.SetNotificationsRead(ctx, localpart, output.RoomID, output.ReadMarker.Read, true)
if err != nil {
log.WithError(err).Error("userapi EDU consumer")
return false
}
notifyUsers = true
}
if output.ReadMarker.FullyRead != "" {
_, err := s.db.DeleteNotificationsUpTo(ctx, localpart, output.RoomID, output.ReadMarker.FullyRead)
if err != nil {
log.WithError(err).Errorf("userapi clientapi consumer: DeleteNotificationsUpTo failed")
return false
}
notifyUsers = true
}
if !notifyUsers {
return true
}
if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, output.RoomID); err != nil {
log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed")
return false
}
if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed")
return false
}
return true
}
// OutputReceiptEventConsumer consumes events that originated in the EDU server.
// OutputReceiptEventConsumer consumes events that originated in the clientAPI.
type OutputReceiptEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
@ -203,6 +85,11 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats
return true
}
log := log.WithFields(log.Fields{
"room_id": roomID,
"user_id": userID,
})
localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
log.WithError(err).Error("userapi clientapi consumer: SplitID failure")
@ -212,17 +99,16 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats
return true
}
log := log.WithFields(log.Fields{
"room_id": roomID,
"user_id": userID,
})
_, err = s.db.SetNotificationsRead(ctx, localpart, roomID, readPos, true)
updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, readPos, true)
if err != nil {
log.WithError(err).Error("userapi EDU consumer")
return false
}
if !updated {
return true
}
if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil {
log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed")
return false

View file

@ -30,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
@ -39,6 +40,7 @@ import (
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/storage/tables"
userapiUtil "github.com/matrix-org/dendrite/userapi/util"
)
type UserInternalAPI struct {
@ -51,6 +53,7 @@ type UserInternalAPI struct {
AppServices []config.ApplicationService
KeyAPI keyapi.UserKeyAPI
RSAPI rsapi.UserRoomserverAPI
PgClient pushgateway.Client
}
func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error {
@ -73,6 +76,11 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
ignoredUsers = &synctypes.IgnoredUsers{}
_ = json.Unmarshal(req.AccountData, ignoredUsers)
}
if req.DataType == "m.fully_read" {
if err := a.setFullyRead(ctx, req); err != nil {
return err
}
}
if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
RoomID: req.RoomID,
Type: req.DataType,
@ -84,6 +92,42 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
return nil
}
func (a *UserInternalAPI) setFullyRead(ctx context.Context, req *api.InputAccountDataRequest) error {
var output eventutil.ReadMarkerJSON
if err := json.Unmarshal(req.AccountData, &output); err != nil {
return err
}
localpart, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil {
logrus.WithError(err).Error("UserInternalAPI.setFullyRead: SplitID failure")
return nil
}
if domain != a.ServerName {
return nil
}
deleted, err := a.DB.DeleteNotificationsUpTo(ctx, localpart, req.RoomID, output.FullyRead)
if err != nil {
logrus.WithError(err).Errorf("UserInternalAPI.setFullyRead: DeleteNotificationsUpTo failed")
return err
}
// nothing changed, no need to send notification data/notify push gateway
if !deleted {
return nil
}
if err = a.SyncProducer.GetAndSendNotificationData(ctx, req.UserID, req.RoomID); err != nil {
logrus.WithError(err).Error("UserInternalAPI.setFullyRead: GetAndSendNotificationData failed")
return err
}
if err = userapiUtil.NotifyUserCountsAsync(ctx, a.PgClient, localpart, a.DB); err != nil {
logrus.WithError(err).Error("UserInternalAPI.setFullyRead: NotifyUserCounts failed")
return err
}
return nil
}
func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error {
acc, err := a.DB.CreateAccount(ctx, req.Localpart, req.Password, req.AppServiceID, req.AccountType)
if err != nil {

View file

@ -4,12 +4,13 @@ import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/userapi/storage"
)
type JetStreamPublisher interface {

View file

@ -81,6 +81,7 @@ func NewInternalAPI(
KeyAPI: keyAPI,
RSAPI: rsAPI,
DisableTLSValidation: cfg.PushGatewayDisableTLSValidation,
PgClient: pgClient,
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
@ -90,13 +91,6 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start user API receipt consumer")
}
readMarkerConsumer := consumers.NewOutputClientDataConsumer(
base.ProcessContext, cfg, js, db, syncProducer, pgClient,
)
if err := readMarkerConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start user API read marker consumer")
}
eventConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, db, pgClient, rsAPI, syncProducer,
)