- Move to base

- Actually filter on correct rooms
- Error messages
This commit is contained in:
Till Faelligen 2022-05-19 10:11:35 +02:00
parent 74939ffb61
commit a72b8f22b5
4 changed files with 49 additions and 15 deletions

View file

@ -53,6 +53,7 @@ type OutputClientDataConsumer struct {
serverName gomatrixserverlib.ServerName
producer *producers.UserAPIReadProducer
fts *fulltext.Search
cfg *config.SyncAPI
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@ -80,6 +81,7 @@ func NewOutputClientDataConsumer(
serverName: cfg.Matrix.ServerName,
producer: producer,
fts: fts,
cfg: cfg,
}
}
@ -89,13 +91,18 @@ func (s *OutputClientDataConsumer) Start() error {
if err := msg.Ack(); err != nil {
return
}
if !s.cfg.Fulltext.Enabled {
logrus.Warn("Fulltext indexing is disabled")
return
}
ctx := context.Background()
logrus.Debugf("Starting to index events")
var offset int
start := time.Now()
count := 0
var id int64 = 0
for {
evs, err := s.db.ReIndex(ctx, 1000, int64(offset))
evs, err := s.db.ReIndex(ctx, 1000, id)
if err != nil {
logrus.WithError(err).Errorf("unable to get events to index")
return
@ -106,11 +113,14 @@ func (s *OutputClientDataConsumer) Start() error {
logrus.Debugf("Indexing %d events", len(evs))
elements := make([]fulltext.IndexElement, 0, len(evs))
for _, ev := range evs {
for streamPos, ev := range evs {
id = streamPos
e := fulltext.IndexElement{
EventID: ev.EventID(),
RoomID: ev.RoomID(),
StreamPosition: streamPos,
}
e.SetContentType(ev.Type())
switch ev.Type() {
case "m.room.message":

View file

@ -18,6 +18,7 @@ import (
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/httputil"
@ -100,6 +101,12 @@ func Setup(
v3mux.Handle("/search",
httputil.MakeAuthAPI("search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if !cfg.Fulltext.Enabled {
return util.JSONResponse{
Code: http.StatusNotImplemented,
JSON: jsonerror.Unknown("Search has been disabled by the server administrator."),
}
}
return Search(req, device, syncDB, fts, req.FormValue("next_batch"))
}),
).Methods(http.MethodPost, http.MethodOptions)

View file

@ -59,9 +59,32 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
if err != nil {
return jsonerror.InternalServerError()
}
rooms := joinedRooms
if len(joinedRooms) == 0 {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("User not joined to any rooms."),
}
}
joinedRoomsMap := make(map[string]struct{}, len(joinedRooms))
for _, roomID := range joinedRooms {
joinedRoomsMap[roomID] = struct{}{}
}
rooms := []string{}
if searchReq.SearchCategories.RoomEvents.Filter.Rooms != nil {
rooms = append(*searchReq.SearchCategories.RoomEvents.Filter.Rooms, joinedRooms...)
for _, roomID := range *searchReq.SearchCategories.RoomEvents.Filter.Rooms {
if _, ok := joinedRoomsMap[roomID]; ok {
rooms = append(rooms, roomID)
}
}
} else {
rooms = joinedRooms
}
if len(rooms) == 0 {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Unknown("User not allowed to search in this room(s)."),
}
}
logrus.Debugf("Searching FTS for rooms %v - %s", rooms, searchReq.SearchCategories.RoomEvents.SearchTerm)

View file

@ -18,7 +18,6 @@ import (
"context"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/sirupsen/logrus"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
@ -48,11 +47,6 @@ func AddPublicRoutes(
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
fts, err := fulltext.New("/work/fts.bleve")
if err != nil {
logrus.WithError(err).Panicf("failed to create full text")
}
syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
@ -104,7 +98,7 @@ func AddPublicRoutes(
roomConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer,
fts,
base.Fulltext,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
@ -112,7 +106,7 @@ func AddPublicRoutes(
clientConsumer := consumers.NewOutputClientDataConsumer(
base.ProcessContext, cfg, js, natsClient, syncDB, notifier, streams.AccountDataStreamProvider,
userAPIReadUpdateProducer, fts,
userAPIReadUpdateProducer, base.Fulltext,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
@ -149,6 +143,6 @@ func AddPublicRoutes(
routing.Setup(
base.PublicClientAPIMux, requestPool, syncDB, userAPI,
rsAPI, cfg, base.Caches, fts,
rsAPI, cfg, base.Caches, base.Fulltext,
)
}