Add a few tests, remove dead code

This commit is contained in:
Till Faelligen 2023-04-03 11:32:16 +02:00
parent 675926967d
commit c0a6137bf3
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
5 changed files with 160 additions and 96 deletions

View file

@ -135,15 +135,6 @@ FROM room_ids,
) AS x ) AS x
` `
const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $8"
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
@ -205,7 +196,6 @@ type outputRoomEventsStatements struct {
selectMaxEventIDStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeFilteredStmt *sql.Stmt selectStateInRangeFilteredStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt updateEventJSONStmt *sql.Stmt
@ -261,7 +251,6 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectMaxEventIDStmt, selectMaxEventIDSQL}, {&s.selectMaxEventIDStmt, selectMaxEventIDSQL},
{&s.selectRecentEventsStmt, selectRecentEventsSQL}, {&s.selectRecentEventsStmt, selectRecentEventsSQL},
{&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL}, {&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL},
{&s.selectEarlyEventsStmt, selectEarlyEventsSQL},
{&s.selectStateInRangeFilteredStmt, selectStateInRangeFilteredSQL}, {&s.selectStateInRangeFilteredStmt, selectStateInRangeFilteredSQL},
{&s.selectStateInRangeStmt, selectStateInRangeSQL}, {&s.selectStateInRangeStmt, selectStateInRangeSQL},
{&s.updateEventJSONStmt, updateEventJSONSQL}, {&s.updateEventJSONStmt, updateEventJSONSQL},
@ -529,39 +518,6 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return result, rows.Err() return result, rows.Err()
} }
// selectEarlyEvents returns the earliest events in the given room, starting
// from a given position, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
) ([]types.StreamEvent, error) {
senders, notSenders := getSendersRoomEventFilter(eventFilter)
stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
rows, err := stmt.QueryContext(
ctx, roomID, r.Low(), r.High(),
pq.StringArray(senders),
pq.StringArray(notSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
eventFilter.Limit,
)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectEarlyEvents: rows.close() failed")
events, err := rowsToStreamEvents(rows)
if err != nil {
return nil, err
}
// The events need to be returned from oldest to latest, which isn't
// necessarily the way the SQL query returns them, so a sort is necessary to
// ensure the events are in the right order in the slice.
sort.SliceStable(events, func(i int, j int) bool {
return events[i].StreamPosition < events[j].StreamPosition
})
return events, nil
}
// selectEvents returns the events for the given event IDs. If an event is // selectEvents returns the events for the given event IDs. If an event is
// missing from the database, it will be omitted. // missing from the database, it will be omitted.
func (s *outputRoomEventsStatements) SelectEvents( func (s *outputRoomEventsStatements) SelectEvents(

View file

@ -0,0 +1,103 @@
package shared_test
import (
"context"
"reflect"
"testing"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib"
)
func newSyncDB(t *testing.T, dbType test.DBType) (storage.Database, func()) {
t.Helper()
cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType)
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
syncDB, err := storage.NewSyncServerDatasource(processCtx.Context(), cm, &cfg.SyncAPI.Database)
if err != nil {
t.Fatalf("failed to create sync DB: %s", err)
}
return syncDB, closeDB
}
func TestFilterTable(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
tab, closeDB := newSyncDB(t, dbType)
defer closeDB()
// initially create a filter
filter := &gomatrixserverlib.Filter{}
filterID, err := tab.PutFilter(context.Background(), "alice", filter)
if err != nil {
t.Fatal(err)
}
// create the same filter again, we should receive the existing filter
secondFilterID, err := tab.PutFilter(context.Background(), "alice", filter)
if err != nil {
t.Fatal(err)
}
if secondFilterID != filterID {
t.Fatalf("expected second filter to be the same as the first: %s vs %s", filterID, secondFilterID)
}
// query the filter again
targetFilter := &gomatrixserverlib.Filter{}
if err = tab.GetFilter(context.Background(), targetFilter, "alice", filterID); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(filter, targetFilter) {
t.Fatalf("%#v vs %#v", filter, targetFilter)
}
// query non-existent filter
if err = tab.GetFilter(context.Background(), targetFilter, "bob", filterID); err == nil {
t.Fatalf("expected filter to not exist, but it does exist: %v", targetFilter)
}
})
}
func TestIgnores(t *testing.T) {
alice := test.NewUser(t)
bob := test.NewUser(t)
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
syncDB, closeDB := newSyncDB(t, dbType)
defer closeDB()
tab, err := syncDB.NewDatabaseTransaction(context.Background())
if err != nil {
t.Fatal(err)
}
defer tab.Rollback() // nolint: errcheck
ignoredUsers := &types.IgnoredUsers{List: map[string]interface{}{
bob.ID: "",
}}
if err = tab.UpdateIgnoresForUser(context.Background(), alice.ID, ignoredUsers); err != nil {
t.Fatal(err)
}
gotIgnoredUsers, err := tab.IgnoresForUser(context.Background(), alice.ID)
if err != nil {
t.Fatal(err)
}
// verify the ignored users matches those we stored
if !reflect.DeepEqual(gotIgnoredUsers, ignoredUsers) {
t.Fatalf("%#v vs %#v", gotIgnoredUsers, ignoredUsers)
}
// Bob doesn't have any ignored users, so should receive sql.ErrNoRows
if _, err = tab.IgnoresForUser(context.Background(), bob.ID); err == nil {
t.Fatalf("expected an error but got none")
}
})
}

View file

@ -28,7 +28,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -81,12 +80,6 @@ const selectRecentEventsForSyncSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
@ -118,7 +111,7 @@ const selectContextAfterEventSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) AND id > $2 LIMIT $3 ORDER BY id ASC" const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type IN ($2)"
const purgeEventsSQL = "" + const purgeEventsSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1" "DELETE FROM syncapi_output_room_events WHERE room_id = $1"
@ -429,42 +422,6 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return result, nil return result, nil
} }
func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
) ([]types.StreamEvent, error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectEarlyEventsSQL,
[]interface{}{
roomID, r.Low(), r.High(),
},
eventFilter.Senders, eventFilter.NotSenders,
eventFilter.Types, eventFilter.NotTypes,
nil, eventFilter.ContainsURL, eventFilter.Limit, FilterOrderAsc,
)
if err != nil {
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
}
defer internal.CloseAndLogIfError(ctx, stmt, "SelectEarlyEvents: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectEarlyEvents: rows.close() failed")
events, err := rowsToStreamEvents(rows)
if err != nil {
return nil, err
}
// The events need to be returned from oldest to latest, which isn't
// necessarily the way the SQL query returns them, so a sort is necessary to
// ensure the events are in the right order in the slice.
sort.SliceStable(events, func(i int, j int) bool {
return events[i].StreamPosition < events[j].StreamPosition
})
return events, nil
}
// selectEvents returns the events for the given event IDs. If an event is // selectEvents returns the events for the given event IDs. If an event is
// missing from the database, it will be omitted. // missing from the database, it will be omitted.
func (s *outputRoomEventsStatements) SelectEvents( func (s *outputRoomEventsStatements) SelectEvents(
@ -685,18 +642,18 @@ func (s *outputRoomEventsStatements) PurgeEvents(
} }
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) { func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
params := make([]interface{}, len(types)) params := make([]interface{}, len(types)+1)
params[0] = afterID
for i := range types { for i := range types {
params[i] = types[i] params[i+1] = types[i]
} }
params = append(params, afterID)
params = append(params, limit)
selectSQL := strings.Replace(selectSearchSQL, "($1)", sqlutil.QueryVariadic(len(types)), 1)
stmt, err := s.db.Prepare(selectSQL) selectSQL := strings.Replace(selectSearchSQL, "($2)", sqlutil.QueryVariadicOffset(len(types), 1), 1)
stmt, params, err := prepareWithFilters(s.db, txn, selectSQL, params, nil, nil, nil, nil, nil, nil, int(limit), FilterOrderAsc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer internal.CloseAndLogIfError(ctx, stmt, "selectEvents: stmt.close() failed") defer internal.CloseAndLogIfError(ctx, stmt, "selectEvents: stmt.close() failed")
rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, params...) rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, params...)
if err != nil { if err != nil {

View file

@ -67,8 +67,6 @@ type Events interface {
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
// SelectEarlyEvents returns the earliest events in the given room.
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *gomatrixserverlib.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *gomatrixserverlib.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely. // DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.

View file

@ -103,3 +103,53 @@ func TestOutputRoomEventsTable(t *testing.T) {
} }
}) })
} }
func TestReindex(t *testing.T) {
ctx := context.Background()
alice := test.NewUser(t)
room := test.NewRoom(t, alice)
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomName, map[string]interface{}{
"name": "my new room name",
}, test.WithStateKey(""))
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomTopic, map[string]interface{}{
"topic": "my new room topic",
}, test.WithStateKey(""))
room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{
"msgbody": "my room message",
"type": "m.text",
})
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
tab, db, close := newOutputRoomEventsTable(t, dbType)
defer close()
err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
for _, ev := range room.Events() {
_, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared)
if err != nil {
return fmt.Errorf("failed to InsertEvent: %s", err)
}
}
return nil
})
if err != nil {
t.Fatalf("err: %s", err)
}
events, err := tab.ReIndex(ctx, nil, 10, 0, []string{
gomatrixserverlib.MRoomName,
gomatrixserverlib.MRoomTopic,
"m.room.message"})
if err != nil {
t.Fatal(err)
}
wantEventCount := 3
if len(events) != wantEventCount {
t.Fatalf("expected %d events, got %d", wantEventCount, len(events))
}
})
}