Mirror of https://github.com/matrix-org/dendrite.git (synced 2026-01-12 08:33:10 -06:00)
zion hack - always send the notification data on a read receipt

The notifications are stored in two different databases, and the notification database prunes data, so trying to mark old notifications as read will fail, but the notification will still exist in the other db.

TODO: revisit when this refactor lands: https://github.com/matrix-org/dendrite/pull/2688/files

It looks like we clean up the notification table after a day:

func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error {
	_, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext(
		ctx,
		time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day
		time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month
	)
	return err
}

But we don't clean up the notifications in the syncAPI table. When we send a read receipt, we first run

	/*updated*/ _, err := s.db.SetNotificationsRead(ctx, localpart, roomID, int64(read.Read), true)

and only forward the message on if the table was updated. If a user waits more than a day to send a read receipt, they can't clear their notifications.
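
For context, a minimal sketch of the pre-hack flow, reconstructed from the commented-out "if updated" gate still visible in the consumer below; the surrounding identifiers (s, ctx, localpart, roomID, read) are those of onMessage:

	// Pre-hack flow (sketch): only forward the read receipt downstream if the
	// userapi notification table actually changed.
	updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, int64(read.Read), true)
	if err != nil {
		return false
	}
	if updated { // false once Clean() has pruned the rows, so old receipts never propagate
		if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil {
			return false
		}
		if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
			return false
		}
	}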
143 lines
4.4 KiB
Go
package consumers

import (
	"context"
	"encoding/json"

	"github.com/matrix-org/dendrite/internal/pushgateway"
	"github.com/matrix-org/dendrite/setup/config"
	"github.com/matrix-org/dendrite/setup/jetstream"
	"github.com/matrix-org/dendrite/setup/process"
	"github.com/matrix-org/dendrite/syncapi/types"
	uapi "github.com/matrix-org/dendrite/userapi/api"
	"github.com/matrix-org/dendrite/userapi/producers"
	"github.com/matrix-org/dendrite/userapi/storage"
	"github.com/matrix-org/dendrite/userapi/util"
	"github.com/matrix-org/gomatrixserverlib"
	"github.com/nats-io/nats.go"
	log "github.com/sirupsen/logrus"
)
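
// OutputReadUpdateConsumer consumes read receipt and fully-read marker
// updates published by the sync API over NATS JetStream, updating stored
// notification state and pushing new unread counts to the push gateway.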
type OutputReadUpdateConsumer struct {
	ctx          context.Context
	cfg          *config.UserAPI
	jetstream    nats.JetStreamContext
	durable      string
	db           storage.Database
	pgClient     pushgateway.Client
	ServerName   gomatrixserverlib.ServerName
	topic        string
	userAPI      uapi.UserInternalAPI
	syncProducer *producers.SyncAPI
}
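
// NewOutputReadUpdateConsumer creates a new read update consumer. Call Start
// to begin consuming messages from the sync API.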
func NewOutputReadUpdateConsumer(
	process *process.ProcessContext,
	cfg *config.UserAPI,
	js nats.JetStreamContext,
	store storage.Database,
	pgClient pushgateway.Client,
	userAPI uapi.UserInternalAPI,
	syncProducer *producers.SyncAPI,
) *OutputReadUpdateConsumer {
	return &OutputReadUpdateConsumer{
		ctx:          process.Context(),
		cfg:          cfg,
		jetstream:    js,
		db:           store,
		ServerName:   cfg.Matrix.ServerName,
		durable:      cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"),
		topic:        cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
		pgClient:     pgClient,
		userAPI:      userAPI,
		syncProducer: syncProducer,
	}
}
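
// Start subscribes to the read update topic using a durable JetStream
// consumer; messages are delivered one at a time and acknowledged manually.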
func (s *OutputReadUpdateConsumer) Start() error {
	if err := jetstream.JetStreamConsumer(
		s.ctx, s.jetstream, s.topic, s.durable, 1,
		s.onMessage, nats.DeliverAll(), nats.ManualAck(),
	); err != nil {
		return err
	}
	return nil
}
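
// onMessage handles a single read update message. Returning true acknowledges
// the message; returning false leaves it unacknowledged so that it will be
// redelivered.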
func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
	msg := msgs[0] // Guaranteed to exist if onMessage is called
	var read types.ReadUpdate
	if err := json.Unmarshal(msg.Data, &read); err != nil {
		log.WithError(err).Error("userapi clientapi consumer: message parse failure")
		return true
	}
	if read.FullyRead == 0 && read.Read == 0 {
		return true
	}

	userID := string(msg.Header.Get(jetstream.UserID))
	roomID := string(msg.Header.Get(jetstream.RoomID))

	localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
	if err != nil {
		log.WithError(err).Error("userapi clientapi consumer: SplitID failure")
		return true
	}
	if domain != s.ServerName {
		log.Error("userapi clientapi consumer: not a local user")
		return true
	}

	log := log.WithFields(log.Fields{
		"room_id": roomID,
		"user_id": userID,
	})
	log.Tracef("Received read update from sync API: %#v", read)

	if read.Read > 0 {
		/*updated*/ _, err := s.db.SetNotificationsRead(ctx, localpart, roomID, int64(read.Read), true)
		if err != nil {
			log.WithError(err).Error("userapi EDU consumer")
			return false
		}

		// zion hack - always send the notification data
		// the notifications are stored in two different databases, and somehow the notification
		// database prunes data, so trying to mark old notifications as read will fail, but the
		// notification will still exist in the other db
		// TODO: revisit when this refactor lands: https://github.com/matrix-org/dendrite/pull/2688/files

		//if updated {
		if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil {
			log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed")
			return false
		}
		if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
			log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed")
			return false
		}
		//}
	}
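
	// Note: unlike the hacked read receipt path above, this fully-read path
	// still gates the downstream updates on whether any rows were deleted.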
	if read.FullyRead > 0 {
		deleted, err := s.db.DeleteNotificationsUpTo(ctx, localpart, roomID, int64(read.FullyRead))
		if err != nil {
			log.WithError(err).Errorf("userapi clientapi consumer: DeleteNotificationsUpTo failed")
			return false
		}

		if deleted {
			if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
				log.WithError(err).Error("userapi clientapi consumer: NotifyUserCounts failed")
				return false
			}

			if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, read.RoomID); err != nil {
				log.WithError(err).Errorf("userapi clientapi consumer: GetAndSendNotificationData failed")
				return false
			}
		}
	}

	return true
}