From 5aa6b47a64f0920a60d50359e0262633d3d534dc Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 17 Jan 2023 16:47:10 -0700 Subject: [PATCH] Order relay queue select to return oldest entries first --- .../storage/postgres/relay_queue_table.go | 1 + relayapi/storage/shared/storage.go | 3 +- relayapi/storage/sqlite3/relay_queue_table.go | 1 + .../storage/tables/relay_queue_table_test.go | 41 +++++++++++++++++++ 4 files changed, 45 insertions(+), 1 deletion(-) diff --git a/relayapi/storage/postgres/relay_queue_table.go b/relayapi/storage/postgres/relay_queue_table.go index a8ba72902..e97cf8cc0 100644 --- a/relayapi/storage/postgres/relay_queue_table.go +++ b/relayapi/storage/postgres/relay_queue_table.go @@ -52,6 +52,7 @@ const deleteQueueEntriesSQL = "" + const selectQueueEntriesSQL = "" + "SELECT json_nid FROM relayapi_queue" + " WHERE server_name = $1" + + " ORDER BY json_nid" + " LIMIT $2" const selectQueueEntryCountSQL = "" + diff --git a/relayapi/storage/shared/storage.go b/relayapi/storage/shared/storage.go index 824f092fc..fe1a86cfb 100644 --- a/relayapi/storage/shared/storage.go +++ b/relayapi/storage/shared/storage.go @@ -109,7 +109,8 @@ func (d *Database) GetTransaction( ctx context.Context, userID gomatrixserverlib.UserID, ) (*gomatrixserverlib.Transaction, *receipt.Receipt, error) { - nids, err := d.RelayQueue.SelectQueueEntries(ctx, nil, userID.Domain(), 1) + entriesRequested := 1 + nids, err := d.RelayQueue.SelectQueueEntries(ctx, nil, userID.Domain(), entriesRequested) if err != nil { return nil, nil, fmt.Errorf("d.SelectQueueEntries: %w", err) } diff --git a/relayapi/storage/sqlite3/relay_queue_table.go b/relayapi/storage/sqlite3/relay_queue_table.go index 72a2d64c3..49c6b4de5 100644 --- a/relayapi/storage/sqlite3/relay_queue_table.go +++ b/relayapi/storage/sqlite3/relay_queue_table.go @@ -53,6 +53,7 @@ const deleteQueueEntriesSQL = "" + const selectQueueEntriesSQL = "" + "SELECT json_nid FROM relayapi_queue" + " WHERE server_name = $1" + + " ORDER BY json_nid" + " LIMIT $2" const selectQueueEntryCountSQL = "" + diff --git a/relayapi/storage/tables/relay_queue_table_test.go b/relayapi/storage/tables/relay_queue_table_test.go index dc45c02af..99f9922c0 100644 --- a/relayapi/storage/tables/relay_queue_table_test.go +++ b/relayapi/storage/tables/relay_queue_table_test.go @@ -107,6 +107,47 @@ func TestShouldRetrieveInsertedQueueTransaction(t *testing.T) { }) } +func TestShouldRetrieveOldestInsertedQueueTransaction(t *testing.T) { + ctx := context.Background() + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, close := mustCreateQueueTable(t, dbType) + defer close() + + transactionID := gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano())) + serverName := gomatrixserverlib.ServerName("domain") + nid := int64(2) + err := db.Table.InsertQueueEntry(ctx, nil, transactionID, serverName, nid) + if err != nil { + t.Fatalf("Failed inserting transaction: %s", err.Error()) + } + + transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano())) + serverName = gomatrixserverlib.ServerName("domain") + oldestNID := int64(1) + err = db.Table.InsertQueueEntry(ctx, nil, transactionID, serverName, oldestNID) + if err != nil { + t.Fatalf("Failed inserting transaction: %s", err.Error()) + } + + retrievedNids, err := db.Table.SelectQueueEntries(ctx, nil, serverName, 1) + if err != nil { + t.Fatalf("Failed retrieving transaction: %s", err.Error()) + } + + assert.Equal(t, oldestNID, retrievedNids[0]) + assert.Equal(t, 1, len(retrievedNids)) + + retrievedNids, err = db.Table.SelectQueueEntries(ctx, nil, serverName, 10) + if err != nil { + t.Fatalf("Failed retrieving transaction: %s", err.Error()) + } + + assert.Equal(t, oldestNID, retrievedNids[0]) + assert.Equal(t, nid, retrievedNids[1]) + assert.Equal(t, 2, len(retrievedNids)) + }) +} + func TestShouldDeleteQueueTransaction(t *testing.T) { ctx := context.Background() test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {