Process FTS in the consumer

This commit is contained in:
Till Faelligen 2022-05-18 17:04:45 +02:00
parent b33d9e5045
commit 1b13fa3654
2 changed files with 46 additions and 1 deletions

View file

@ -21,6 +21,7 @@ import (
"fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@ -32,6 +33,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
@ -47,6 +49,7 @@ type OutputRoomEventConsumer struct {
inviteStream types.StreamProvider
notifier *notifier.Notifier
producer *producers.UserAPIStreamEventProducer
fts *fulltext.Search
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -60,6 +63,7 @@ func NewOutputRoomEventConsumer(
inviteStream types.StreamProvider,
rsAPI api.SyncRoomserverAPI,
producer *producers.UserAPIStreamEventProducer,
fts *fulltext.Search,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@ -73,6 +77,7 @@ func NewOutputRoomEventConsumer(
inviteStream: inviteStream,
rsAPI: rsAPI,
producer: producer,
fts: fts,
}
}
@ -252,6 +257,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write new event failure")
return nil
}
if err = s.writeFTS(ev, pduPos); err != nil {
log.WithError(err).Warn("failed to write fulltext")
}
if err = s.producer.SendStreamEvent(ev.RoomID(), ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to send stream output event for event %s", ev.EventID())
@ -301,6 +309,10 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return nil
}
if err = s.writeFTS(ev, pduPos); err != nil {
log.WithError(err).Warn("failed to write fulltext")
}
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
return err
@ -457,3 +469,35 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head
event.Event, err = event.SetUnsigned(prev)
return event, err
}
func (s *OutputRoomEventConsumer) writeFTS(ev *gomatrixserverlib.HeaderedEvent, pduPosition types.StreamPosition) error {
e := fulltext.IndexElement{
EventID: ev.EventID(),
RoomID: ev.RoomID(),
StreamPosition: int64(pduPosition),
}
e.SetContentType(ev.Type())
switch ev.Type() {
case "m.room.message":
e.Content = gjson.GetBytes(ev.Content(), "body").String()
case gomatrixserverlib.MRoomName:
e.Content = gjson.GetBytes(ev.Content(), "name").String()
case gomatrixserverlib.MRoomTopic:
e.Content = gjson.GetBytes(ev.Content(), "topic").String()
case gomatrixserverlib.MRoomRedaction:
log.Debugf("Redacting event: %s", ev.Redacts())
if err := s.fts.Delete(ev.Redacts()); err != nil {
return fmt.Errorf("failed to delete entry from fulltext index: %w", err)
}
default:
return nil
}
if e.Content != "" {
log.Debugf("Indexing element: %+v", e)
if err := s.fts.Index(e); err != nil {
return err
}
}
return nil
}

View file

@ -53,7 +53,7 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to create full text")
}
syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database, fts)
syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
}
@ -104,6 +104,7 @@ func AddPublicRoutes(
roomConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer,
fts,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")