Refactor ApplicationServiceWorkerState to be more robust.

This commit is contained in:
danielaloni 2022-06-06 16:10:44 +03:00
parent 02597f15f0
commit 55b69b4108
9 changed files with 71 additions and 79 deletions

View file

@ -70,14 +70,14 @@ func NewInternalAPI(
// Wrap application services in a type that relates the application service and
// a sync.Cond object that can be used to notify workers when there are new
// events to be sent out.
workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
workerStates := make([]*types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
for i, appservice := range base.Cfg.Derived.ApplicationServices {
m := sync.Mutex{}
ws := types.ApplicationServiceWorkerState{
AppService: appservice,
Cond: sync.NewCond(&m),
}
workerStates[i] = ws
workerStates[i] = &ws
// Create bot account for this AS if it doesn't already exist
if err = generateAppServiceAccount(userAPI, appservice); err != nil {

View file

@ -39,7 +39,7 @@ type OutputRoomEventConsumer struct {
asDB storage.Database
rsAPI api.AppserviceRoomserverAPI
serverName string
workerStates []types.ApplicationServiceWorkerState
workerStates []*types.ApplicationServiceWorkerState
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
@ -50,7 +50,7 @@ func NewOutputRoomEventConsumer(
js nats.JetStreamContext,
appserviceDB storage.Database,
rsAPI api.AppserviceRoomserverAPI,
workerStates []types.ApplicationServiceWorkerState,
workerStates []*types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@ -140,13 +140,13 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
// Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
// Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database")
if id, err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
log.WithError(err).Warnf("failed to insert incoming event into appservices database. id: %d", id)
return err
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.NotifyNewEvents()
ws.NotifyNewEvents(id)
}
}
}

View file

@ -21,9 +21,9 @@ import (
)
type Database interface {
StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error
StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) (int, error)
GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error)
CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
GetLatestId(ctx context.Context, appServiceID string) (int, error)
UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error
RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error
GetLatestTxnID(ctx context.Context) (int, error)

View file

@ -45,12 +45,13 @@ const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, headered_event_json, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
const getLatestIdSQL = "" +
"SELECT id FROM appservice_events WHERE as_id = $1 ORDER BY id DESC LIMIT 1"
const insertEventSQL = "" +
"INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
"VALUES ($1, $2, $3)"
"VALUES ($1, $2, $3)" +
"RETURNING id"
const updateTxnIDForEventsSQL = "" +
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
@ -66,7 +67,7 @@ const (
type eventsStatements struct {
selectEventsByApplicationServiceIDStmt *sql.Stmt
countEventsByApplicationServiceIDStmt *sql.Stmt
getLatestIdStmt *sql.Stmt
insertEventStmt *sql.Stmt
updateTxnIDForEventsStmt *sql.Stmt
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
@ -81,7 +82,7 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) {
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
if s.getLatestIdStmt, err = db.Prepare(getLatestIdSQL); err != nil {
return
}
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
@ -196,14 +197,12 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
return
}
// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) countEventsByApplicationServiceID(
func (s *eventsStatements) getLatestId(
ctx context.Context,
appServiceID string,
) (int, error) {
var count int
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
err := s.getLatestIdStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
@ -217,19 +216,19 @@ func (s *eventsStatements) insertEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.HeaderedEvent,
) (err error) {
) (id int, err error) {
// Convert event to JSON before inserting
eventJSON, err := json.Marshal(event)
var eventJSON []byte
eventJSON, err = json.Marshal(event)
if err != nil {
return err
return 0, err
}
_, err = s.insertEventStmt.ExecContext(
err = s.insertEventStmt.QueryRowContext(
ctx,
appServiceID,
eventJSON,
-1, // No transaction ID yet
)
).Scan(&id)
return
}

View file

@ -62,7 +62,7 @@ func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.HeaderedEvent,
) error {
) (int, error) {
return d.events.insertEvent(ctx, appServiceID, event)
}
@ -76,13 +76,16 @@ func (d *Database) GetEventsWithAppServiceID(
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
// CountEventsWithAppServiceID returns the number of events destined for an
// application service given its ID.
func (d *Database) CountEventsWithAppServiceID(
// GetLatestId returns the latest incremental id associated with appservice.
func (d *Database) GetLatestId(
ctx context.Context,
appServiceID string,
) (int, error) {
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
id, err := d.events.getLatestId(ctx, appServiceID)
if err == sql.ErrNoRows {
return 0, nil
}
return id, err
}
// UpdateTxnIDForEvents takes in an application service ID and a

View file

@ -46,12 +46,13 @@ const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, headered_event_json, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
const getLatestIdSQL = "" +
"SELECT id FROM appservice_events WHERE as_id = $1 ORDER BY id DESC LIMIT 1"
const insertEventSQL = "" +
"INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
"VALUES ($1, $2, $3)"
"VALUES ($1, $2, $3)" +
"RETURNING id"
const updateTxnIDForEventsSQL = "" +
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
@ -69,7 +70,7 @@ type eventsStatements struct {
db *sql.DB
writer sqlutil.Writer
selectEventsByApplicationServiceIDStmt *sql.Stmt
countEventsByApplicationServiceIDStmt *sql.Stmt
getLatestIdStmt *sql.Stmt
insertEventStmt *sql.Stmt
updateTxnIDForEventsStmt *sql.Stmt
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
@ -86,7 +87,7 @@ func (s *eventsStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
if s.getLatestIdStmt, err = db.Prepare(getLatestIdSQL); err != nil {
return
}
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
@ -201,14 +202,12 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
return
}
// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) countEventsByApplicationServiceID(
func (s *eventsStatements) getLatestId(
ctx context.Context,
appServiceID string,
) (int, error) {
var count int
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
err := s.getLatestIdStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
@ -222,22 +221,22 @@ func (s *eventsStatements) insertEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.HeaderedEvent,
) (err error) {
) (id int, err error) {
// Convert event to JSON before inserting
eventJSON, err := json.Marshal(event)
if err != nil {
return err
return 0, err
}
return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
_, err := s.insertEventStmt.ExecContext(
err = s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
err = s.insertEventStmt.QueryRowContext(
ctx,
appServiceID,
eventJSON,
-1, // No transaction ID yet
)
).Scan(&id)
return err
})
return
}
// updateTxnIDForEvents sets the transactionID for a collection of events. Done

View file

@ -61,7 +61,7 @@ func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.HeaderedEvent,
) error {
) (int, error) {
return d.events.insertEvent(ctx, appServiceID, event)
}
@ -75,13 +75,16 @@ func (d *Database) GetEventsWithAppServiceID(
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
// CountEventsWithAppServiceID returns the number of events destined for an
// application service given its ID.
func (d *Database) CountEventsWithAppServiceID(
// GetLatestId returns the latest incremental id associated with appservice.
func (d *Database) GetLatestId(
ctx context.Context,
appServiceID string,
) (int, error) {
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
id, err := d.events.getLatestId(ctx, appServiceID)
if err == sql.ErrNoRows {
return 0, nil
}
return id, err
}
// UpdateTxnIDForEvents takes in an application service ID and a

View file

@ -30,34 +30,26 @@ const (
type ApplicationServiceWorkerState struct {
AppService config.ApplicationService
Cond *sync.Cond
// Events ready to be sent
EventsReady bool
// Lastest incremental ID from appservice_events table that is ready to be sent to application service
latestId int
// Backoff exponent (2^x secs). Max 6, aka 64s.
Backoff int
}
// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain
// in the event queue for this application service worker.
func (a *ApplicationServiceWorkerState) NotifyNewEvents() {
func (a *ApplicationServiceWorkerState) NotifyNewEvents(id int) {
a.Cond.L.Lock()
a.EventsReady = true
a.latestId = id
a.Cond.Broadcast()
a.Cond.L.Unlock()
}
// FinishEventProcessing marks all events of this worker as being sent to the
// application service.
func (a *ApplicationServiceWorkerState) FinishEventProcessing() {
a.Cond.L.Lock()
a.EventsReady = false
a.Cond.L.Unlock()
}
// WaitForNewEvents causes the calling goroutine to wait on the worker state's
// condition for a broadcast or similar wakeup, if there are no events ready.
func (a *ApplicationServiceWorkerState) WaitForNewEvents() {
func (a *ApplicationServiceWorkerState) WaitForNewEvents(id int) {
a.Cond.L.Lock()
if !a.EventsReady {
if a.latestId <= id {
a.Cond.Wait()
}
a.Cond.L.Unlock()

View file

@ -44,7 +44,7 @@ var (
func SetupTransactionWorkers(
client *http.Client,
appserviceDB storage.Database,
workerStates []types.ApplicationServiceWorkerState,
workerStates []*types.ApplicationServiceWorkerState,
) error {
// Create a worker that handles transmitting events to a single homeserver
for _, workerState := range workerStates {
@ -58,31 +58,29 @@ func SetupTransactionWorkers(
// worker is a goroutine that sends any queued events to the application service
// it is given.
func worker(client *http.Client, db storage.Database, ws types.ApplicationServiceWorkerState) {
func worker(client *http.Client, db storage.Database, ws *types.ApplicationServiceWorkerState) {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).Info("Starting application service")
ctx := context.Background()
// Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
latestId, err := db.GetLatestId(ctx, ws.AppService.ID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to read queued events from DB")
return
}
if eventCount > 0 {
ws.NotifyNewEvents()
}
ws.NotifyNewEvents(latestId)
id := 0
// Loop forever and keep waiting for more events to send
for {
// Wait for more events if we've sent all the events in the database
ws.WaitForNewEvents()
ws.WaitForNewEvents(id)
// Batch events up into a transaction
transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
transactionJSON, txnID, maxEventID, _, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
@ -90,6 +88,10 @@ func worker(client *http.Client, db storage.Database, ws types.ApplicationServic
return
}
// Transactions have a maximum event size (or new events may arrive while
// transaction is processed by Application Service), so there may still be
// some events left over to send. We will keep sending if id < ws.latestID.
id = maxEventID
// Send the events off to the application service
// Backoff if the application service does not respond
@ -99,19 +101,13 @@ func worker(client *http.Client, db storage.Database, ws types.ApplicationServic
"appservice": ws.AppService.ID,
}).WithError(err).Error("unable to send event")
// Backoff
backoff(&ws, err)
backoff(ws, err)
continue
}
// We sent successfully, hooray!
ws.Backoff = 0
// Transactions have a maximum event size, so there may still be some events
// left over to send. Keep sending until none are left
if !eventsRemaining {
ws.FinishEventProcessing()
}
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
if err != nil {