Order relay queue select to return oldest entries first

This commit is contained in:
Devon Hudson 2023-01-17 16:47:10 -07:00
parent 7c692e7eda
commit 5aa6b47a64
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
4 changed files with 45 additions and 1 deletions

View file

@ -52,6 +52,7 @@ const deleteQueueEntriesSQL = "" +
const selectQueueEntriesSQL = "" +
"SELECT json_nid FROM relayapi_queue" +
" WHERE server_name = $1" +
" ORDER BY json_nid" +
" LIMIT $2"
const selectQueueEntryCountSQL = "" +

View file

@ -109,7 +109,8 @@ func (d *Database) GetTransaction(
ctx context.Context,
userID gomatrixserverlib.UserID,
) (*gomatrixserverlib.Transaction, *receipt.Receipt, error) {
nids, err := d.RelayQueue.SelectQueueEntries(ctx, nil, userID.Domain(), 1)
entriesRequested := 1
nids, err := d.RelayQueue.SelectQueueEntries(ctx, nil, userID.Domain(), entriesRequested)
if err != nil {
return nil, nil, fmt.Errorf("d.SelectQueueEntries: %w", err)
}

View file

@ -53,6 +53,7 @@ const deleteQueueEntriesSQL = "" +
const selectQueueEntriesSQL = "" +
"SELECT json_nid FROM relayapi_queue" +
" WHERE server_name = $1" +
" ORDER BY json_nid" +
" LIMIT $2"
const selectQueueEntryCountSQL = "" +

View file

@ -107,6 +107,47 @@ func TestShouldRetrieveInsertedQueueTransaction(t *testing.T) {
})
}
func TestShouldRetrieveOldestInsertedQueueTransaction(t *testing.T) {
ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close := mustCreateQueueTable(t, dbType)
defer close()
transactionID := gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano()))
serverName := gomatrixserverlib.ServerName("domain")
nid := int64(2)
err := db.Table.InsertQueueEntry(ctx, nil, transactionID, serverName, nid)
if err != nil {
t.Fatalf("Failed inserting transaction: %s", err.Error())
}
transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano()))
serverName = gomatrixserverlib.ServerName("domain")
oldestNID := int64(1)
err = db.Table.InsertQueueEntry(ctx, nil, transactionID, serverName, oldestNID)
if err != nil {
t.Fatalf("Failed inserting transaction: %s", err.Error())
}
retrievedNids, err := db.Table.SelectQueueEntries(ctx, nil, serverName, 1)
if err != nil {
t.Fatalf("Failed retrieving transaction: %s", err.Error())
}
assert.Equal(t, oldestNID, retrievedNids[0])
assert.Equal(t, 1, len(retrievedNids))
retrievedNids, err = db.Table.SelectQueueEntries(ctx, nil, serverName, 10)
if err != nil {
t.Fatalf("Failed retrieving transaction: %s", err.Error())
}
assert.Equal(t, oldestNID, retrievedNids[0])
assert.Equal(t, nid, retrievedNids[1])
assert.Equal(t, 2, len(retrievedNids))
})
}
func TestShouldDeleteQueueTransaction(t *testing.T) {
ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {