Remove fulltext from BaseDendrite

This commit is contained in:
Till Faelligen 2023-03-15 13:02:15 +01:00
parent d88f71ab71
commit 8c404440ef
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
4 changed files with 34 additions and 36 deletions

View file

@ -18,10 +18,10 @@
package fulltext package fulltext
import ( import (
"context"
"strings" "strings"
"github.com/blevesearch/bleve/v2" "github.com/blevesearch/bleve/v2"
// side effect imports to allow all possible languages // side effect imports to allow all possible languages
_ "github.com/blevesearch/bleve/v2/analysis/lang/ar" _ "github.com/blevesearch/bleve/v2/analysis/lang/ar"
_ "github.com/blevesearch/bleve/v2/analysis/lang/cjk" _ "github.com/blevesearch/bleve/v2/analysis/lang/cjk"
@ -77,12 +77,16 @@ func (i *IndexElement) SetContentType(v string) {
} }
// New opens a new/existing fulltext index // New opens a new/existing fulltext index
func New(cfg config.Fulltext) (fts *Search, err error) { func New(ctx context.Context, cfg config.Fulltext) (fts *Search, err error) {
fts = &Search{} fts = &Search{}
fts.FulltextIndex, err = openIndex(cfg) fts.FulltextIndex, err = openIndex(cfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
go func() {
<-ctx.Done()
_ = fts.Close()
}()
return fts, nil return fts, nil
} }

View file

@ -18,6 +18,7 @@ import (
"reflect" "reflect"
"testing" "testing"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -25,7 +26,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
) )
func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search { func mustOpenIndex(t *testing.T, tempDir string) (*fulltext.Search, *process.ProcessContext) {
t.Helper() t.Helper()
cfg := config.Fulltext{ cfg := config.Fulltext{
Enabled: true, Enabled: true,
@ -36,11 +37,12 @@ func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search {
cfg.IndexPath = config.Path(tempDir) cfg.IndexPath = config.Path(tempDir)
cfg.InMemory = false cfg.InMemory = false
} }
fts, err := fulltext.New(cfg) ctx := process.NewProcessContext()
fts, err := fulltext.New(ctx.Context(), cfg)
if err != nil { if err != nil {
t.Fatal("failed to open fulltext index:", err) t.Fatal("failed to open fulltext index:", err)
} }
return fts return fts, ctx
} }
func mustAddTestData(t *testing.T, fts *fulltext.Search, firstStreamPos int64) (eventIDs, roomIDs []string) { func mustAddTestData(t *testing.T, fts *fulltext.Search, firstStreamPos int64) (eventIDs, roomIDs []string) {
@ -93,19 +95,17 @@ func mustAddTestData(t *testing.T, fts *fulltext.Search, firstStreamPos int64) (
func TestOpen(t *testing.T) { func TestOpen(t *testing.T) {
dataDir := t.TempDir() dataDir := t.TempDir()
fts := mustOpenIndex(t, dataDir) _, ctx := mustOpenIndex(t, dataDir)
if err := fts.Close(); err != nil { ctx.ShutdownDendrite()
t.Fatal("unable to close fulltext index", err)
}
// open existing index // open existing index
fts = mustOpenIndex(t, dataDir) _, ctx = mustOpenIndex(t, dataDir)
defer fts.Close() ctx.ShutdownDendrite()
} }
func TestIndex(t *testing.T) { func TestIndex(t *testing.T) {
fts := mustOpenIndex(t, "") fts, ctx := mustOpenIndex(t, "")
defer fts.Close() defer ctx.ShutdownDendrite()
// add some data // add some data
var streamPos int64 = 1 var streamPos int64 = 1
@ -128,8 +128,8 @@ func TestIndex(t *testing.T) {
} }
func TestDelete(t *testing.T) { func TestDelete(t *testing.T) {
fts := mustOpenIndex(t, "") fts, ctx := mustOpenIndex(t, "")
defer fts.Close() defer ctx.ShutdownDendrite()
eventIDs, roomIDs := mustAddTestData(t, fts, 0) eventIDs, roomIDs := mustAddTestData(t, fts, 0)
res1, err := fts.Search("lorem", roomIDs[:1], nil, 50, 0, false) res1, err := fts.Search("lorem", roomIDs[:1], nil, 50, 0, false)
if err != nil { if err != nil {
@ -224,7 +224,8 @@ func TestSearch(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
f := mustOpenIndex(t, "") f, ctx := mustOpenIndex(t, "")
defer ctx.ShutdownDendrite()
eventIDs, roomIDs := mustAddTestData(t, f, 0) eventIDs, roomIDs := mustAddTestData(t, f, 0)
var searchRooms []string var searchRooms []string
for _, x := range tt.args.roomIndex { for _, x := range tt.args.roomIndex {

View file

@ -42,7 +42,6 @@ import (
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -83,7 +82,6 @@ type BaseDendrite struct {
Database *sql.DB Database *sql.DB
DatabaseWriter sqlutil.Writer DatabaseWriter sqlutil.Writer
EnableMetrics bool EnableMetrics bool
Fulltext *fulltext.Search
startupLock sync.Mutex startupLock sync.Mutex
} }
@ -130,14 +128,6 @@ func NewBaseDendrite(cfg *config.Dendrite, options ...BaseDendriteOptions) *Base
logrus.WithError(err).Panicf("failed to start opentracing") logrus.WithError(err).Panicf("failed to start opentracing")
} }
var fts *fulltext.Search
if cfg.SyncAPI.Fulltext.Enabled {
fts, err = fulltext.New(cfg.SyncAPI.Fulltext)
if err != nil {
logrus.WithError(err).Panicf("failed to create full text")
}
}
if cfg.Global.Sentry.Enabled { if cfg.Global.Sentry.Enabled {
logrus.Info("Setting up Sentry for debugging...") logrus.Info("Setting up Sentry for debugging...")
err = sentry.Init(sentry.ClientOptions{ err = sentry.Init(sentry.ClientOptions{
@ -212,7 +202,6 @@ func NewBaseDendrite(cfg *config.Dendrite, options ...BaseDendriteOptions) *Base
Database: db, // set if monolith with global connection pool only Database: db, // set if monolith with global connection pool only
DatabaseWriter: writer, // set if monolith with global connection pool only DatabaseWriter: writer, // set if monolith with global connection pool only
EnableMetrics: enableMetrics, EnableMetrics: enableMetrics,
Fulltext: fts,
} }
} }
@ -493,12 +482,6 @@ func (b *BaseDendrite) WaitForShutdown() {
logrus.Warnf("failed to flush all Sentry events!") logrus.Warnf("failed to flush all Sentry events!")
} }
} }
if b.Fulltext != nil {
err := b.Fulltext.Close()
if err != nil {
logrus.Warnf("failed to close full text search!")
}
}
logrus.Warnf("Dendrite is exiting now") logrus.Warnf("Dendrite is exiting now")
} }

View file

@ -17,6 +17,7 @@ package syncapi
import ( import (
"context" "context"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
@ -59,6 +60,15 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to load notifier ") logrus.WithError(err).Panicf("failed to load notifier ")
} }
var fts *fulltext.Search
if cfg.Fulltext.Enabled {
fts, err = fulltext.New(base.ProcessContext.Context(), cfg.Fulltext)
if err != nil {
logrus.WithError(err).Panicf("failed to create full text")
}
base.ProcessContext.ComponentStarted()
}
federationPresenceProducer := &producers.FederationAPIPresenceProducer{ federationPresenceProducer := &producers.FederationAPIPresenceProducer{
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
JetStream: js, JetStream: js,
@ -86,7 +96,7 @@ func AddPublicRoutes(
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider, base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, base.Fulltext, streams.InviteStreamProvider, rsAPI, fts,
) )
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer") logrus.WithError(err).Panicf("failed to start room server consumer")
@ -94,7 +104,7 @@ func AddPublicRoutes(
clientConsumer := consumers.NewOutputClientDataConsumer( clientConsumer := consumers.NewOutputClientDataConsumer(
base.ProcessContext, cfg, js, natsClient, syncDB, notifier, base.ProcessContext, cfg, js, natsClient, syncDB, notifier,
streams.AccountDataStreamProvider, base.Fulltext, streams.AccountDataStreamProvider, fts,
) )
if err = clientConsumer.Start(); err != nil { if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer") logrus.WithError(err).Panicf("failed to start client data consumer")
@ -130,6 +140,6 @@ func AddPublicRoutes(
routing.Setup( routing.Setup(
base.PublicClientAPIMux, requestPool, syncDB, userAPI, base.PublicClientAPIMux, requestPool, syncDB, userAPI,
rsAPI, cfg, base.Caches, base.Fulltext, rsAPI, cfg, base.Caches, fts,
) )
} }