Make use of sqlutil.StatementList, dedupe NIDs, timeout test

This commit is contained in:
Till Faelligen 2022-12-22 15:43:29 +01:00
parent 455321dcbf
commit 2e5ef9c711
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
12 changed files with 91 additions and 182 deletions

View file

@ -334,7 +334,13 @@ func TestPurgeRoom(t *testing.T) {
// wait for all consumers to process the purge event // wait for all consumers to process the purge event
var sum = 1 var sum = 1
timeout := time.Second * 5
deadline, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for sum > 0 { for sum > 0 {
if deadline.Err() != nil {
t.Fatalf("test timed out after %s", timeout)
}
sum = 0 sum = 0
consumerCh := jsCtx.Consumers(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) consumerCh := jsCtx.Consumers(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent))
for x := range consumerCh { for x := range consumerCh {

View file

@ -160,10 +160,15 @@ func (s *purgeStatements) PurgeStateBlocks(
if err != nil { if err != nil {
return err return err
} }
params := make([]interface{}, len(stateBlockNIDs))
params := make([]interface{}, len(stateBlockNIDs)+1) seenNIDs := make(map[types.StateBlockNID]struct{}, len(stateBlockNIDs))
// dedupe NIDs
for k, v := range stateBlockNIDs { for k, v := range stateBlockNIDs {
if _, ok := seenNIDs[v]; ok {
continue
}
params[k] = v params[k] = v
seenNIDs[v] = struct{}{}
} }
query := "DELETE FROM roomserver_state_block WHERE state_block_nid IN($1)" query := "DELETE FROM roomserver_state_block WHERE state_block_nid IN($1)"

View file

@ -63,19 +63,12 @@ func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremiti
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertBackwardExtremityStmt, insertBackwardExtremitySQL},
} {&s.selectBackwardExtremitiesForRoomStmt, selectBackwardExtremitiesForRoomSQL},
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { {&s.deleteBackwardExtremityStmt, deleteBackwardExtremitySQL},
return nil, err {&s.purgeBackwardExtremitiesStmt, purgeBackwardExtremitiesSQL},
} }.Prepare(db)
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
return nil, err
}
if s.purgeBackwardExtremitiesStmt, err = db.Prepare(purgeBackwardExtremitiesSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *backwardExtremitiesStatements) InsertsBackwardExtremity( func (s *backwardExtremitiesStatements) InsertsBackwardExtremity(

View file

@ -79,22 +79,13 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertInviteEventStmt, insertInviteEventSQL},
} {&s.selectInviteEventsInRangeStmt, selectInviteEventsInRangeSQL},
if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { {&s.deleteInviteEventStmt, deleteInviteEventSQL},
return nil, err {&s.selectMaxInviteIDStmt, selectMaxInviteIDSQL},
} {&s.purgeInvitesStmt, purgeInvitesSQL},
if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil { }.Prepare(db)
return nil, err
}
if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil {
return nil, err
}
if s.purgeInvitesStmt, err = db.Prepare(purgeInvitesSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *inviteEventsStatements) InsertInviteEvent( func (s *inviteEventsStatements) InsertInviteEvent(

View file

@ -65,8 +65,8 @@ const selectPositionInTopologySQL = "" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" + "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE event_id = $1" " WHERE event_id = $1"
// Select the max topological position for the room, then sort by stream position and take the highest, // Select the max topological position for the room, then sort by stream position and take the highest,
// returning both topological and stream positions. // returning both topological and stream positions.
const selectMaxPositionInTopologySQL = "" + const selectMaxPositionInTopologySQL = "" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" + "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE topological_position=(" + " WHERE topological_position=(" +
@ -99,31 +99,16 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertEventInTopologyStmt, insertEventInTopologySQL},
} {&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL},
if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil { {&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL},
return nil, err {&s.selectPositionInTopologyStmt, selectPositionInTopologySQL},
} {&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL},
if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil { {&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL},
return nil, err {&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL},
} {&s.purgeEventsTopologyStmt, purgeEventsTopologySQL},
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { }.Prepare(db)
return nil, err
}
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
return nil, err
}
if s.purgeEventsTopologyStmt, err = db.Prepare(purgeEventsTopologySQL); err != nil {
return nil, err
}
return s, nil
} }
// InsertEventInTopology inserts the given event in the room's topology, based // InsertEventInTopology inserts the given event in the room's topology, based

View file

@ -87,28 +87,15 @@ func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) {
s := &peekStatements{ s := &peekStatements{
db: db, db: db,
} }
if s.insertPeekStmt, err = db.Prepare(insertPeekSQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertPeekStmt, insertPeekSQL},
} {&s.deletePeekStmt, deletePeekSQL},
if s.deletePeekStmt, err = db.Prepare(deletePeekSQL); err != nil { {&s.deletePeeksStmt, deletePeeksSQL},
return nil, err {&s.selectPeeksInRangeStmt, selectPeeksInRangeSQL},
} {&s.selectPeekingDevicesStmt, selectPeekingDevicesSQL},
if s.deletePeeksStmt, err = db.Prepare(deletePeeksSQL); err != nil { {&s.selectMaxPeekIDStmt, selectMaxPeekIDSQL},
return nil, err {&s.purgePeeksStmt, purgePeeksSQL},
} }.Prepare(db)
if s.selectPeeksInRangeStmt, err = db.Prepare(selectPeeksInRangeSQL); err != nil {
return nil, err
}
if s.selectPeekingDevicesStmt, err = db.Prepare(selectPeekingDevicesSQL); err != nil {
return nil, err
}
if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil {
return nil, err
}
if s.purgePeeksStmt, err = db.Prepare(purgePeeksSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *peekStatements) InsertPeek( func (s *peekStatements) InsertPeek(

View file

@ -90,19 +90,12 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
r := &receiptStatements{ r := &receiptStatements{
db: db, db: db,
} }
if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { return r, sqlutil.StatementList{
return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err) {&r.upsertReceipt, upsertReceipt},
} {&r.selectRoomReceipts, selectRoomReceipts},
if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { {&r.selectMaxReceiptID, selectMaxReceiptIDSQL},
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err) {&r.purgeReceiptsStmt, purgeReceiptsSQL},
} }.Prepare(db)
if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil {
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
}
if r.purgeReceiptsStmt, err = db.Prepare(purgeReceiptsSQL); err != nil {
return nil, fmt.Errorf("unable to prepare purgeReceiptsStmt statement: %w", err)
}
return r, nil
} }
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {

View file

@ -66,19 +66,12 @@ func NewSqliteBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertBackwardExtremityStmt, insertBackwardExtremitySQL},
} {&s.selectBackwardExtremitiesForRoomStmt, selectBackwardExtremitiesForRoomSQL},
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { {&s.deleteBackwardExtremityStmt, deleteBackwardExtremitySQL},
return nil, err {&s.purgeBackwardExtremitiesStmt, purgeBackwardExtremitiesSQL},
} }.Prepare(db)
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
return nil, err
}
if s.purgeBackwardExtremitiesStmt, err = db.Prepare(purgeBackwardExtremitiesSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *backwardExtremitiesStatements) InsertsBackwardExtremity( func (s *backwardExtremitiesStatements) InsertsBackwardExtremity(

View file

@ -79,22 +79,13 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *StreamIDStatements) (tables.Inv
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertInviteEventStmt, insertInviteEventSQL},
} {&s.selectInviteEventsInRangeStmt, selectInviteEventsInRangeSQL},
if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { {&s.deleteInviteEventStmt, deleteInviteEventSQL},
return nil, err {&s.selectMaxInviteIDStmt, selectMaxInviteIDSQL},
} {&s.purgeInvitesStmt, purgeInvitesSQL},
if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil { }.Prepare(db)
return nil, err
}
if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil {
return nil, err
}
if s.purgeInvitesStmt, err = db.Prepare(purgeInvitesSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *inviteEventsStatements) InsertInviteEvent( func (s *inviteEventsStatements) InsertInviteEvent(

View file

@ -94,31 +94,16 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertEventInTopologyStmt, insertEventInTopologySQL},
} {&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL},
if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil { {&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL},
return nil, err {&s.selectPositionInTopologyStmt, selectPositionInTopologySQL},
} {&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL},
if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil { {&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL},
return nil, err {&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL},
} {&s.purgeEventsTopologyStmt, purgeEventsTopologySQL},
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { }.Prepare(db)
return nil, err
}
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
return nil, err
}
if s.purgeEventsTopologyStmt, err = db.Prepare(purgeEventsTopologySQL); err != nil {
return nil, err
}
return s, nil
} }
// insertEventInTopology inserts the given event in the room's topology, based // insertEventInTopology inserts the given event in the room's topology, based

View file

@ -88,28 +88,15 @@ func NewSqlitePeeksTable(db *sql.DB, streamID *StreamIDStatements) (tables.Peeks
db: db, db: db,
streamIDStatements: streamID, streamIDStatements: streamID,
} }
if s.insertPeekStmt, err = db.Prepare(insertPeekSQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertPeekStmt, insertPeekSQL},
} {&s.deletePeekStmt, deletePeekSQL},
if s.deletePeekStmt, err = db.Prepare(deletePeekSQL); err != nil { {&s.deletePeeksStmt, deletePeeksSQL},
return nil, err {&s.selectPeeksInRangeStmt, selectPeeksInRangeSQL},
} {&s.selectPeekingDevicesStmt, selectPeekingDevicesSQL},
if s.deletePeeksStmt, err = db.Prepare(deletePeeksSQL); err != nil { {&s.selectMaxPeekIDStmt, selectMaxPeekIDSQL},
return nil, err {&s.purgePeeksStmt, purgePeeksSQL},
} }.Prepare(db)
if s.selectPeeksInRangeStmt, err = db.Prepare(selectPeeksInRangeSQL); err != nil {
return nil, err
}
if s.selectPeekingDevicesStmt, err = db.Prepare(selectPeekingDevicesSQL); err != nil {
return nil, err
}
if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil {
return nil, err
}
if s.purgePeeksStmt, err = db.Prepare(purgePeeksSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *peekStatements) InsertPeek( func (s *peekStatements) InsertPeek(

View file

@ -88,19 +88,12 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Re
db: db, db: db,
streamIDStatements: streamID, streamIDStatements: streamID,
} }
if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { return r, sqlutil.StatementList{
return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err) {&r.upsertReceipt, upsertReceipt},
} {&r.selectRoomReceipts, selectRoomReceipts},
if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { {&r.selectMaxReceiptID, selectMaxReceiptIDSQL},
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err) {&r.purgeReceiptsStmt, purgeReceiptsSQL},
} }.Prepare(db)
if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil {
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
}
if r.purgeReceiptsStmt, err = db.Prepare(purgeReceiptsSQL); err != nil {
return nil, fmt.Errorf("unable to prepare purgeReceiptsStmt statement: %w", err)
}
return r, nil
} }
// UpsertReceipt creates new user receipts // UpsertReceipt creates new user receipts