dendrite/pushserver/consumers/eduserver.go
2022-02-10 11:45:05 +00:00

120 lines
3.5 KiB
Go

package consumers
import (
"context"
"encoding/json"
eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/pushserver/producers"
"github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/pushserver/util"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
type OutputReceiptEventConsumer struct {
ctx context.Context
cfg *config.PushServer
jetstream nats.JetStreamContext
durable string
db storage.Database
pgClient pushgateway.Client
receiptTopic string
syncProducer *producers.SyncAPI
}
// NewOutputReceiptEventConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputReceiptEventConsumer(
process *process.ProcessContext,
cfg *config.PushServer,
js nats.JetStreamContext,
store storage.Database,
pgClient pushgateway.Client,
syncProducer *producers.SyncAPI,
) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
durable: cfg.Matrix.JetStream.Durable("PushServerEDUServerConsumer"),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
pgClient: pgClient,
syncProducer: syncProducer,
}
}
func (s *OutputReceiptEventConsumer) Start() error {
if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.receiptTopic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
}
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var event eduapi.OutputReceiptEvent
if err := json.Unmarshal(msg.Data, &event); err != nil {
log.WithError(err).Errorf("pushserver EDU consumer: message parse failure")
return true
}
localpart, domain, err := gomatrixserverlib.SplitID('@', event.UserID)
if err != nil {
return true
}
if domain != s.cfg.Matrix.ServerName {
return true
}
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
"event_type": event.Type,
}).Tracef("Received message from EDU server: %#v", event)
// TODO: we cannot know if this EventID caused a notification, so
// we should first resolve it and find the closest earlier
// notification.
updated, err := s.db.SetNotificationsRead(ctx, localpart, event.RoomID, event.EventID, true)
if err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer")
return false
}
if updated {
if err := s.syncProducer.GetAndSendNotificationData(ctx, event.UserID, event.RoomID); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer: GetAndSendNotificationData failed")
return false
}
if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer: NotifyUserCounts failed")
return false
}
}
return true
}