Use a bool for EventsReady instead of an int

This commit is contained in:
Andrew Morgan 2018-07-05 17:05:20 +01:00
parent 06bfc299ad
commit 496aa1eab8
6 changed files with 42 additions and 35 deletions

View file

@ -51,13 +51,10 @@ func SetupAppServiceAPIComponent(
// events to be sent out. // events to be sent out.
workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices)) workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
for i, appservice := range base.Cfg.Derived.ApplicationServices { for i, appservice := range base.Cfg.Derived.ApplicationServices {
eventCount := 0
m := sync.Mutex{} m := sync.Mutex{}
ws := types.ApplicationServiceWorkerState{ ws := types.ApplicationServiceWorkerState{
AppService: appservice, AppService: appservice,
Cond: sync.NewCond(&m), Cond: sync.NewCond(&m),
EventsReady: &eventCount,
} }
workerStates[i] = ws workerStates[i] = ws
} }

View file

@ -167,7 +167,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
} else { } else {
// Tell our worker to send out new messages by updating remaining message // Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast // count and waking them up with a broadcast
ws.NotifyNewEvent() ws.NotifyNewEvents()
} }
} }
} }

View file

@ -107,12 +107,13 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
) ( ) (
txnID, maxID int, txnID, maxID int,
events []gomatrixserverlib.Event, events []gomatrixserverlib.Event,
eventsRemaining bool,
err error, err error,
) { ) {
// Retrieve events from the database. Unsuccessfully sent events first // Retrieve events from the database. Unsuccessfully sent events first
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
if err != nil { if err != nil {
return 0, 0, nil, err return
} }
defer func() { defer func() {
err = eventRows.Close() err = eventRows.Close()
@ -122,15 +123,15 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
}).WithError(err).Fatalf("appservice unable to select new events to send") }).WithError(err).Fatalf("appservice unable to select new events to send")
} }
}() }()
events, maxID, txnID, err = retrieveEvents(eventRows, limit) events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
if err != nil { if err != nil {
return 0, 0, nil, err return
} }
return return
} }
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, err error) { func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
// Get current time for use in calculating event age // Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond) nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
@ -148,18 +149,18 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
&txnID, &txnID,
) )
if err != nil { if err != nil {
return nil, 0, 0, err return nil, 0, 0, false, err
} }
// Unmarshal eventJSON // Unmarshal eventJSON
if err = json.Unmarshal(eventJSON, &event); err != nil { if err = json.Unmarshal(eventJSON, &event); err != nil {
return nil, 0, 0, err return nil, 0, 0, false, err
} }
// If txnID has changed on this event from the previous event, then we've // If txnID has changed on this event from the previous event, then we've
// reached the end of a transaction's events. Return only those events. // reached the end of a transaction's events. Return only those events.
if lastTxnID > invalidTxnID && lastTxnID != txnID { if lastTxnID > invalidTxnID && lastTxnID != txnID {
return events, maxID, lastTxnID, nil return events, maxID, lastTxnID, true, nil
} }
lastTxnID = txnID lastTxnID = txnID
@ -167,7 +168,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
if txnID == -1 { if txnID == -1 {
// Return if we've hit the limit // Return if we've hit the limit
if eventsProcessed++; eventsProcessed > limit { if eventsProcessed++; eventsProcessed > limit {
return events, maxID, lastTxnID, nil return events, maxID, lastTxnID, true, nil
} }
} }
@ -178,7 +179,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
// Portion of the event that is unsigned due to rapid change // Portion of the event that is unsigned due to rapid change
// TODO: Consider removing age as not many app services use it // TODO: Consider removing age as not many app services use it
if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil { if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
return nil, 0, 0, err return nil, 0, 0, false, err
} }
events = append(events, event) events = append(events, event)

View file

@ -67,7 +67,7 @@ func (d *Database) GetEventsWithAppServiceID(
ctx context.Context, ctx context.Context,
appServiceID string, appServiceID string,
limit int, limit int,
) (int, int, []gomatrixserverlib.Event, error) { ) (int, int, []gomatrixserverlib.Event, bool, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
} }

View file

