diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index 16b90d604..ab6848b43 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -51,7 +51,7 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); const selectEventsByApplicationServiceIDSQL = "" + "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " + - "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC LIMIT $2" + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" const countEventsByApplicationServiceIDSQL = "" + "SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1" @@ -113,7 +113,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( err error, ) { // Retrieve events from the database. Unsuccessfully sent events first - eventRowsCurr, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) + eventRowsCurr, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) if err != nil { return 0, 0, nil, err } @@ -124,7 +124,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( applicationServiceID) } }() - events, maxID, txnID, err = retrieveEvents(eventRowsCurr) + events, maxID, txnID, err = retrieveEvents(eventRowsCurr, limit) if err != nil { return 0, 0, nil, err } @@ -132,7 +132,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( return } -func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, maxID, txnID int, err error) { +func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.ApplicationServiceEvent, maxID, txnID int, err error) { // Get current time for use in calculating event age nowMilli := time.Now().UnixNano() / int64(time.Millisecond) @@ -140,7 +140,7 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application // If txn_id changes dramatically, we've switched from collecting old events to // new ones. Send back those events first. lastTxnID := -2 // Invalid transaction ID - for eventRows.Next() { + for eventsProcessed := 0; eventRows.Next(); { var event gomatrixserverlib.ApplicationServiceEvent var eventContent sql.NullString var id int @@ -165,6 +165,14 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application } lastTxnID = txnID + // Limit events that aren't part of an old transaction + if txnID == -1 { + // Return if we've hit the limit + if eventsProcessed++; eventsProcessed > limit { + return events, maxID, lastTxnID, nil + } + } + if eventContent.Valid { event.Content = gomatrixserverlib.RawJSON(eventContent.String) } diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index 34800192d..04291a699 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -33,10 +33,6 @@ import ( var ( // Maximum size of events sent in each transaction. - // Warning, if this is lowered and a number of events greater than the previous - // batch size were still to be sent, then a number of events equal to the - // difference will be ignored by the app service. - // TL;DR: Don't lower this number with any AS events still left in the database. transactionBatchSize = 50 // Timeout for sending a single transaction to an application service. transactionTimeout = time.Second * 60