diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index f0ca2106f..d2d150d60 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/getsentry/sentry-go" + "github.com/matrix-org/dendrite/internal/fulltext" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" @@ -32,6 +33,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" ) // OutputRoomEventConsumer consumes events that originated in the room server. @@ -47,6 +49,7 @@ type OutputRoomEventConsumer struct { inviteStream types.StreamProvider notifier *notifier.Notifier producer *producers.UserAPIStreamEventProducer + fts *fulltext.Search } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -60,6 +63,7 @@ func NewOutputRoomEventConsumer( inviteStream types.StreamProvider, rsAPI api.SyncRoomserverAPI, producer *producers.UserAPIStreamEventProducer, + fts *fulltext.Search, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), @@ -73,6 +77,7 @@ func NewOutputRoomEventConsumer( inviteStream: inviteStream, rsAPI: rsAPI, producer: producer, + fts: fts, } } @@ -252,6 +257,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( }).Panicf("roomserver output log: write new event failure") return nil } + if err = s.writeFTS(ev, pduPos); err != nil { + log.WithError(err).Warn("failed to write fulltext") + } if err = s.producer.SendStreamEvent(ev.RoomID(), ev, pduPos); err != nil { log.WithError(err).Errorf("Failed to send stream output event for event %s", ev.EventID()) @@ -301,6 +309,10 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( return nil } + if err = s.writeFTS(ev, pduPos); err != nil { + log.WithError(err).Warn("failed to write fulltext") + } + if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) return err @@ -457,3 +469,35 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head event.Event, err = event.SetUnsigned(prev) return event, err } + +func (s *OutputRoomEventConsumer) writeFTS(ev *gomatrixserverlib.HeaderedEvent, pduPosition types.StreamPosition) error { + e := fulltext.IndexElement{ + EventID: ev.EventID(), + RoomID: ev.RoomID(), + StreamPosition: int64(pduPosition), + } + e.SetContentType(ev.Type()) + + switch ev.Type() { + case "m.room.message": + e.Content = gjson.GetBytes(ev.Content(), "body").String() + case gomatrixserverlib.MRoomName: + e.Content = gjson.GetBytes(ev.Content(), "name").String() + case gomatrixserverlib.MRoomTopic: + e.Content = gjson.GetBytes(ev.Content(), "topic").String() + case gomatrixserverlib.MRoomRedaction: + log.Debugf("Redacting event: %s", ev.Redacts()) + if err := s.fts.Delete(ev.Redacts()); err != nil { + return fmt.Errorf("failed to delete entry from fulltext index: %w", err) + } + default: + return nil + } + if e.Content != "" { + log.Debugf("Indexing element: %+v", e) + if err := s.fts.Index(e); err != nil { + return err + } + } + return nil +} diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 315c1396c..efb450eed 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -53,7 +53,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to create full text") } - syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database, fts) + syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") } @@ -104,6 +104,7 @@ func AddPublicRoutes( roomConsumer := consumers.NewOutputRoomEventConsumer( base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider, streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer, + fts, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer")