diff --git a/appservice/appservice.go b/appservice/appservice.go index 8fe1b2fc4..00b2f2759 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -70,14 +70,14 @@ func NewInternalAPI( // Wrap application services in a type that relates the application service and // a sync.Cond object that can be used to notify workers when there are new // events to be sent out. - workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices)) + workerStates := make([]*types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices)) for i, appservice := range base.Cfg.Derived.ApplicationServices { m := sync.Mutex{} ws := types.ApplicationServiceWorkerState{ AppService: appservice, Cond: sync.NewCond(&m), } - workerStates[i] = ws + workerStates[i] = &ws // Create bot account for this AS if it doesn't already exist if err = generateAppServiceAccount(userAPI, appservice); err != nil { diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 37d4ef9c2..a2c56192e 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -39,7 +39,7 @@ type OutputRoomEventConsumer struct { asDB storage.Database rsAPI api.AppserviceRoomserverAPI serverName string - workerStates []types.ApplicationServiceWorkerState + workerStates []*types.ApplicationServiceWorkerState } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call @@ -50,7 +50,7 @@ func NewOutputRoomEventConsumer( js nats.JetStreamContext, appserviceDB storage.Database, rsAPI api.AppserviceRoomserverAPI, - workerStates []types.ApplicationServiceWorkerState, + workerStates []*types.ApplicationServiceWorkerState, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), @@ -140,13 +140,13 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( // Check if this event is interesting to this application service if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { // Queue this event to be sent off to the application service - if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil { - log.WithError(err).Warn("failed to insert incoming event into appservices database") + if id, err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil { + log.WithError(err).Warnf("failed to insert incoming event into appservices database. id: %d", id) return err } else { // Tell our worker to send out new messages by updating remaining message // count and waking them up with a broadcast - ws.NotifyNewEvents() + ws.NotifyNewEvents(id) } } } diff --git a/appservice/storage/interface.go b/appservice/storage/interface.go index 25d35af6c..8c23f7072 100644 --- a/appservice/storage/interface.go +++ b/appservice/storage/interface.go @@ -21,9 +21,9 @@ import ( ) type Database interface { - StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error + StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) (int, error) GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) - CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error) + GetLatestId(ctx context.Context, appServiceID string) (int, error) UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error GetLatestTxnID(ctx context.Context) (int, error) diff --git a/appservice/storage/postgres/appservice_events_table.go b/appservice/storage/postgres/appservice_events_table.go index a95be6b8a..857549daf 100644 --- a/appservice/storage/postgres/appservice_events_table.go +++ b/appservice/storage/postgres/appservice_events_table.go @@ -45,12 +45,13 @@ const selectEventsByApplicationServiceIDSQL = "" + "SELECT id, headered_event_json, txn_id " + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" -const countEventsByApplicationServiceIDSQL = "" + - "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" +const getLatestIdSQL = "" + + "SELECT id FROM appservice_events WHERE as_id = $1 ORDER BY id DESC LIMIT 1" const insertEventSQL = "" + "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " + - "VALUES ($1, $2, $3)" + "VALUES ($1, $2, $3)" + + "RETURNING id" const updateTxnIDForEventsSQL = "" + "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" @@ -66,7 +67,7 @@ const ( type eventsStatements struct { selectEventsByApplicationServiceIDStmt *sql.Stmt - countEventsByApplicationServiceIDStmt *sql.Stmt + getLatestIdStmt *sql.Stmt insertEventStmt *sql.Stmt updateTxnIDForEventsStmt *sql.Stmt deleteEventsBeforeAndIncludingIDStmt *sql.Stmt @@ -81,7 +82,7 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) { if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { return } - if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { + if s.getLatestIdStmt, err = db.Prepare(getLatestIdSQL); err != nil { return } if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { @@ -196,14 +197,12 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib. return } -// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service -// IDs into the db. -func (s *eventsStatements) countEventsByApplicationServiceID( +func (s *eventsStatements) getLatestId( ctx context.Context, appServiceID string, ) (int, error) { var count int - err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) + err := s.getLatestIdStmt.QueryRowContext(ctx, appServiceID).Scan(&count) if err != nil && err != sql.ErrNoRows { return 0, err } @@ -217,19 +216,19 @@ func (s *eventsStatements) insertEvent( ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent, -) (err error) { +) (id int, err error) { // Convert event to JSON before inserting - eventJSON, err := json.Marshal(event) + var eventJSON []byte + eventJSON, err = json.Marshal(event) if err != nil { - return err + return 0, err } - - _, err = s.insertEventStmt.ExecContext( + err = s.insertEventStmt.QueryRowContext( ctx, appServiceID, eventJSON, -1, // No transaction ID yet - ) + ).Scan(&id) return } diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go index a4c04b2cc..8b91f85b5 100644 --- a/appservice/storage/postgres/storage.go +++ b/appservice/storage/postgres/storage.go @@ -62,7 +62,7 @@ func (d *Database) StoreEvent( ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent, -) error { +) (int, error) { return d.events.insertEvent(ctx, appServiceID, event) } @@ -76,13 +76,16 @@ func (d *Database) GetEventsWithAppServiceID( return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) } -// CountEventsWithAppServiceID returns the number of events destined for an -// application service given its ID. -func (d *Database) CountEventsWithAppServiceID( +// GetLatestId returns the latest incremental id associated with appservice. +func (d *Database) GetLatestId( ctx context.Context, appServiceID string, ) (int, error) { - return d.events.countEventsByApplicationServiceID(ctx, appServiceID) + id, err := d.events.getLatestId(ctx, appServiceID) + if err == sql.ErrNoRows { + return 0, nil + } + return id, err } // UpdateTxnIDForEvents takes in an application service ID and a diff --git a/appservice/storage/sqlite3/appservice_events_table.go b/appservice/storage/sqlite3/appservice_events_table.go index 34b4859ea..9ede32997 100644 --- a/appservice/storage/sqlite3/appservice_events_table.go +++ b/appservice/storage/sqlite3/appservice_events_table.go @@ -46,12 +46,13 @@ const selectEventsByApplicationServiceIDSQL = "" + "SELECT id, headered_event_json, txn_id " + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" -const countEventsByApplicationServiceIDSQL = "" + - "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" +const getLatestIdSQL = "" + + "SELECT id FROM appservice_events WHERE as_id = $1 ORDER BY id DESC LIMIT 1" const insertEventSQL = "" + "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " + - "VALUES ($1, $2, $3)" + "VALUES ($1, $2, $3)" + + "RETURNING id" const updateTxnIDForEventsSQL = "" + "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" @@ -69,7 +70,7 @@ type eventsStatements struct { db *sql.DB writer sqlutil.Writer selectEventsByApplicationServiceIDStmt *sql.Stmt - countEventsByApplicationServiceIDStmt *sql.Stmt + getLatestIdStmt *sql.Stmt insertEventStmt *sql.Stmt updateTxnIDForEventsStmt *sql.Stmt deleteEventsBeforeAndIncludingIDStmt *sql.Stmt @@ -86,7 +87,7 @@ func (s *eventsStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { return } - if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { + if s.getLatestIdStmt, err = db.Prepare(getLatestIdSQL); err != nil { return } if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { @@ -201,14 +202,12 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib. return } -// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service -// IDs into the db. -func (s *eventsStatements) countEventsByApplicationServiceID( +func (s *eventsStatements) getLatestId( ctx context.Context, appServiceID string, ) (int, error) { var count int - err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) + err := s.getLatestIdStmt.QueryRowContext(ctx, appServiceID).Scan(&count) if err != nil && err != sql.ErrNoRows { return 0, err } @@ -222,22 +221,22 @@ func (s *eventsStatements) insertEvent( ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent, -) (err error) { +) (id int, err error) { // Convert event to JSON before inserting eventJSON, err := json.Marshal(event) if err != nil { - return err + return 0, err } - - return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { - _, err := s.insertEventStmt.ExecContext( + err = s.writer.Do(s.db, nil, func(txn *sql.Tx) error { + err = s.insertEventStmt.QueryRowContext( ctx, appServiceID, eventJSON, -1, // No transaction ID yet - ) + ).Scan(&id) return err }) + return } // updateTxnIDForEvents sets the transactionID for a collection of events. Done diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go index ad62b3628..454d4213d 100644 --- a/appservice/storage/sqlite3/storage.go +++ b/appservice/storage/sqlite3/storage.go @@ -61,7 +61,7 @@ func (d *Database) StoreEvent( ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent, -) error { +) (int, error) { return d.events.insertEvent(ctx, appServiceID, event) } @@ -75,13 +75,16 @@ func (d *Database) GetEventsWithAppServiceID( return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) } -// CountEventsWithAppServiceID returns the number of events destined for an -// application service given its ID. -func (d *Database) CountEventsWithAppServiceID( +// GetLatestId returns the latest incremental id associated with appservice. +func (d *Database) GetLatestId( ctx context.Context, appServiceID string, ) (int, error) { - return d.events.countEventsByApplicationServiceID(ctx, appServiceID) + id, err := d.events.getLatestId(ctx, appServiceID) + if err == sql.ErrNoRows { + return 0, nil + } + return id, err } // UpdateTxnIDForEvents takes in an application service ID and a diff --git a/appservice/types/types.go b/appservice/types/types.go index 098face62..50b18d998 100644 --- a/appservice/types/types.go +++ b/appservice/types/types.go @@ -30,34 +30,26 @@ const ( type ApplicationServiceWorkerState struct { AppService config.ApplicationService Cond *sync.Cond - // Events ready to be sent - EventsReady bool + // Lastest incremental ID from appservice_events table that is ready to be sent to application service + latestId int // Backoff exponent (2^x secs). Max 6, aka 64s. Backoff int } // NotifyNewEvents wakes up all waiting goroutines, notifying that events remain // in the event queue for this application service worker. -func (a *ApplicationServiceWorkerState) NotifyNewEvents() { +func (a *ApplicationServiceWorkerState) NotifyNewEvents(id int) { a.Cond.L.Lock() - a.EventsReady = true + a.latestId = id a.Cond.Broadcast() a.Cond.L.Unlock() } -// FinishEventProcessing marks all events of this worker as being sent to the -// application service. -func (a *ApplicationServiceWorkerState) FinishEventProcessing() { - a.Cond.L.Lock() - a.EventsReady = false - a.Cond.L.Unlock() -} - // WaitForNewEvents causes the calling goroutine to wait on the worker state's // condition for a broadcast or similar wakeup, if there are no events ready. -func (a *ApplicationServiceWorkerState) WaitForNewEvents() { +func (a *ApplicationServiceWorkerState) WaitForNewEvents(id int) { a.Cond.L.Lock() - if !a.EventsReady { + if a.latestId <= id { a.Cond.Wait() } a.Cond.L.Unlock() diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go index 4dab00bd7..a32933e9d 100644 --- a/appservice/workers/transaction_scheduler.go +++ b/appservice/workers/transaction_scheduler.go @@ -44,7 +44,7 @@ var ( func SetupTransactionWorkers( client *http.Client, appserviceDB storage.Database, - workerStates []types.ApplicationServiceWorkerState, + workerStates []*types.ApplicationServiceWorkerState, ) error { // Create a worker that handles transmitting events to a single homeserver for _, workerState := range workerStates { @@ -58,31 +58,29 @@ func SetupTransactionWorkers( // worker is a goroutine that sends any queued events to the application service // it is given. -func worker(client *http.Client, db storage.Database, ws types.ApplicationServiceWorkerState) { +func worker(client *http.Client, db storage.Database, ws *types.ApplicationServiceWorkerState) { log.WithFields(log.Fields{ "appservice": ws.AppService.ID, }).Info("Starting application service") ctx := context.Background() // Initial check for any leftover events to send from last time - eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) + latestId, err := db.GetLatestId(ctx, ws.AppService.ID) if err != nil { log.WithFields(log.Fields{ "appservice": ws.AppService.ID, }).WithError(err).Fatal("appservice worker unable to read queued events from DB") return } - if eventCount > 0 { - ws.NotifyNewEvents() - } - + ws.NotifyNewEvents(latestId) + id := 0 // Loop forever and keep waiting for more events to send for { // Wait for more events if we've sent all the events in the database - ws.WaitForNewEvents() + ws.WaitForNewEvents(id) // Batch events up into a transaction - transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID) + transactionJSON, txnID, maxEventID, _, err := createTransaction(ctx, db, ws.AppService.ID) if err != nil { log.WithFields(log.Fields{ "appservice": ws.AppService.ID, @@ -90,6 +88,10 @@ func worker(client *http.Client, db storage.Database, ws types.ApplicationServic return } + // Transactions have a maximum event size (or new events may arrive while + // transaction is processed by Application Service), so there may still be + // some events left over to send. We will keep sending if id < ws.latestID. + id = maxEventID // Send the events off to the application service // Backoff if the application service does not respond @@ -99,19 +101,13 @@ func worker(client *http.Client, db storage.Database, ws types.ApplicationServic "appservice": ws.AppService.ID, }).WithError(err).Error("unable to send event") // Backoff - backoff(&ws, err) + backoff(ws, err) continue } // We sent successfully, hooray! ws.Backoff = 0 - // Transactions have a maximum event size, so there may still be some events - // left over to send. Keep sending until none are left - if !eventsRemaining { - ws.FinishEventProcessing() - } - // Remove sent events from the DB err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID) if err != nil {