From a72b8f22b534fd3cadf2367b4f8a78fe2b04e183 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Thu, 19 May 2022 10:11:35 +0200 Subject: [PATCH] - Move to base - Actually filter on correct rooms - Error messages --- syncapi/consumers/clientapi.go | 18 ++++++++++++++---- syncapi/routing/routing.go | 7 +++++++ syncapi/routing/search.go | 27 +++++++++++++++++++++++++-- syncapi/syncapi.go | 12 +++--------- 4 files changed, 49 insertions(+), 15 deletions(-) diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 7ae9aae7a..f69e6139e 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -53,6 +53,7 @@ type OutputClientDataConsumer struct { serverName gomatrixserverlib.ServerName producer *producers.UserAPIReadProducer fts *fulltext.Search + cfg *config.SyncAPI } // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. @@ -80,6 +81,7 @@ func NewOutputClientDataConsumer( serverName: cfg.Matrix.ServerName, producer: producer, fts: fts, + cfg: cfg, } } @@ -89,13 +91,18 @@ func (s *OutputClientDataConsumer) Start() error { if err := msg.Ack(); err != nil { return } + if !s.cfg.Fulltext.Enabled { + logrus.Warn("Fulltext indexing is disabled") + return + } ctx := context.Background() logrus.Debugf("Starting to index events") var offset int start := time.Now() count := 0 + var id int64 = 0 for { - evs, err := s.db.ReIndex(ctx, 1000, int64(offset)) + evs, err := s.db.ReIndex(ctx, 1000, id) if err != nil { logrus.WithError(err).Errorf("unable to get events to index") return @@ -106,11 +113,14 @@ func (s *OutputClientDataConsumer) Start() error { logrus.Debugf("Indexing %d events", len(evs)) elements := make([]fulltext.IndexElement, 0, len(evs)) - for _, ev := range evs { + for streamPos, ev := range evs { + id = streamPos e := fulltext.IndexElement{ - EventID: ev.EventID(), - RoomID: ev.RoomID(), + EventID: ev.EventID(), + RoomID: ev.RoomID(), + StreamPosition: streamPos, } + e.SetContentType(ev.Type()) switch ev.Type() { case "m.room.message": diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index 6b95b511f..3ea988011 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -18,6 +18,7 @@ import ( "net/http" "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/fulltext" "github.com/matrix-org/dendrite/internal/httputil" @@ -100,6 +101,12 @@ func Setup( v3mux.Handle("/search", httputil.MakeAuthAPI("search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + if !cfg.Fulltext.Enabled { + return util.JSONResponse{ + Code: http.StatusNotImplemented, + JSON: jsonerror.Unknown("Search has been disabled by the server administrator."), + } + } return Search(req, device, syncDB, fts, req.FormValue("next_batch")) }), ).Methods(http.MethodPost, http.MethodOptions) diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go index 85f114598..68dc95fd2 100644 --- a/syncapi/routing/search.go +++ b/syncapi/routing/search.go @@ -59,9 +59,32 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts if err != nil { return jsonerror.InternalServerError() } - rooms := joinedRooms + if len(joinedRooms) == 0 { + return util.JSONResponse{ + Code: http.StatusNotFound, + JSON: jsonerror.NotFound("User not joined to any rooms."), + } + } + joinedRoomsMap := make(map[string]struct{}, len(joinedRooms)) + for _, roomID := range joinedRooms { + joinedRoomsMap[roomID] = struct{}{} + } + rooms := []string{} if searchReq.SearchCategories.RoomEvents.Filter.Rooms != nil { - rooms = append(*searchReq.SearchCategories.RoomEvents.Filter.Rooms, joinedRooms...) + for _, roomID := range *searchReq.SearchCategories.RoomEvents.Filter.Rooms { + if _, ok := joinedRoomsMap[roomID]; ok { + rooms = append(rooms, roomID) + } + } + } else { + rooms = joinedRooms + } + + if len(rooms) == 0 { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.Unknown("User not allowed to search in this room(s)."), + } } logrus.Debugf("Searching FTS for rooms %v - %s", rooms, searchReq.SearchCategories.RoomEvents.SearchTerm) diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index efb450eed..e26f28d4f 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -18,7 +18,6 @@ import ( "context" "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/internal/fulltext" "github.com/sirupsen/logrus" keyapi "github.com/matrix-org/dendrite/keyserver/api" @@ -48,11 +47,6 @@ func AddPublicRoutes( js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - fts, err := fulltext.New("/work/fts.bleve") - if err != nil { - logrus.WithError(err).Panicf("failed to create full text") - } - syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") @@ -104,7 +98,7 @@ func AddPublicRoutes( roomConsumer := consumers.NewOutputRoomEventConsumer( base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider, streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer, - fts, + base.Fulltext, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") @@ -112,7 +106,7 @@ func AddPublicRoutes( clientConsumer := consumers.NewOutputClientDataConsumer( base.ProcessContext, cfg, js, natsClient, syncDB, notifier, streams.AccountDataStreamProvider, - userAPIReadUpdateProducer, fts, + userAPIReadUpdateProducer, base.Fulltext, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") @@ -149,6 +143,6 @@ func AddPublicRoutes( routing.Setup( base.PublicClientAPIMux, requestPool, syncDB, userAPI, - rsAPI, cfg, base.Caches, fts, + rsAPI, cfg, base.Caches, base.Fulltext, ) }