diff --git a/src/github.com/matrix-org/dendrite/appservice/appservice.go b/src/github.com/matrix-org/dendrite/appservice/appservice.go index 632dee001..23474b60a 100644 --- a/src/github.com/matrix-org/dendrite/appservice/appservice.go +++ b/src/github.com/matrix-org/dendrite/appservice/appservice.go @@ -51,10 +51,13 @@ func SetupAppServiceAPIComponent( // events to be sent out. workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices)) for _, appservice := range base.Cfg.Derived.ApplicationServices { + eventCount := 0 + m := sync.Mutex{} ws := types.ApplicationServiceWorkerState{ - AppService: appservice, - Cond: sync.NewCond(&m), + AppService: appservice, + Cond: sync.NewCond(&m), + EventsReady: &eventCount, } workerStates = append(workerStates, ws) } diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go index 802f63d9d..6403e6030 100644 --- a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -175,15 +175,15 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( for _, event := range events { for _, ws := range s.workerStates { // Check if this event is interesting to this application service - if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { + if s.appserviceIsInterestedInEvent(ctx, &event, ws.AppService) { // Queue this event to be sent off to the application service - if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil { + if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil { log.WithError(err).Warn("failed to insert incoming event into appservices database") } else { - // Tell our worker to send out new messages by setting dirty bit for that - // worker to true, and waking them up with a broadcast + // Tell our worker to send out new messages by updating remaining message + // count and waking them up with a broadcast ws.Cond.L.Lock() - ws.EventsReady = true + *ws.EventsReady++ ws.Cond.Broadcast() ws.Cond.L.Unlock() } @@ -196,7 +196,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( // appserviceIsInterestedInEvent returns a boolean depending on whether a given // event falls within one of a given application service's namespaces. -func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.Event, appservice config.ApplicationService) bool { +func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.Event, appservice config.ApplicationService) bool { // Check sender of the event for _, userNamespace := range appservice.NamespaceMap["users"] { if userNamespace.RegexpObject.MatchString(event.Sender()) { diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index 9020cc9b3..82fb59abe 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -17,6 +17,7 @@ package storage import ( "context" "database/sql" + "fmt" "time" "github.com/matrix-org/gomatrixserverlib" @@ -43,15 +44,19 @@ CREATE TABLE IF NOT EXISTS appservice_events ( -- The JSON representation of the event's content. Text to avoid db JSON parsing event_content TEXT, -- The ID of the transaction that this event is a part of - txn_id INTEGER + txn_id BIGINT NOT NULL ); CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); ` -const selectEventsByApplicationServiceIDSQL = "" + - "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, COUNT(id) OVER() AS full_count " + - "FROM appservice_events WHERE as_id = $1 ORDER BY id ASC LIMIT $2" +const selectPastEventsByApplicationServiceIDSQL = "" + + "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " + + "FROM appservice_events WHERE as_id = $1 AND txn_id > -1 LIMIT $2" + +const selectCurrEventsByApplicationServiceIDSQL = "" + + "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " + + "FROM appservice_events WHERE as_id = $1 AND txn_id = -1 LIMIT $2" const countEventsByApplicationServiceIDSQL = "" + "SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1" @@ -60,14 +65,19 @@ const insertEventSQL = "" + "INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id) " + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" +const updateTxnIDForEventsSQL = "" + + "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" + const deleteEventsBeforeAndIncludingIDSQL = "" + - "DELETE FROM appservice_events WHERE id <= $1" + "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2" type eventsStatements struct { - selectEventsByApplicationServiceIDStmt *sql.Stmt - countEventsByApplicationServiceIDStmt *sql.Stmt - insertEventStmt *sql.Stmt - deleteEventsBeforeAndIncludingIDStmt *sql.Stmt + selectPastEventsByApplicationServiceIDStmt *sql.Stmt + selectCurrEventsByApplicationServiceIDStmt *sql.Stmt + countEventsByApplicationServiceIDStmt *sql.Stmt + insertEventStmt *sql.Stmt + updateTxnIDForEventsStmt *sql.Stmt + deleteEventsBeforeAndIncludingIDStmt *sql.Stmt } func (s *eventsStatements) prepare(db *sql.DB) (err error) { @@ -76,7 +86,10 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) { return } - if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { + if s.selectPastEventsByApplicationServiceIDStmt, err = db.Prepare(selectPastEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.selectCurrEventsByApplicationServiceIDStmt, err = db.Prepare(selectCurrEventsByApplicationServiceIDSQL); err != nil { return } if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { @@ -85,6 +98,9 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) { if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return } + if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil { + return + } if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil { return } @@ -95,30 +111,57 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) { // selectEventsByApplicationServiceID takes in an application service ID and // returns a slice of events that need to be sent to that application service, // as well as an int later used to remove these same events from the database -// once successfully sent to an application service. The total event count is -// used by a worker to determine if more events need to be pulled from the DB -// later. +// once successfully sent to an application service. func (s *eventsStatements) selectEventsByApplicationServiceID( ctx context.Context, applicationServiceID string, limit int, ) ( - maxID, totalEvents int, + txnID, maxID int, events []gomatrixserverlib.ApplicationServiceEvent, err error, ) { - eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) + // First check to see if there are any events part of an old transaction + eventRowsPast, err := s.selectPastEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) if err != nil { return 0, 0, nil, err } defer func() { - err = eventRows.Close() + err = eventRowsPast.Close() + if err != nil { + log.WithError(err).Fatalf("Appservice %s unable to select past events to send", + applicationServiceID) + } + }() + events, txnID, maxID, err = retrieveEvents(eventRowsPast) + if err != nil { + return 0, 0, nil, err + } + if len(events) > 0 { + return + } + + // Else, if there are old events with existing transaction IDs, grab a batch of new events + eventRowsCurr, err := s.selectCurrEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) + if err != nil { + return 0, 0, nil, err + } + defer func() { + err = eventRowsCurr.Close() if err != nil { log.WithError(err).Fatalf("Appservice %s unable to select new events to send", applicationServiceID) } }() + events, _, maxID, err = retrieveEvents(eventRowsCurr) + if err != nil { + return 0, 0, nil, err + } + return -1, maxID, events, err +} + +func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, txnID, maxID int, err error) { // Iterate through each row and store event contents for eventRows.Next() { var event gomatrixserverlib.ApplicationServiceEvent @@ -132,10 +175,11 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( &event.Type, &event.UserID, &eventContent, - &totalEvents, + &txnID, ) if err != nil { - return 0, 0, nil, err + fmt.Println("Failed:", err.Error()) + return nil, 0, 0, err } if eventContent.Valid { event.Content = gomatrixserverlib.RawJSON(eventContent.String) @@ -177,7 +221,7 @@ func (s *eventsStatements) countEventsByApplicationServiceID( func (s *eventsStatements) insertEvent( ctx context.Context, appServiceID string, - event gomatrixserverlib.Event, + event *gomatrixserverlib.Event, ) (err error) { _, err = s.insertEventStmt.ExecContext( ctx, @@ -188,16 +232,29 @@ func (s *eventsStatements) insertEvent( event.Type(), event.Sender(), event.Content(), - nil, + -1, ) return } +// updateTxnIDForEvents sets the transactionID for a collection of events. Done +// before sending them to an AppService. Referenced before sending to make sure +// we aren't constructing multiple transactions with the same events. +func (s *eventsStatements) updateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) (err error) { + _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID) + return +} + // deleteEventsBeforeAndIncludingID removes events matching given IDs from the database. func (s *eventsStatements) deleteEventsBeforeAndIncludingID( ctx context.Context, + appserviceID string, eventTableID int, ) (err error) { - _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, eventTableID) - return err + _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID) + return } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go index 4bc73c699..16f6f5ad6 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go @@ -56,7 +56,7 @@ func (d *Database) prepare() error { func (d *Database) StoreEvent( ctx context.Context, appServiceID string, - event gomatrixserverlib.Event, + event *gomatrixserverlib.Event, ) error { return d.events.insertEvent(ctx, appServiceID, event) } @@ -80,14 +80,26 @@ func (d *Database) CountEventsWithAppServiceID( return d.events.countEventsByApplicationServiceID(ctx, appServiceID) } +// UpdateTxnIDForEvents takes in an application service ID and a +// and stores them in the DB, unless the pair already exists, in +// which case it updates them. +func (d *Database) UpdateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) error { + return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID) +} + // RemoveEventsBeforeAndIncludingID removes all events from the database that // are less than or equal to a given maximum ID. IDs here are implemented as a // serial, thus this should always delete events in chronological order. func (d *Database) RemoveEventsBeforeAndIncludingID( ctx context.Context, + appserviceID string, eventTableID int, ) error { - return d.events.deleteEventsBeforeAndIncludingID(ctx, eventTableID) + return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) } // GetTxnIDWithAppServiceID takes in an application service ID and returns the diff --git a/src/github.com/matrix-org/dendrite/appservice/types/types.go b/src/github.com/matrix-org/dendrite/appservice/types/types.go index c0fa56b0c..93ad9e142 100644 --- a/src/github.com/matrix-org/dendrite/appservice/types/types.go +++ b/src/github.com/matrix-org/dendrite/appservice/types/types.go @@ -23,9 +23,10 @@ import ( // roomserver to notify appservice workers when there are events ready to send // externally to application services. type ApplicationServiceWorkerState struct { - AppService config.ApplicationService - Cond *sync.Cond - EventsReady bool + AppService config.ApplicationService + Cond *sync.Cond + // Events ready to be sent + EventsReady *int // Backoff exponent (2^x secs). Max 6, aka 64s. Backoff int } diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index e3432c9a2..d73bb56a5 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -32,11 +32,12 @@ import ( ) var ( - // TODO: Expose these in the config? // Maximum size of events sent in each transaction. + // Warning, if this is lowered and a number of events greater than the previous + // batch size were still to be sent, then a number of events equal to the + // difference will be ignored by the app service. + // TL;DR: Don't lower this number with any AS events still left in the database. transactionBatchSize = 50 - // Time to wait between checking for new events to send. - transactionBreakTime = time.Millisecond * 50 // Timeout for sending a single transaction to an application service. transactionTimeout = time.Second * 15 // The current transaction ID. Increments after every successful transaction. @@ -65,6 +66,7 @@ func SetupTransactionWorkers( // worker is a goroutine that sends any queued events to the application service // it is given. func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { + log.Infof("Starting Application Service %s", ws.AppService.ID) ctx := context.Background() // Initialize transaction ID counter @@ -88,40 +90,31 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { ws.AppService.ID) return } - - // Wait if there are no new events to go out - if eventCount == 0 { - waitForEvents(&ws) - } + ws.Cond.L.Lock() + *ws.EventsReady = eventCount + ws.Cond.L.Unlock() // Loop forever and keep waiting for more events to send for { - // Set EventsReady to false for some reason (we just sent events?) - ws.Cond.L.Lock() - ws.EventsReady = false - ws.Cond.L.Unlock() - - maxID, totalEvents, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize) - if err != nil { - log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB", - ws.AppService.ID) - - // Wait a little bit for DB to possibly recover - time.Sleep(transactionBreakTime) - continue + // Wait for more events if we've sent all the events in the database + if *ws.EventsReady <= 0 { + fmt.Println("Waiting") + ws.Cond.L.Lock() + ws.Cond.Wait() + ws.Cond.L.Unlock() } // Batch events up into a transaction - transactionJSON, err := createTransaction(events) + eventsCount, maxEventID, transactionID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID) if err != nil { - log.WithError(err).Fatalf("appservice %s worker unable to marshal events", + log.WithError(err).Fatalf("appservice %s worker unable to create transaction", ws.AppService.ID) return } // Send the events off to the application service - err = send(client, ws.AppService, transactionJSON) + err = send(client, ws.AppService, transactionID, transactionJSON) if err != nil { // Backoff backoff(err, &ws) @@ -131,8 +124,12 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { // We sent successfully, hooray! ws.Backoff = 0 + ws.Cond.L.Lock() + *ws.EventsReady -= eventsCount + ws.Cond.L.Unlock() + // Remove sent events from the DB - err = db.RemoveEventsBeforeAndIncludingID(ctx, maxID) + err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID) if err != nil { log.WithError(err).Fatalf("unable to remove appservice events from the database for %s", ws.AppService.ID) @@ -146,29 +143,15 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { ws.AppService.ID) return } - - // Only wait for more events once we've sent all the events in the database - if totalEvents <= transactionBatchSize { - waitForEvents(&ws) - } } } -// waitForEvents pauses the calling goroutine while it waits for a broadcast message -func waitForEvents(ws *types.ApplicationServiceWorkerState) { - ws.Cond.L.Lock() - if !ws.EventsReady { - // Wait for a broadcast about new events - ws.Cond.Wait() - } - ws.Cond.L.Unlock() -} - // backoff pauses the calling goroutine for a 2^some backoff exponent seconds func backoff(err error, ws *types.ApplicationServiceWorkerState) { // Calculate how long to backoff for backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff))) backoffSeconds := time.Second * backoffDuration + log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds", ws.AppService.ID, backoffDuration) @@ -184,19 +167,47 @@ func backoff(err error, ws *types.ApplicationServiceWorkerState) { // createTransaction takes in a slice of AS events, stores them in an AS // transaction, and JSON-encodes the results. func createTransaction( - events []gomatrixserverlib.ApplicationServiceEvent, -) ([]byte, error) { - // Create a transactions and store the events inside + ctx context.Context, + db *storage.Database, + appserviceID string, +) ( + eventsCount, maxID, txnID int, + transactionJSON []byte, + err error, +) { + transactionID := currentTransactionID + + // Retrieve the latest events from the DB (will return old events if they weren't successfully sent) + txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) + if err != nil { + log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB", + appserviceID) + + return + } + + // Check if these are old events we are resending. If so, reuse old transactionID + if txnID != -1 { + transactionID = txnID + } else { + // Mark new events with current transactionID + err := db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, transactionID) + if err != nil { + return 0, 0, 0, nil, err + } + } + + // Create a transaction and store the events inside transaction := gomatrixserverlib.ApplicationServiceTransaction{ Events: events, } - transactionJSON, err := json.Marshal(transaction) + transactionJSON, err = json.Marshal(transaction) if err != nil { - return nil, err + return } - return transactionJSON, nil + return len(events), maxID, transactionID, transactionJSON, nil } // send sends events to an application service. Returns an error if an OK was not @@ -204,10 +215,11 @@ func createTransaction( func send( client *http.Client, appservice config.ApplicationService, + transactionID int, transaction []byte, ) error { - // POST a transaction to our AS. - address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID) + // POST a transaction to our AS + address := fmt.Sprintf("%s/transactions/%d", appservice.URL, transactionID) resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction)) if err != nil { return err