@ -31,26 +31,33 @@ type ApplicationServiceWorkerState struct {
AppService config.ApplicationService AppService config.ApplicationService
Cond *sync.Cond Cond *sync.Cond
// Events ready to be sent // Events ready to be sent
EventsReady *int EventsReady bool
// Backoff exponent (2^x secs). Max 6, aka 64s. // Backoff exponent (2^x secs). Max 6, aka 64s.
Backoff int Backoff int
} }
// NotifyNewEvent wakes up all waiting goroutines, notifying that a new event // NotifyNewEvents wakes up all waiting goroutines, notifying that events remain
// has been placed into the event queue for this application service worker. // in the event queue for this application service worker.
// Additionally it increments EventsReady by one. func (a *ApplicationServiceWorkerState) NotifyNewEvents() {
func (a *ApplicationServiceWorkerState) NotifyNewEvent() {
a.Cond.L.Lock() a.Cond.L.Lock()
*a.EventsReady++ a.EventsReady = true
a.Cond.Broadcast() a.Cond.Broadcast()
a.Cond.L.Unlock() a.Cond.L.Unlock()
} }
// FinishEventProcessing marks all events of this worker as being sent to the
// application service.
func (a *ApplicationServiceWorkerState) FinishEventProcessing() {
a.Cond.L.Lock()
a.EventsReady = false
a.Cond.L.Unlock()
}
// WaitForNewEvents causes the calling goroutine to wait on the worker state's // WaitForNewEvents causes the calling goroutine to wait on the worker state's
// condition for a broadcast or similar wakeup, if there are no events ready. // condition for a broadcast or similar wakeup, if there are no events ready.
func (a *ApplicationServiceWorkerState) WaitForNewEvents() { func (a *ApplicationServiceWorkerState) WaitForNewEvents() {
a.Cond.L.Lock() a.Cond.L.Lock()
if *a.EventsReady <= 0 { if !a.EventsReady {
a.Cond.Wait() a.Cond.Wait()
} }
a.Cond.L.Unlock() a.Cond.L.Unlock()

View file

@ -84,9 +84,9 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
}).WithError(err).Fatal("appservice worker unable to read queued events from DB") }).WithError(err).Fatal("appservice worker unable to read queued events from DB")
return return
} }
ws.Cond.L.Lock() if eventCount > 0 {
*ws.EventsReady = eventCount ws.NotifyNewEvents()
ws.Cond.L.Unlock() }
// Loop forever and keep waiting for more events to send // Loop forever and keep waiting for more events to send
for { for {
@ -94,7 +94,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
ws.WaitForNewEvents() ws.WaitForNewEvents()
// Batch events up into a transaction // Batch events up into a transaction
eventsCount, txnID, maxEventID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID) transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"appservice": ws.AppService.ID, "appservice": ws.AppService.ID,
@ -115,9 +115,11 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
// We sent successfully, hooray! // We sent successfully, hooray!
ws.Backoff = 0 ws.Backoff = 0
ws.Cond.L.Lock() // Transactions have a maximum event size, so there may still be some events
*ws.EventsReady -= eventsCount // left over to send. Keep sending until none are left
ws.Cond.L.Unlock() if !eventsRemaining {
ws.FinishEventProcessing()
}
// Remove sent events from the DB // Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID) err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
@ -157,12 +159,13 @@ func createTransaction(
db *storage.Database, db *storage.Database,
appserviceID string, appserviceID string,
) ( ) (
eventsCount, txnID, maxID int,
transactionJSON []byte, transactionJSON []byte,
txnID, maxID int,
eventsRemaining bool,
err error, err error,
) { ) {
// Retrieve the latest events from the DB (will return old events if they weren't successfully sent) // Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"appservice": appserviceID, "appservice": appserviceID,
@ -176,12 +179,12 @@ func createTransaction(
// If not, grab next available ID from the DB // If not, grab next available ID from the DB
txnID, err = db.GetLatestTxnID(ctx) txnID, err = db.GetLatestTxnID(ctx)
if err != nil { if err != nil {
return 0, 0, 0, nil, err return nil, 0, 0, false, err
} }
// Mark new events with current transactionID // Mark new events with current transactionID
if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil { if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
return 0, 0, 0, nil, err return nil, 0, 0, false, err
} }
} }
@ -195,7 +198,6 @@ func createTransaction(
return return
} }
eventsCount = len(events)
return return
} }