diff --git a/src/github.com/matrix-org/dendrite/appservice/appservice.go b/src/github.com/matrix-org/dendrite/appservice/appservice.go index fa00d5256..57b127f27 100644 --- a/src/github.com/matrix-org/dendrite/appservice/appservice.go +++ b/src/github.com/matrix-org/dendrite/appservice/appservice.go @@ -51,13 +51,10 @@ func SetupAppServiceAPIComponent( // events to be sent out. workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices)) for i, appservice := range base.Cfg.Derived.ApplicationServices { - eventCount := 0 - m := sync.Mutex{} ws := types.ApplicationServiceWorkerState{ - AppService: appservice, - Cond: sync.NewCond(&m), - EventsReady: &eventCount, + AppService: appservice, + Cond: sync.NewCond(&m), } workerStates[i] = ws } diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go index 9284eae1a..bc1d3bf20 100644 --- a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -167,7 +167,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( } else { // Tell our worker to send out new messages by updating remaining message // count and waking them up with a broadcast - ws.NotifyNewEvent() + ws.NotifyNewEvents() } } } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index e7322c283..285bbf483 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -107,12 +107,13 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( ) ( txnID, maxID int, events []gomatrixserverlib.Event, + eventsRemaining bool, err error, ) { // Retrieve events from the database. Unsuccessfully sent events first eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) if err != nil { - return 0, 0, nil, err + return } defer func() { err = eventRows.Close() @@ -122,15 +123,15 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( }).WithError(err).Fatalf("appservice unable to select new events to send") } }() - events, maxID, txnID, err = retrieveEvents(eventRows, limit) + events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit) if err != nil { - return 0, 0, nil, err + return } return } -func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, err error) { +func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) { // Get current time for use in calculating event age nowMilli := time.Now().UnixNano() / int64(time.Millisecond) @@ -148,18 +149,18 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib. &txnID, ) if err != nil { - return nil, 0, 0, err + return nil, 0, 0, false, err } // Unmarshal eventJSON if err = json.Unmarshal(eventJSON, &event); err != nil { - return nil, 0, 0, err + return nil, 0, 0, false, err } // If txnID has changed on this event from the previous event, then we've // reached the end of a transaction's events. Return only those events. if lastTxnID > invalidTxnID && lastTxnID != txnID { - return events, maxID, lastTxnID, nil + return events, maxID, lastTxnID, true, nil } lastTxnID = txnID @@ -167,7 +168,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib. if txnID == -1 { // Return if we've hit the limit if eventsProcessed++; eventsProcessed > limit { - return events, maxID, lastTxnID, nil + return events, maxID, lastTxnID, true, nil } } @@ -178,7 +179,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib. // Portion of the event that is unsigned due to rapid change // TODO: Consider removing age as not many app services use it if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil { - return nil, 0, 0, err + return nil, 0, 0, false, err } events = append(events, event) diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go index eec2a0312..b68989fb1 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go @@ -67,7 +67,7 @@ func (d *Database) GetEventsWithAppServiceID( ctx context.Context, appServiceID string, limit int, -) (int, int, []gomatrixserverlib.Event, error) { +) (int, int, []gomatrixserverlib.Event, bool, error) { return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) } diff --git a/src/github.com/matrix-org/dendrite/appservice/types/types.go b/src/github.com/matrix-org/dendrite/appservice/types/types.go index c66ea5cf3..aac731550 100644 --- a/src/github.com/matrix-org/dendrite/appservice/types/types.go +++ b/src/github.com/matrix-org/dendrite/appservice/types/types.go @@ -31,26 +31,33 @@ type ApplicationServiceWorkerState struct { AppService config.ApplicationService Cond *sync.Cond // Events ready to be sent - EventsReady *int + EventsReady bool // Backoff exponent (2^x secs). Max 6, aka 64s. Backoff int } -// NotifyNewEvent wakes up all waiting goroutines, notifying that a new event -// has been placed into the event queue for this application service worker. -// Additionally it increments EventsReady by one. -func (a *ApplicationServiceWorkerState) NotifyNewEvent() { +// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain +// in the event queue for this application service worker. +func (a *ApplicationServiceWorkerState) NotifyNewEvents() { a.Cond.L.Lock() - *a.EventsReady++ + a.EventsReady = true a.Cond.Broadcast() a.Cond.L.Unlock() } +// FinishEventProcessing marks all events of this worker as being sent to the +// application service. +func (a *ApplicationServiceWorkerState) FinishEventProcessing() { + a.Cond.L.Lock() + a.EventsReady = false + a.Cond.L.Unlock() +} + // WaitForNewEvents causes the calling goroutine to wait on the worker state's // condition for a broadcast or similar wakeup, if there are no events ready. func (a *ApplicationServiceWorkerState) WaitForNewEvents() { a.Cond.L.Lock() - if *a.EventsReady <= 0 { + if !a.EventsReady { a.Cond.Wait() } a.Cond.L.Unlock() diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index 5abb6bd54..8f966c949 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -84,9 +84,9 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { }).WithError(err).Fatal("appservice worker unable to read queued events from DB") return } - ws.Cond.L.Lock() - *ws.EventsReady = eventCount - ws.Cond.L.Unlock() + if eventCount > 0 { + ws.NotifyNewEvents() + } // Loop forever and keep waiting for more events to send for { @@ -94,7 +94,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { ws.WaitForNewEvents() // Batch events up into a transaction - eventsCount, txnID, maxEventID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID) + transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID) if err != nil { log.WithFields(log.Fields{ "appservice": ws.AppService.ID, @@ -115,9 +115,11 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { // We sent successfully, hooray! ws.Backoff = 0 - ws.Cond.L.Lock() - *ws.EventsReady -= eventsCount - ws.Cond.L.Unlock() + // Transactions have a maximum event size, so there may still be some events + // left over to send. Keep sending until none are left + if !eventsRemaining { + ws.FinishEventProcessing() + } // Remove sent events from the DB err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID) @@ -157,12 +159,13 @@ func createTransaction( db *storage.Database, appserviceID string, ) ( - eventsCount, txnID, maxID int, transactionJSON []byte, + txnID, maxID int, + eventsRemaining bool, err error, ) { // Retrieve the latest events from the DB (will return old events if they weren't successfully sent) - txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) + txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) if err != nil { log.WithFields(log.Fields{ "appservice": appserviceID, @@ -176,12 +179,12 @@ func createTransaction( // If not, grab next available ID from the DB txnID, err = db.GetLatestTxnID(ctx) if err != nil { - return 0, 0, 0, nil, err + return nil, 0, 0, false, err } // Mark new events with current transactionID if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil { - return 0, 0, 0, nil, err + return nil, 0, 0, false, err } } @@ -195,7 +198,6 @@ func createTransaction( return } - eventsCount = len(events) return }