Remove FTS from storage layer

Fix query
This commit is contained in:
Till Faelligen 2022-05-18 17:01:04 +02:00
parent a6f2b46c98
commit b33d9e5045
5 changed files with 23 additions and 45 deletions

View file

@ -20,7 +20,6 @@ import (
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -37,7 +36,7 @@ type SyncServerDatasource struct {
} }
// NewDatabase creates a new sync server database // NewDatabase creates a new sync server database
func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, fts *fulltext.Search) (*SyncServerDatasource, error) { func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource var d SyncServerDatasource
var err error var err error
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
@ -122,7 +121,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
NotificationData: notificationData, NotificationData: notificationData,
Ignores: ignores, Ignores: ignores,
Presence: presence, Presence: presence,
FTS: fts,
} }
return &d, nil return &d, nil
} }

View file

@ -20,15 +20,12 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal/fulltext"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -52,7 +49,6 @@ type Database struct {
NotificationData tables.NotificationData NotificationData tables.NotificationData
Ignores tables.Ignores Ignores tables.Ignores
Presence tables.Presence Presence tables.Presence
FTS *fulltext.Search
} }
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) { func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@ -394,32 +390,6 @@ func (d *Database) WriteEvent(
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition) return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition)
}) })
e := fulltext.IndexElement{
EventID: ev.EventID(),
RoomID: ev.RoomID(),
StreamPosition: int64(pduPosition),
}
e.SetContentType(ev.Type())
switch ev.Type() {
case "m.room.message":
e.Content = gjson.GetBytes(ev.Content(), "body").String()
case gomatrixserverlib.MRoomName:
e.Content = gjson.GetBytes(ev.Content(), "name").String()
case gomatrixserverlib.MRoomTopic:
e.Content = gjson.GetBytes(ev.Content(), "topic").String()
case gomatrixserverlib.MRoomRedaction:
if err := d.FTS.Delete(ev.Redacts()); err != nil {
logrus.WithError(err).Warn("failed to delete entry from fulltext index")
}
}
if e.Content != "" {
logrus.Debugf("Indexing element: %+v", e)
if err := d.FTS.Index(e); err != nil {
logrus.WithError(err).Warn("failed to write to fulltext index")
}
}
return pduPosition, returnErr return pduPosition, returnErr
} }

View file

@ -114,7 +114,7 @@ const selectContextAfterEventSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type = ANY($1) LIMIT $2 OFFSET $3 ORDER BY id ASC" const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) LIMIT $2 OFFSET $3 ORDER BY id ASC"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
db *sql.DB db *sql.DB
@ -126,7 +126,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt
selectSearchStmt *sql.Stmt //selectSearchStmt *sql.Stmt - prepared at runtime
} }
func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) { func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) {
@ -146,7 +146,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
{&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
{&s.selectSearchStmt, selectSearchSQL}, //{&s.selectSearchStmt, selectSearchSQL}, - prepared at runtime
}.Prepare(db) }.Prepare(db)
} }
@ -619,7 +619,20 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [
} }
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) { func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) {
rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, types, limit, offset) params := make([]interface{}, len(types))
for i := range types {
params[i] = types[i]
}
params = append(params, limit)
params = append(params, offset)
selectSQL := strings.Replace(selectSearchSQL, "($1)", sqlutil.QueryVariadic(len(types)), 1)
stmt, err := s.db.Prepare(selectSQL)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, stmt, "selectEvents: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -18,7 +18,6 @@ package sqlite3
import ( import (
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -37,7 +36,7 @@ type SyncServerDatasource struct {
// NewDatabase creates a new sync server database // NewDatabase creates a new sync server database
// nolint: gocyclo // nolint: gocyclo
func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, fts *fulltext.Search) (*SyncServerDatasource, error) { func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource var d SyncServerDatasource
var err error var err error
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil {
@ -46,7 +45,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err = d.prepare(dbProperties); err != nil { if err = d.prepare(dbProperties); err != nil {
return nil, err return nil, err
} }
d.FTS = fts
return &d, nil return &d, nil
} }

View file

@ -20,7 +20,6 @@ package storage
import ( import (
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/postgres"
@ -28,12 +27,12 @@ import (
) )
// NewSyncServerDatasource opens a database connection. // NewSyncServerDatasource opens a database connection.
func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, fts *fulltext.Search) (Database, error) { func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) {
switch { switch {
case dbProperties.ConnectionString.IsSQLite(): case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(base, dbProperties, fts) return sqlite3.NewDatabase(base, dbProperties)
case dbProperties.ConnectionString.IsPostgres(): case dbProperties.ConnectionString.IsPostgres():
return postgres.NewDatabase(base, dbProperties, fts) return postgres.NewDatabase(base, dbProperties)
default: default:
return nil, fmt.Errorf("unexpected database type") return nil, fmt.Errorf("unexpected database type")
} }