DB tweaks

This commit is contained in:
Till Faelligen 2022-05-19 08:34:26 +02:00
parent d2af5cfa1d
commit baacb8ac6e
6 changed files with 20 additions and 23 deletions

View file

@ -157,7 +157,7 @@ type Database interface {
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
ReIndex(ctx context.Context, limit, offset int64) ([]gomatrixserverlib.HeaderedEvent, error) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error)
} }
type Presence interface { type Presence interface {

View file

@ -164,7 +164,7 @@ const selectContextAfterEventSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3" " ORDER BY id ASC LIMIT $3"
const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type = ANY($1) ORDER BY id ASC LIMIT $2 OFFSET $3" const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
@ -622,25 +622,26 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return result, rows.Err() return result, rows.Err()
} }
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) { func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, pq.StringArray(types), limit, offset) rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
var result []gomatrixserverlib.HeaderedEvent
var eventID string var eventID string
var id int64
result := make(map[int64]gomatrixserverlib.HeaderedEvent)
for rows.Next() { for rows.Next() {
var ev gomatrixserverlib.HeaderedEvent var ev gomatrixserverlib.HeaderedEvent
var eventBytes []byte var eventBytes []byte
if err = rows.Scan(&eventID, &eventBytes); err != nil { if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
return nil, err return nil, err
} }
if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
return nil, err return nil, err
} }
result = append(result, ev) result[id] = ev
} }
return result, rows.Err() return result, rows.Err()
} }

View file

@ -1051,8 +1051,8 @@ func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, igno
return s.Ignores.UpsertIgnores(ctx, userID, ignores) return s.Ignores.UpsertIgnores(ctx, userID, ignores)
} }
func (s *Database) ReIndex(ctx context.Context, limit, offset int64) ([]gomatrixserverlib.HeaderedEvent, error) { func (s *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.ReIndex(ctx, nil, limit, offset, []string{ return s.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomName,
gomatrixserverlib.MRoomTopic, gomatrixserverlib.MRoomTopic,
"m.room.message", "m.room.message",

View file

@ -114,7 +114,7 @@ const selectContextAfterEventSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) LIMIT $2 OFFSET $3 ORDER BY id ASC" const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) AND id > $2 LIMIT $3 ORDER BY id ASC"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
db *sql.DB db *sql.DB
@ -618,13 +618,13 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [
return return
} }
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) { func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
params := make([]interface{}, len(types)) params := make([]interface{}, len(types))
for i := range types { for i := range types {
params[i] = types[i] params[i] = types[i]
} }
params = append(params, afterID)
params = append(params, limit) params = append(params, limit)
params = append(params, offset)
selectSQL := strings.Replace(selectSearchSQL, "($1)", sqlutil.QueryVariadic(len(types)), 1) selectSQL := strings.Replace(selectSearchSQL, "($1)", sqlutil.QueryVariadic(len(types)), 1)
stmt, err := s.db.Prepare(selectSQL) stmt, err := s.db.Prepare(selectSQL)
@ -638,18 +638,19 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l
} }
defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
var result []gomatrixserverlib.HeaderedEvent
var eventID string var eventID string
var id int64
result := make(map[int64]gomatrixserverlib.HeaderedEvent)
for rows.Next() { for rows.Next() {
var ev gomatrixserverlib.HeaderedEvent var ev gomatrixserverlib.HeaderedEvent
var eventBytes []byte var eventBytes []byte
if err = rows.Scan(&eventID, &eventBytes); err != nil { if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
return nil, err return nil, err
} }
if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
return nil, err return nil, err
} }
result = append(result, ev) result[id] = ev
} }
return result, rows.Err() return result, rows.Err()
} }

View file

@ -6,7 +6,6 @@ import (
"reflect" "reflect"
"testing" "testing"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
@ -17,20 +16,16 @@ import (
var ctx = context.Background() var ctx = context.Background()
func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
fts, err := fulltext.New(t.TempDir())
if err != nil {
t.Fatal("failed to create full text")
}
connStr, close := test.PrepareDBConnectionString(t, dbType) connStr, close := test.PrepareDBConnectionString(t, dbType)
db, err := storage.NewSyncServerDatasource(nil, &config.DatabaseOptions{ db, err := storage.NewSyncServerDatasource(nil, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr), ConnectionString: config.DataSource(connStr),
}, fts) })
if err != nil { if err != nil {
t.Fatalf("NewSyncServerDatasource returned %s", err) t.Fatalf("NewSyncServerDatasource returned %s", err)
} }
return db, func() { return db, func() {
close() close()
fts.Close()
} }
} }

View file

@ -67,7 +67,7 @@ type Events interface {
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error)
} }
// Topology keeps track of the depths and stream positions for all events. // Topology keeps track of the depths and stream positions for all events.