dendrite/pushserver/consumers/eduserver.go

120 lines
3.4 KiB
Go

package consumers
import (
"context"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/pushserver/producers"
"github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/pushserver/util"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
type OutputReceiptEventConsumer struct {
cfg *config.PushServer
rsConsumer *internal.ContinualConsumer
db storage.Database
pgClient pushgateway.Client
syncProducer *producers.SyncAPI
}
func NewOutputReceiptEventConsumer(
process *process.ProcessContext,
cfg *config.PushServer,
kafkaConsumer sarama.Consumer,
store storage.Database,
pgClient pushgateway.Client,
syncProducer *producers.SyncAPI,
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "pushserver/eduserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputReceiptEventConsumer{
cfg: cfg,
rsConsumer: &consumer,
db: store,
pgClient: pgClient,
syncProducer: syncProducer,
}
consumer.ProcessMessage = s.onMessage
return s
}
func (s *OutputReceiptEventConsumer) Start() error {
return s.rsConsumer.Start()
}
func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
ctx := context.Background()
var event eduapi.OutputReceiptEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.WithError(err).Errorf("pushserver EDU consumer: message parse failure")
return nil
}
localpart, domain, err := gomatrixserverlib.SplitID('@', event.UserID)
if err != nil {
return err
}
if domain != s.cfg.Matrix.ServerName {
return fmt.Errorf("pushserver EDU consumer: not a local user: %v", event.UserID)
}
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
"event_type": event.Type,
}).Tracef("Received message from EDU server: %#v", event)
// TODO: we cannot know if this EventID caused a notification, so
// we should first resolve it and find the closest earlier
// notification.
updated, err := s.db.SetNotificationsRead(ctx, localpart, event.RoomID, event.EventID, true)
if err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer")
return nil
}
if updated {
if err := s.syncProducer.GetAndSendNotificationData(ctx, event.UserID, event.RoomID); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer: GetAndSendNotificationData failed")
return nil
}
if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer: NotifyUserCounts failed")
return nil
}
}
return nil
}