From baacb8ac6e668b06804e558206c7c88d42c10a76 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Thu, 19 May 2022 08:34:26 +0200 Subject: [PATCH] DB tweaks --- syncapi/storage/interface.go | 2 +- .../storage/postgres/output_room_events_table.go | 13 +++++++------ syncapi/storage/shared/syncserver.go | 4 ++-- syncapi/storage/sqlite3/output_room_events_table.go | 13 +++++++------ syncapi/storage/storage_test.go | 9 ++------- syncapi/storage/tables/interface.go | 2 +- 6 files changed, 20 insertions(+), 23 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index cd1b8ad21..4a1579350 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -157,7 +157,7 @@ type Database interface { IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error - ReIndex(ctx context.Context, limit, offset int64) ([]gomatrixserverlib.HeaderedEvent, error) + ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) } type Presence interface { diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 003e39e47..669e7b14d 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -164,7 +164,7 @@ const selectContextAfterEventSQL = "" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " ORDER BY id ASC LIMIT $3" -const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type = ANY($1) ORDER BY id ASC LIMIT $2 OFFSET $3" +const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3" type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt @@ -622,25 +622,26 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { return result, rows.Err() } -func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) { - rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, pq.StringArray(types), limit, offset) +func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) { + rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit) if err != nil { return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") - var result []gomatrixserverlib.HeaderedEvent var eventID string + var id int64 + result := make(map[int64]gomatrixserverlib.HeaderedEvent) for rows.Next() { var ev gomatrixserverlib.HeaderedEvent var eventBytes []byte - if err = rows.Scan(&eventID, &eventBytes); err != nil { + if err = rows.Scan(&id, &eventID, &eventBytes); err != nil { return nil, err } if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, err } - result = append(result, ev) + result[id] = ev } return result, rows.Err() } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 96d2bc9a4..2f11ff4f6 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1051,8 +1051,8 @@ func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, igno return s.Ignores.UpsertIgnores(ctx, userID, ignores) } -func (s *Database) ReIndex(ctx context.Context, limit, offset int64) ([]gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.ReIndex(ctx, nil, limit, offset, []string{ +func (s *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) { + return s.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{ gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomTopic, "m.room.message", diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 39ec26339..f40e8b5b4 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -114,7 +114,7 @@ const selectContextAfterEventSQL = "" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters -const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) LIMIT $2 OFFSET $3 ORDER BY id ASC" +const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) AND id > $2 LIMIT $3 ORDER BY id ASC" type outputRoomEventsStatements struct { db *sql.DB @@ -618,13 +618,13 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [ return } -func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) { +func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) { params := make([]interface{}, len(types)) for i := range types { params[i] = types[i] } + params = append(params, afterID) params = append(params, limit) - params = append(params, offset) selectSQL := strings.Replace(selectSearchSQL, "($1)", sqlutil.QueryVariadic(len(types)), 1) stmt, err := s.db.Prepare(selectSQL) @@ -638,18 +638,19 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l } defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") - var result []gomatrixserverlib.HeaderedEvent var eventID string + var id int64 + result := make(map[int64]gomatrixserverlib.HeaderedEvent) for rows.Next() { var ev gomatrixserverlib.HeaderedEvent var eventBytes []byte - if err = rows.Scan(&eventID, &eventBytes); err != nil { + if err = rows.Scan(&id, &eventID, &eventBytes); err != nil { return nil, err } if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, err } - result = append(result, ev) + result[id] = ev } return result, rows.Err() } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 3bb5f015b..6f846d45e 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -6,7 +6,6 @@ import ( "reflect" "testing" - "github.com/matrix-org/dendrite/internal/fulltext" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" @@ -17,20 +16,16 @@ import ( var ctx = context.Background() func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { - fts, err := fulltext.New(t.TempDir()) - if err != nil { - t.Fatal("failed to create full text") - } + connStr, close := test.PrepareDBConnectionString(t, dbType) db, err := storage.NewSyncServerDatasource(nil, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), - }, fts) + }) if err != nil { t.Fatalf("NewSyncServerDatasource returned %s", err) } return db, func() { close() - fts.Close() } } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index c4f657149..7a0e95062 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -67,7 +67,7 @@ type Events interface { SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) - ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) + ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) } // Topology keeps track of the depths and stream positions for all events.