Handle transaction event limit in loop

This commit is contained in:
Andrew Morgan 2018-06-24 23:33:34 +01:00
parent 9f863de5d6
commit 801b58c927
2 changed files with 13 additions and 9 deletions

View file

@ -51,7 +51,7 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC LIMIT $2"
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1"
@ -113,7 +113,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
err error,
) {
// Retrieve events from the database. Unsuccessfully sent events first
eventRowsCurr, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
eventRowsCurr, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
if err != nil {
return 0, 0, nil, err
}
@ -124,7 +124,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
applicationServiceID)
}
}()
events, maxID, txnID, err = retrieveEvents(eventRowsCurr)
events, maxID, txnID, err = retrieveEvents(eventRowsCurr, limit)
if err != nil {
return 0, 0, nil, err
}
@ -132,7 +132,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
return
}
func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, maxID, txnID int, err error) {
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.ApplicationServiceEvent, maxID, txnID int, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
@ -140,7 +140,7 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application
// If txn_id changes dramatically, we've switched from collecting old events to
// new ones. Send back those events first.
lastTxnID := -2 // Invalid transaction ID
for eventRows.Next() {
for eventsProcessed := 0; eventRows.Next(); {
var event gomatrixserverlib.ApplicationServiceEvent
var eventContent sql.NullString
var id int
@ -165,6 +165,14 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application
}
lastTxnID = txnID
// Limit events that aren't part of an old transaction
if txnID == -1 {
// Return if we've hit the limit
if eventsProcessed++; eventsProcessed > limit {
return events, maxID, lastTxnID, nil
}
}
if eventContent.Valid {
event.Content = gomatrixserverlib.RawJSON(eventContent.String)
}

View file

@ -33,10 +33,6 @@ import (
var (
// Maximum size of events sent in each transaction.
// Warning, if this is lowered and a number of events greater than the previous
// batch size were still to be sent, then a number of events equal to the
// difference will be ignored by the app service.
// TL;DR: Don't lower this number with any AS events still left in the database.
transactionBatchSize = 50
// Timeout for sending a single transaction to an application service.
transactionTimeout = time.Second * 60