Implement /search, partitially

This commit is contained in:
Till Faelligen 2022-05-18 08:12:11 +02:00
parent 2329cf7478
commit 06d4d0b32b
10 changed files with 304 additions and 48 deletions

View file

@ -19,9 +19,12 @@ import (
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"time"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
@ -33,19 +36,23 @@ import (
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
) )
// OutputClientDataConsumer consumes events that originated in the client API server. // OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct { type OutputClientDataConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable string nats *nats.Conn
topic string durable string
db storage.Database topic string
stream types.StreamProvider topicReIndex string
notifier *notifier.Notifier db storage.Database
serverName gomatrixserverlib.ServerName stream types.StreamProvider
producer *producers.UserAPIReadProducer notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
producer *producers.UserAPIReadProducer
fts *fulltext.Search
} }
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@ -53,26 +60,86 @@ func NewOutputClientDataConsumer(
process *process.ProcessContext, process *process.ProcessContext,
cfg *config.SyncAPI, cfg *config.SyncAPI,
js nats.JetStreamContext, js nats.JetStreamContext,
nats *nats.Conn,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, stream types.StreamProvider,
producer *producers.UserAPIReadProducer, producer *producers.UserAPIReadProducer,
fts *fulltext.Search,
) *OutputClientDataConsumer { ) *OutputClientDataConsumer {
return &OutputClientDataConsumer{ return &OutputClientDataConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"), topicReIndex: cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex),
db: store, durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"),
notifier: notifier, nats: nats,
stream: stream, db: store,
serverName: cfg.Matrix.ServerName, notifier: notifier,
producer: producer, stream: stream,
serverName: cfg.Matrix.ServerName,
producer: producer,
fts: fts,
} }
} }
// Start consuming from room servers // Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error { func (s *OutputClientDataConsumer) Start() error {
_, err := s.nats.Subscribe(s.topicReIndex, func(msg *nats.Msg) {
if err := msg.Ack(); err != nil {
return
}
ctx := context.Background()
logrus.Debugf("Starting to index events")
var offset int
start := time.Now()
count := 0
for {
evs, err := s.db.ReIndex(ctx, 1000, int64(offset))
if err != nil {
logrus.WithError(err).Errorf("unable to get events to index")
return
}
if len(evs) == 0 {
break
}
logrus.Debugf("Indexing %d events", len(evs))
elements := make([]fulltext.IndexElement, 0, len(evs))
for _, ev := range evs {
e := fulltext.IndexElement{
EventID: ev.EventID(),
RoomID: ev.RoomID(),
}
switch ev.Type() {
case "m.room.message":
e.Content = gjson.GetBytes(ev.Content(), "body").String()
case gomatrixserverlib.MRoomName:
e.Content = gjson.GetBytes(ev.Content(), "name").String()
case gomatrixserverlib.MRoomTopic:
e.Content = gjson.GetBytes(ev.Content(), "topic").String()
default:
continue
}
if strings.TrimSpace(e.Content) == "" {
continue
}
elements = append(elements, e)
}
if err = s.fts.BatchIndex(elements); err != nil {
logrus.WithError(err).Error("unable to index events")
continue
}
offset += len(evs)
count += len(elements)
}
logrus.Debugf("Indexed %d events in %v", count, time.Since(start))
})
if err != nil {
return err
}
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.DeliverAll(), nats.ManualAck(),

View file

@ -33,11 +33,11 @@ import (
type ContextRespsonse struct { type ContextRespsonse struct {
End string `json:"end"` End string `json:"end"`
Event gomatrixserverlib.ClientEvent `json:"event"` Event *gomatrixserverlib.ClientEvent `json:"event,omitempty"`
EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after,omitempty"` EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after,omitempty"`
EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before,omitempty"` EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before,omitempty"`
Start string `json:"start"` Start string `json:"start"`
State []gomatrixserverlib.ClientEvent `json:"state"` State []gomatrixserverlib.ClientEvent `json:"state,omitempty"`
} }
func Context( func Context(
@ -139,8 +139,9 @@ func Context(
eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll) eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll)
newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache) newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache)
ev := gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll)
response := ContextRespsonse{ response := ContextRespsonse{
Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll), Event: &ev,
EventsAfter: eventsAfterClient, EventsAfter: eventsAfterClient,
EventsBefore: eventsBeforeClient, EventsBefore: eventsBeforeClient,
State: gomatrixserverlib.HeaderedToClientEvents(newState, gomatrixserverlib.FormatAll), State: gomatrixserverlib.HeaderedToClientEvents(newState, gomatrixserverlib.FormatAll),

View file

@ -100,7 +100,7 @@ func Setup(
v3mux.Handle("/search", v3mux.Handle("/search",
httputil.MakeAuthAPI("search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI("search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return Search(req, device, fts) return Search(req, device, syncDB, fts, req.FormValue("next_batch"))
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
} }

View file

@ -1,47 +1,164 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing package routing
import ( import (
"net/http" "net/http"
"strconv"
"github.com/blevesearch/bleve/v2/search"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/fulltext" "github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
) )
func Search(req *http.Request, device *api.Device, fts *fulltext.Search) util.JSONResponse { func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts *fulltext.Search, from string) util.JSONResponse {
var searchReq SearchRequest var (
searchReq SearchRequest
err error
ctx = req.Context()
)
resErr := httputil.UnmarshalJSONRequest(req, &searchReq) resErr := httputil.UnmarshalJSONRequest(req, &searchReq)
if resErr != nil { if resErr != nil {
logrus.Error("failed to unmarshal search request")
return *resErr return *resErr
} }
result, err := fts.Search(searchReq.SearchCategories.RoomEvents.SearchTerm) nextBatch := 0
if from != "" {
nextBatch, err = strconv.Atoi(from)
if err != nil {
return jsonerror.InternalServerError()
}
}
if searchReq.SearchCategories.RoomEvents.Filter.Limit == 0 {
searchReq.SearchCategories.RoomEvents.Filter.Limit = 5
}
// only search rooms the user is actually joined to
joinedRooms, err := syncDB.RoomIDsWithMembership(ctx, device.UserID, "join")
if err != nil { if err != nil {
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
rooms := joinedRooms
if searchReq.SearchCategories.RoomEvents.Filter.Rooms != nil {
rooms = append(*searchReq.SearchCategories.RoomEvents.Filter.Rooms, joinedRooms...)
}
logrus.Debugf("Searching FTS for rooms %v - %s", rooms, searchReq.SearchCategories.RoomEvents.SearchTerm)
orderByTime := false
if searchReq.SearchCategories.RoomEvents.OrderBy == "recent" {
logrus.Debugf("Ordering by recently added")
orderByTime = true
}
result, err := fts.Search(
searchReq.SearchCategories.RoomEvents.SearchTerm,
rooms,
searchReq.SearchCategories.RoomEvents.Filter.Limit,
nextBatch,
orderByTime,
)
if err != nil {
logrus.WithError(err).Error("failed to search fulltext")
return jsonerror.InternalServerError()
}
logrus.Debugf("Search took %s", result.Took)
results := []Results{} results := []Results{}
for _, x := range result.Hits {
wantEvents := make([]string, len(result.Hits))
eventScore := make(map[string]*search.DocumentMatch)
for _, hit := range result.Hits {
logrus.Debugf("%+v\n", hit.Fields)
wantEvents = append(wantEvents, hit.ID)
eventScore[hit.ID] = hit
}
roomFilter := &gomatrixserverlib.RoomEventFilter{
Rooms: &rooms,
}
evs, err := syncDB.Events(ctx, wantEvents)
if err != nil {
logrus.WithError(err).Error("failed to get events from database")
return jsonerror.InternalServerError()
}
for _, event := range evs {
id, _, err := syncDB.SelectContextEvent(ctx, event.RoomID(), event.EventID())
if err != nil {
logrus.WithError(err).Error("failed to query context event")
return jsonerror.InternalServerError()
}
roomFilter.Limit = searchReq.SearchCategories.RoomEvents.EventContext.BeforeLimit
eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, event.RoomID(), roomFilter)
if err != nil {
logrus.WithError(err).Error("failed to query before context event")
return jsonerror.InternalServerError()
}
roomFilter.Limit = searchReq.SearchCategories.RoomEvents.EventContext.AfterLimit
_, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, event.RoomID(), roomFilter)
if err != nil {
logrus.WithError(err).Error("failed to query after context event")
return jsonerror.InternalServerError()
}
results = append(results, Results{ results = append(results, Results{
Rank: x.Score, Context: ContextRespsonse{
EventsAfter: gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatSync),
EventsBefore: gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatSync),
},
Rank: eventScore[event.EventID()].Score,
Result: Result{ Result: Result{
Content: Content{ Content: Content{
Body: x.String(), Body: gjson.GetBytes(event.Content(), "body").Str,
Format: gjson.GetBytes(event.Content(), "format").Str,
FormattedBody: gjson.GetBytes(event.Content(), "formatted_body").Str,
Msgtype: gjson.GetBytes(event.Content(), "msgtype").Str,
}, },
EventID: x.ID, EventID: event.EventID(),
OriginServerTs: event.OriginServerTS(),
RoomID: event.RoomID(),
Sender: event.Sender(),
Type: event.Type(),
Unsigned: event.Unsigned(),
}, },
}) })
} }
nb := ""
if int(result.Total) > nextBatch+len(results) {
nb = strconv.Itoa(len(results) + nextBatch)
}
res := SearchResponse{ res := SearchResponse{
SearchCategories: SearchCategories{ SearchCategories: SearchCategories{
RoomEvents: RoomEvents{ RoomEvents: RoomEvents{
Count: len(results), Count: int(result.Total),
Groups: Groups{}, Groups: Groups{},
Results: results, Results: results,
NextBatch: nb,
}, },
}, },
} }
@ -55,6 +172,11 @@ func Search(req *http.Request, device *api.Device, fts *fulltext.Search) util.JS
type SearchRequest struct { type SearchRequest struct {
SearchCategories struct { SearchCategories struct {
RoomEvents struct { RoomEvents struct {
EventContext struct {
AfterLimit int `json:"after_limit,omitempty"`
BeforeLimit int `json:"before_limit,omitempty"`
IncludeProfile bool `json:"include_profile,omitempty"`
} `json:"event_context"`
Filter gomatrixserverlib.StateFilter `json:"filter"` Filter gomatrixserverlib.StateFilter `json:"filter"`
Groupings struct { Groupings struct {
GroupBy []struct { GroupBy []struct {
@ -88,22 +210,22 @@ type Content struct {
FormattedBody string `json:"formatted_body"` FormattedBody string `json:"formatted_body"`
Msgtype string `json:"msgtype"` Msgtype string `json:"msgtype"`
} }
type Unsigned struct {
Age int `json:"age"`
}
type Result struct { type Result struct {
Content Content `json:"content"` Content Content `json:"content"`
EventID string `json:"event_id"` EventID string `json:"event_id"`
OriginServerTs int64 `json:"origin_server_ts"` OriginServerTs gomatrixserverlib.Timestamp `json:"origin_server_ts"`
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
Sender string `json:"sender"` Sender string `json:"sender"`
Type string `json:"type"` Type string `json:"type"`
Unsigned Unsigned `json:"unsigned"` Unsigned []byte `json:"unsigned,omitempty"`
} }
type Results struct { type Results struct {
Rank float64 `json:"rank"` Context ContextRespsonse `json:"context"`
Result Result `json:"result"` Rank float64 `json:"rank"`
Result Result `json:"result"`
} }
type RoomEvents struct { type RoomEvents struct {
Count int `json:"count"` Count int `json:"count"`
Groups Groups `json:"groups"` Groups Groups `json:"groups"`

View file

@ -157,6 +157,7 @@ type Database interface {
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
ReIndex(ctx context.Context, limit, offset int64) ([]gomatrixserverlib.HeaderedEvent, error)
} }
type Presence interface { type Presence interface {

View file

@ -164,6 +164,8 @@ const selectContextAfterEventSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3" " ORDER BY id ASC LIMIT $3"
const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type = ANY($1) ORDER BY id ASC LIMIT $2 OFFSET $3"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
@ -178,6 +180,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt
selectSearchStmt *sql.Stmt
} }
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -200,6 +203,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
{&s.selectSearchStmt, selectSearchSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -617,3 +621,26 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
} }
return result, rows.Err() return result, rows.Err()
} }
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) {
rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, pq.StringArray(types), limit, offset)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
var result []gomatrixserverlib.HeaderedEvent
var eventID string
for rows.Next() {
var ev gomatrixserverlib.HeaderedEvent
var eventBytes []byte
if err = rows.Scan(&eventID, &eventBytes); err != nil {
return nil, err
}
if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
return nil, err
}
result = append(result, ev)
}
return result, rows.Err()
}

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
"github.com/matrix-org/dendrite/internal/fulltext" "github.com/matrix-org/dendrite/internal/fulltext"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
@ -396,8 +397,8 @@ func (d *Database) WriteEvent(
e := fulltext.IndexElement{ e := fulltext.IndexElement{
EventID: ev.EventID(), EventID: ev.EventID(),
Type: ev.Type(),
RoomID: ev.RoomID(), RoomID: ev.RoomID(),
Time: time.Now(),
} }
switch ev.Type() { switch ev.Type() {
@ -413,6 +414,7 @@ func (d *Database) WriteEvent(
} }
} }
if e.Content != "" { if e.Content != "" {
logrus.Debugf("Indexing element: %+v", e)
if err := d.FTS.Index(e); err != nil { if err := d.FTS.Index(e); err != nil {
logrus.WithError(err).Warn("failed to write to fulltext index") logrus.WithError(err).Warn("failed to write to fulltext index")
} }
@ -1079,6 +1081,14 @@ func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, igno
return s.Ignores.UpsertIgnores(ctx, userID, ignores) return s.Ignores.UpsertIgnores(ctx, userID, ignores)
} }
func (s *Database) ReIndex(ctx context.Context, limit, offset int64) ([]gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.ReIndex(ctx, nil, limit, offset, []string{
gomatrixserverlib.MRoomName,
gomatrixserverlib.MRoomTopic,
"m.room.message",
})
}
func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync) return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
} }

View file

@ -114,6 +114,8 @@ const selectContextAfterEventSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type = ANY($1) LIMIT $2 OFFSET $3 ORDER BY id ASC"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *StreamIDStatements streamIDStatements *StreamIDStatements
@ -124,6 +126,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt
selectSearchStmt *sql.Stmt
} }
func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) { func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) {
@ -143,6 +146,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
{&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
{&s.selectSearchStmt, selectSearchSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -613,3 +617,26 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [
} }
return return
} }
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) {
rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, types, limit, offset)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
var result []gomatrixserverlib.HeaderedEvent
var eventID string
for rows.Next() {
var ev gomatrixserverlib.HeaderedEvent
var eventBytes []byte
if err = rows.Scan(&eventID, &eventBytes); err != nil {
return nil, err
}
if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
return nil, err
}
result = append(result, ev)
}
return result, rows.Err()
}

View file

@ -67,6 +67,7 @@ type Events interface {
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error)
} }
// Topology keeps track of the depths and stream positions for all events. // Topology keeps track of the depths and stream positions for all events.

View file

@ -48,7 +48,7 @@ func AddPublicRoutes(
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
fts, err := fulltext.New("./fts.bleve") fts, err := fulltext.New("/work/fts.bleve")
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to create full text") logrus.WithError(err).Panicf("failed to create full text")
} }
@ -101,8 +101,8 @@ func AddPublicRoutes(
} }
clientConsumer := consumers.NewOutputClientDataConsumer( clientConsumer := consumers.NewOutputClientDataConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, base.ProcessContext, cfg, js, natsClient, syncDB, notifier, streams.AccountDataStreamProvider,
userAPIReadUpdateProducer, userAPIReadUpdateProducer, fts,
) )
if err = clientConsumer.Start(); err != nil { if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer") logrus.WithError(err).Panicf("failed to start client data consumer")