diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go index c5349232a..b62338d29 100644 --- a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -76,10 +76,8 @@ func (s *OutputRoomEventConsumer) Start() error { return s.roomServerConsumer.Start() } -// onMessage is called when the sync server receives a new event from the room -// server output log. It is not safe for this function to be called from -// multiple goroutines, or else the sync stream position may race and be -// incorrectly calculated. +// onMessage is called when the appservice component receives a new event from +// the room server output log. func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent @@ -186,10 +184,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( } else { // Tell our worker to send out new messages by updating remaining message // count and waking them up with a broadcast - ws.Cond.L.Lock() - *ws.EventsReady++ - ws.Cond.Broadcast() - ws.Cond.L.Unlock() + ws.NotifyNewEvent() } } } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index c0a1f296b..16b90d604 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -17,7 +17,6 @@ package storage import ( "context" "database/sql" - "fmt" "time" "github.com/matrix-org/gomatrixserverlib" @@ -156,7 +155,6 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application &txnID, ) if err != nil { - fmt.Println("Failed:", err.Error()) return nil, 0, 0, err } @@ -213,12 +211,6 @@ func (s *eventsStatements) insertEvent( appServiceID string, event *gomatrixserverlib.Event, ) (err error) { - // If event has no content, strip the json - content := event.Content() - if string(content) == "{\"disable\":true}" { - content = []byte("{}") - } - _, err = s.insertEventStmt.ExecContext( ctx, appServiceID, @@ -227,7 +219,7 @@ func (s *eventsStatements) insertEvent( event.RoomID(), event.Type(), event.Sender(), - content, + event.Content(), -1, // No transaction ID yet ) return diff --git a/src/github.com/matrix-org/dendrite/appservice/types/types.go b/src/github.com/matrix-org/dendrite/appservice/types/types.go index 93ad9e142..c66ea5cf3 100644 --- a/src/github.com/matrix-org/dendrite/appservice/types/types.go +++ b/src/github.com/matrix-org/dendrite/appservice/types/types.go @@ -18,6 +18,11 @@ import ( "github.com/matrix-org/dendrite/common/config" ) +const ( + // AppServiceDeviceID is the AS dummy device ID + AppServiceDeviceID = "AS_Device" +) + // ApplicationServiceWorkerState is a type that couples an application service, // a lockable condition as well as some other state variables, allowing the // roomserver to notify appservice workers when there are events ready to send @@ -31,7 +36,22 @@ type ApplicationServiceWorkerState struct { Backoff int } -const ( - // AppServiceDeviceID is the AS dummy device ID - AppServiceDeviceID = "AS_Device" -) +// NotifyNewEvent wakes up all waiting goroutines, notifying that a new event +// has been placed into the event queue for this application service worker. +// Additionally it increments EventsReady by one. +func (a *ApplicationServiceWorkerState) NotifyNewEvent() { + a.Cond.L.Lock() + *a.EventsReady++ + a.Cond.Broadcast() + a.Cond.L.Unlock() +} + +// WaitForNewEvents causes the calling goroutine to wait on the worker state's +// condition for a broadcast or similar wakeup, if there are no events ready. +func (a *ApplicationServiceWorkerState) WaitForNewEvents() { + a.Cond.L.Lock() + if *a.EventsReady <= 0 { + a.Cond.Wait() + } + a.Cond.L.Unlock() +} diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index 47bb78d91..34800192d 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -66,15 +66,18 @@ func SetupTransactionWorkers( // worker is a goroutine that sends any queued events to the application service // it is given. func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { - log.Infof("Starting Application Service %s", ws.AppService.ID) + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).Info("starting application service") ctx := context.Background() // Initialize transaction ID counter var err error currentTransactionID, err = db.GetTxnIDWithAppServiceID(ctx, ws.AppService.ID) if err != nil && err != sql.ErrNoRows { - log.WithError(err).Fatalf("appservice %s worker unable to get latest transaction ID from DB", - ws.AppService.ID) + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("appservice worker unable to get latest transaction ID from DB") return } @@ -86,8 +89,9 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { // Initial check for any leftover events to send from last time eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) if err != nil { - log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB", - ws.AppService.ID) + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("appservice worker unable to read queued events from DB") return } ws.Cond.L.Lock() @@ -97,17 +101,14 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { // Loop forever and keep waiting for more events to send for { // Wait for more events if we've sent all the events in the database - if *ws.EventsReady <= 0 { - ws.Cond.L.Lock() - ws.Cond.Wait() - ws.Cond.L.Unlock() - } + ws.WaitForNewEvents() // Batch events up into a transaction eventsCount, txnID, maxEventID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID) if err != nil { - log.WithError(err).Fatalf("appservice %s worker unable to create transaction", - ws.AppService.ID) + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("appservice worker unable to create transaction") return } @@ -131,16 +132,18 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { // Remove sent events from the DB err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID) if err != nil { - log.WithError(err).Fatalf("unable to remove appservice events from the database for %s", - ws.AppService.ID) + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("unable to remove appservice events from the database") return } // Update transactionID currentTransactionID++ if err = db.UpsertTxnIDWithAppServiceID(ctx, ws.AppService.ID, currentTransactionID); err != nil { - log.WithError(err).Fatalf("unable to update transaction ID for %s", - ws.AppService.ID) + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("unable to update transaction ID") return } } @@ -152,8 +155,10 @@ func backoff(ws *types.ApplicationServiceWorkerState, err error) { backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff))) backoffSeconds := time.Second * backoffDuration - log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds", - ws.AppService.ID, backoffDuration) + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds", + backoffDuration) ws.Backoff++ if ws.Backoff > 6 { @@ -178,8 +183,9 @@ func createTransaction( // Retrieve the latest events from the DB (will return old events if they weren't successfully sent) txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) if err != nil { - log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB", - appserviceID) + log.WithFields(log.Fields{ + "appservice": appserviceID, + }).WithError(err).Fatalf("appservice worker unable to read queued events from DB") return } @@ -225,14 +231,16 @@ func send( defer func() { err := resp.Body.Close() if err != nil { - log.WithError(err).Errorf("Unable to close response body from application service %s", appservice.ID) + log.WithFields(log.Fields{ + "appservice": appservice.ID, + }).WithError(err).Error("unable to close response body from application service") } }() // Check the AS received the events correctly if resp.StatusCode != http.StatusOK { // TODO: Handle non-200 error codes from application services - return fmt.Errorf("Non-OK status code %d returned from AS", resp.StatusCode) + return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode) } return nil