diff --git a/go.mod b/go.mod index 4f9b77784..d4baac122 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/MFAshby/stdemuxerhook v1.0.0 github.com/Masterminds/semver/v3 v3.1.1 - github.com/blevesearch/bleve/v2 v2.3.2 // indirect + github.com/blevesearch/bleve/v2 v2.3.2 github.com/codeclysm/extract v2.2.0+incompatible github.com/containerd/containerd v1.6.2 // indirect github.com/docker/docker v20.10.14+incompatible diff --git a/internal/fulltext/bleve.go b/internal/fulltext/bleve.go index 5dca20809..2265c2ef2 100644 --- a/internal/fulltext/bleve.go +++ b/internal/fulltext/bleve.go @@ -15,32 +15,42 @@ package fulltext import ( - "strings" - "time" - "github.com/blevesearch/bleve/v2" "github.com/blevesearch/bleve/v2/analysis/lang/en" "github.com/blevesearch/bleve/v2/search/query" + "github.com/matrix-org/gomatrixserverlib" ) // Search contains all existing bleve.Index type Search struct { - MessageIndex bleve.Index + FulltextIndex bleve.Index } // IndexElement describes the layout of an element to index type IndexElement struct { - EventID string `json:"event_id,omitempty"` - RoomID string `json:"room_id,omitempty"` - Content string `json:"content,omitempty"` - Time time.Time `json:"timestamp,omitempty"` + EventID string + RoomID string + Content string + ContentType string + StreamPosition int64 +} + +// SetContentType sets i.ContentType given an identifier +func (i *IndexElement) SetContentType(v string) { + switch v { + case "m.room.message": + i.ContentType = "content.body" + case gomatrixserverlib.MRoomName: + i.ContentType = "content.name" + case gomatrixserverlib.MRoomTopic: + i.ContentType = "content.topic" + } } // New opens a new/existing fulltext index -func New(path string) (*Search, error) { - fts := &Search{} - var err error - fts.MessageIndex, err = openIndex(path) +func New(path string) (fts *Search, err error) { + fts = &Search{} + fts.FulltextIndex, err = openIndex(path) if err != nil { return nil, err } @@ -49,17 +59,17 @@ func New(path string) (*Search, error) { // Close closes the fulltext index func (f *Search) Close() error { - return f.MessageIndex.Close() + return f.FulltextIndex.Close() } -// Index indexes a given element +// FulltextIndex indexes a given element func (f *Search) Index(e IndexElement) error { - return f.MessageIndex.Index(e.EventID, e) + return f.FulltextIndex.Index(e.EventID, e) } // BatchIndex indexes the given elements func (f *Search) BatchIndex(elements []IndexElement) error { - batch := f.MessageIndex.NewBatch() + batch := f.FulltextIndex.NewBatch() for _, element := range elements { err := batch.Index(element.EventID, element) @@ -67,40 +77,54 @@ func (f *Search) BatchIndex(elements []IndexElement) error { return err } } - return f.MessageIndex.Batch(batch) + return f.FulltextIndex.Batch(batch) } // Delete deletes an indexed element by the eventID func (f *Search) Delete(eventID string) error { - return f.MessageIndex.Delete(eventID) + return f.FulltextIndex.Delete(eventID) } -// Search searches the index given a search term -func (f *Search) Search(term string, roomIDs []string, limit, from int, orderByTime bool) (*bleve.SearchResult, error) { - terms := strings.Split(term, " ") - +// Search searches the index given a search term, roomIDs and keys. +func (f *Search) Search(term string, roomIDs, keys []string, limit, from int, orderByStreamPos bool) (*bleve.SearchResult, error) { qry := bleve.NewConjunctionQuery() - for _, t := range terms { - qry.AddQuery(bleve.NewQueryStringQuery(t)) - } + termQuery := bleve.NewBooleanQuery() + matchQuery := bleve.NewMatchQuery(term) + matchQuery.SetField("Content") + termQuery.AddMust(matchQuery) + qry.AddQuery(termQuery) + + roomQuery := bleve.NewBooleanQuery() for _, roomID := range roomIDs { roomSearch := bleve.NewMatchQuery(roomID) - roomSearch.SetField("room_id") - roomSearch.SetOperator(query.MatchQueryOperatorAnd) - qry.AddQuery(roomSearch) + roomSearch.SetField("RoomID") + roomSearch.SetOperator(query.MatchQueryOperatorOr) + roomQuery.AddShould(roomSearch) + } + if len(roomIDs) > 0 { + qry.AddQuery(roomQuery) + } + keyQuery := bleve.NewBooleanQuery() + for _, key := range keys { + keySearch := bleve.NewMatchQuery(key) + keySearch.SetField("ContentType") + keySearch.SetOperator(query.MatchQueryOperatorOr) + keyQuery.AddShould(keySearch) + } + if len(keys) > 0 { + keyQuery.SetMinShould(1) + qry.AddQuery(keyQuery) } - s := bleve.NewSearchRequest(qry) - s.Size = limit - s.From = from + s := bleve.NewSearchRequestOptions(qry, limit, from, false) s.SortBy([]string{"_score"}) - if orderByTime { - s.SortBy([]string{"-timestamp"}) + if orderByStreamPos { + s.SortBy([]string{"-StreamPosition"}) } - return f.MessageIndex.Search(s) + return f.FulltextIndex.Search(s) } func openIndex(path string) (bleve.Index, error) { @@ -112,22 +136,17 @@ func openIndex(path string) (bleve.Index, error) { enFieldMapping.Analyzer = en.AnalyzerName eventMapping := bleve.NewDocumentMapping() + eventMapping.AddFieldMappingsAt("Content", enFieldMapping) + eventMapping.AddFieldMappingsAt("StreamPosition", bleve.NewNumericFieldMapping()) - eventMapping.AddFieldMappingsAt("content", enFieldMapping) - eventMapping.AddFieldMappingsAt("room_id", bleve.NewTextFieldMapping()) - - idMapping := bleve.NewTextFieldMapping() - idMapping.IncludeInAll = false - idMapping.Index = false - idMapping.IncludeTermVectors = false - idMapping.SkipFreqNorm = true - eventMapping.AddFieldMappingsAt("event_id", idMapping) + idFieldMapping := bleve.NewKeywordFieldMapping() + eventMapping.AddFieldMappingsAt("ContentType", idFieldMapping) + eventMapping.AddFieldMappingsAt("RoomID", idFieldMapping) + eventMapping.AddFieldMappingsAt("EventID", idFieldMapping) mapping := bleve.NewIndexMapping() - mapping.AddDocumentMapping("event", eventMapping) - mapping.DefaultType = "event" - mapping.TypeField = "type" - mapping.DefaultAnalyzer = "en" + mapping.AddDocumentMapping("Event", eventMapping) + mapping.DefaultType = "Event" index, err := bleve.New(path, mapping) if err != nil { @@ -135,20 +154,3 @@ func openIndex(path string) (bleve.Index, error) { } return index, nil } - -type IndexElements []IndexElement - -// Len implements sort.Interface -func (ie IndexElements) Len() int { - return len(ie) -} - -// Less implements sort.Interface -func (ie IndexElements) Less(i, j int) bool { - return ie[i].Time.After(ie[j].Time) -} - -// Swap implements sort.Interface -func (ie IndexElements) Swap(i, j int) { - ie[i], ie[j] = ie[j], ie[i] -} diff --git a/internal/fulltext/bleve_test.go b/internal/fulltext/bleve_test.go index 850f2696e..21db78e7e 100644 --- a/internal/fulltext/bleve_test.go +++ b/internal/fulltext/bleve_test.go @@ -15,119 +15,225 @@ package fulltext_test import ( - "sort" + "reflect" "testing" - "time" "github.com/matrix-org/dendrite/internal/fulltext" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) -func TestSearch(t *testing.T) { - // create new index - dataDir := t.TempDir() - fts, err := fulltext.New(dataDir) +func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search { + t.Helper() + fts, err := fulltext.New(tempDir) if err != nil { t.Fatal("failed to open fulltext index:", err) } - if err = fts.Close(); err != nil { + return fts +} + +func mustAddTestData(t *testing.T, fts *fulltext.Search, firstStreamPos int64) (eventIDs, roomIDs []string) { + t.Helper() + // create some more random data + var batchItems []fulltext.IndexElement + streamPos := firstStreamPos + + wantRoomID := util.RandomString(16) + + for i := 0; i < 30; i++ { + streamPos++ + eventID := util.RandomString(16) + // Create more data for the first room + if i > 15 { + wantRoomID = util.RandomString(16) + } + e := fulltext.IndexElement{ + EventID: eventID, + RoomID: wantRoomID, + Content: "lorem ipsum", + StreamPosition: streamPos, + } + e.SetContentType("m.room.message") + batchItems = append(batchItems, e) + roomIDs = append(roomIDs, wantRoomID) + eventIDs = append(eventIDs, eventID) + } + e := fulltext.IndexElement{ + EventID: util.RandomString(16), + RoomID: wantRoomID, + Content: "Roomname testing", + StreamPosition: streamPos, + } + e.SetContentType(gomatrixserverlib.MRoomName) + batchItems = append(batchItems, e) + e = fulltext.IndexElement{ + EventID: util.RandomString(16), + RoomID: wantRoomID, + Content: "Room topic fulltext", + StreamPosition: streamPos, + } + e.SetContentType(gomatrixserverlib.MRoomTopic) + batchItems = append(batchItems, e) + if err := fts.BatchIndex(batchItems); err != nil { + t.Fatalf("failed to batch insert elements: %v", err) + } + return eventIDs, roomIDs +} + +func TestOpen(t *testing.T) { + dataDir := t.TempDir() + fts := mustOpenIndex(t, dataDir) + if err := fts.Close(); err != nil { t.Fatal("unable to close fulltext index", err) } // open existing index - fts, err = fulltext.New(dataDir) - if err != nil { - t.Fatal("failed to open fulltext index:", err) - } + fts = mustOpenIndex(t, dataDir) + defer fts.Close() +} + +func TestIndex(t *testing.T) { + fts := mustOpenIndex(t, t.TempDir()) defer fts.Close() - if fts == nil { - t.Fatal("fts is nil") - } // add some data + var streamPos int64 = 1 roomID := util.RandomString(8) - e := fulltext.IndexElement{ - EventID: util.RandomString(16), - RoomID: roomID, - Content: "lorem ipsum", - Time: time.Now(), - } - - if err = fts.Index(e); err != nil { - t.Fatal("failed to index element", err) - } - eventID := util.RandomString(16) - e = fulltext.IndexElement{ - EventID: eventID, - RoomID: roomID, - Content: "lorem ipsum", - Time: time.Now(), + e := fulltext.IndexElement{ + EventID: eventID, + RoomID: roomID, + Content: "lorem ipsum", + StreamPosition: streamPos, } + e.SetContentType("m.room.message") - if err = fts.Index(e); err != nil { + if err := fts.Index(e); err != nil { t.Fatal("failed to index element", err) } - // search data - res, err := fts.Search("lorem", nil, 10, 0, false) - if err != nil { - t.Fatal(err) - } - if res.Total != 2 { - t.Fatalf("expected %d results, got %d", 2, res.Total) - } - - // remove element - if err = fts.Delete(eventID); err != nil { - t.Fatal(err) - } - // create some more random data - var batchItems []fulltext.IndexElement - - wantRoomID := util.RandomString(8) - for i := 0; i < 30; i++ { - eventID = util.RandomString(16) - e = fulltext.IndexElement{ - EventID: eventID, - RoomID: wantRoomID, - Content: "lorem ipsum", - Time: time.Now(), - } - batchItems = append(batchItems, e) - } - - // Index the data - if err = fts.BatchIndex(batchItems); err != nil { - t.Fatal("failed to batch index") - } - - // search for lorem, but only in a given room - searchRooms := []string{roomID} - res, err = fts.Search("lorem", searchRooms, 10, 0, false) - if err != nil { - t.Fatal(err) - } - if res.Total != 1 { - t.Fatalf("expected %d results, got %d", 1, res.Total) - } - - // can get sorted results - res, err = fts.Search("lorem", []string{wantRoomID}, 10, 0, true) - if err != nil { - t.Fatal(err) - } - - if res.Hits[0].ID != eventID { - t.Fatalf("expected %s to be first, got %s", eventID, res.Hits[0].ID) - } - - sort.Sort(fulltext.IndexElements(batchItems)) - if eventID != batchItems[0].EventID { - t.Fatalf("expected %s to be first, got %s", eventID, batchItems[0].EventID) - } - - // test back pagination - + mustAddTestData(t, fts, streamPos) +} + +func TestDelete(t *testing.T) { + fts := mustOpenIndex(t, t.TempDir()) + defer fts.Close() + eventIDs, roomIDs := mustAddTestData(t, fts, 0) + res1, err := fts.Search("lorem", roomIDs[:1], nil, 50, 0, false) + if err != nil { + t.Fatal(err) + } + + if err = fts.Delete(eventIDs[0]); err != nil { + t.Fatal(err) + } + + res2, err := fts.Search("lorem", roomIDs[:1], nil, 50, 0, false) + if err != nil { + t.Fatal(err) + } + + if res1.Total <= res2.Total { + t.Fatalf("got unexpected result: %d <= %d", res1.Total, res2.Total) + } +} + +func TestSearch(t *testing.T) { + type args struct { + term string + keys []string + limit int + from int + orderByStreamPos bool + roomIndex []int + } + tests := []struct { + name string + args args + wantCount int + wantErr bool + }{ + { + name: "Can search for many results in one room", + wantCount: 16, + args: args{ + term: "lorem", + roomIndex: []int{0}, + limit: 20, + }, + }, + { + name: "Can search for one result in one room", + wantCount: 1, + args: args{ + term: "lorem", + roomIndex: []int{16}, + limit: 20, + }, + }, + { + name: "Can search for many results in multiple rooms", + wantCount: 17, + args: args{ + term: "lorem", + roomIndex: []int{0, 16}, + limit: 20, + }, + }, + { + name: "Can search for many results in all rooms, reversed", + wantCount: 30, + args: args{ + term: "lorem", + limit: 30, + orderByStreamPos: true, + }, + }, + { + name: "Can search for specific search room name", + wantCount: 1, + args: args{ + term: "testing", + roomIndex: []int{}, + limit: 20, + keys: []string{"content.name"}, + }, + }, + { + name: "Can search for specific search room topic", + wantCount: 1, + args: args{ + term: "fulltext", + roomIndex: []int{}, + limit: 20, + keys: []string{"content.topic"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f := mustOpenIndex(t, t.TempDir()) + eventIDs, roomIDs := mustAddTestData(t, f, 0) + var searchRooms []string + for _, x := range tt.args.roomIndex { + searchRooms = append(searchRooms, roomIDs[x]) + } + t.Logf("searching in rooms: %v - %v\n", searchRooms, tt.args.keys) + + got, err := f.Search(tt.args.term, searchRooms, tt.args.keys, tt.args.limit, tt.args.from, tt.args.orderByStreamPos) + if (err != nil) != tt.wantErr { + t.Errorf("Search() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(len(got.Hits), tt.wantCount) { + t.Errorf("Search() got = %v, want %v", len(got.Hits), tt.wantCount) + } + if tt.args.orderByStreamPos { + if got.Hits[0].ID != eventIDs[29] { + t.Fatalf("expected ID %s, got %s", eventIDs[29], got.Hits[0].ID) + } + } + }) + } } diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go index 4cac3ee12..85f114598 100644 --- a/syncapi/routing/search.go +++ b/syncapi/routing/search.go @@ -75,6 +75,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts result, err := fts.Search( searchReq.SearchCategories.RoomEvents.SearchTerm, rooms, + []string{}, searchReq.SearchCategories.RoomEvents.Filter.Limit, nextBatch, orderByTime, diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index c627bdae2..c6fef5d09 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -19,7 +19,6 @@ import ( "database/sql" "encoding/json" "fmt" - "time" "github.com/matrix-org/dendrite/internal/fulltext" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -396,10 +395,11 @@ func (d *Database) WriteEvent( }) e := fulltext.IndexElement{ - EventID: ev.EventID(), - RoomID: ev.RoomID(), - Time: time.Now(), + EventID: ev.EventID(), + RoomID: ev.RoomID(), + StreamPosition: int64(pduPosition), } + e.SetContentType(ev.Type()) switch ev.Type() { case "m.room.message":