* Fix sync server comment
* Remove unnecessary printlns
* Use logrus Fields
* Worker state methods
* Remove sillyness
This commit is contained in:
Andrew Morgan 2018-06-24 20:32:39 +01:00
parent db2e40cac9
commit 79754a2ab7
4 changed files with 58 additions and 43 deletions

View file

@ -76,10 +76,8 @@ func (s *OutputRoomEventConsumer) Start() error {
return s.roomServerConsumer.Start()
}
// onMessage is called when the sync server receives a new event from the room
// server output log. It is not safe for this function to be called from
// multiple goroutines, or else the sync stream position may race and be
// incorrectly calculated.
// onMessage is called when the appservice component receives a new event from
// the room server output log.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
@ -186,10 +184,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.Cond.L.Lock()
*ws.EventsReady++
ws.Cond.Broadcast()
ws.Cond.L.Unlock()
ws.NotifyNewEvent()
}
}
}

View file

@ -17,7 +17,6 @@ package storage
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/matrix-org/gomatrixserverlib"
@ -156,7 +155,6 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application
&txnID,
)
if err != nil {
fmt.Println("Failed:", err.Error())
return nil, 0, 0, err
}
@ -213,12 +211,6 @@ func (s *eventsStatements) insertEvent(
appServiceID string,
event *gomatrixserverlib.Event,
) (err error) {
// If event has no content, strip the json
content := event.Content()
if string(content) == "{\"disable\":true}" {
content = []byte("{}")
}
_, err = s.insertEventStmt.ExecContext(
ctx,
appServiceID,
@ -227,7 +219,7 @@ func (s *eventsStatements) insertEvent(
event.RoomID(),
event.Type(),
event.Sender(),
content,
event.Content(),
-1, // No transaction ID yet
)
return

View file

@ -18,6 +18,11 @@ import (
"github.com/matrix-org/dendrite/common/config"
)
const (
// AppServiceDeviceID is the AS dummy device ID
AppServiceDeviceID = "AS_Device"
)
// ApplicationServiceWorkerState is a type that couples an application service,
// a lockable condition as well as some other state variables, allowing the
// roomserver to notify appservice workers when there are events ready to send
@ -31,7 +36,22 @@ type ApplicationServiceWorkerState struct {
Backoff int
}
const (
// AppServiceDeviceID is the AS dummy device ID
AppServiceDeviceID = "AS_Device"
)
// NotifyNewEvent wakes up all waiting goroutines, notifying that a new event
// has been placed into the event queue for this application service worker.
// Additionally it increments EventsReady by one.
func (a *ApplicationServiceWorkerState) NotifyNewEvent() {
a.Cond.L.Lock()
*a.EventsReady++
a.Cond.Broadcast()
a.Cond.L.Unlock()
}
// WaitForNewEvents causes the calling goroutine to wait on the worker state's
// condition for a broadcast or similar wakeup, if there are no events ready.
func (a *ApplicationServiceWorkerState) WaitForNewEvents() {
a.Cond.L.Lock()
if *a.EventsReady <= 0 {
a.Cond.Wait()
}
a.Cond.L.Unlock()
}

View file

@ -66,15 +66,18 @@ func SetupTransactionWorkers(
// worker is a goroutine that sends any queued events to the application service
// it is given.
func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
log.Infof("Starting Application Service %s", ws.AppService.ID)
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).Info("starting application service")
ctx := context.Background()
// Initialize transaction ID counter
var err error
currentTransactionID, err = db.GetTxnIDWithAppServiceID(ctx, ws.AppService.ID)
if err != nil && err != sql.ErrNoRows {
log.WithError(err).Fatalf("appservice %s worker unable to get latest transaction ID from DB",
ws.AppService.ID)
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to get latest transaction ID from DB")
return
}
@ -86,8 +89,9 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
// Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB",
ws.AppService.ID)
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to read queued events from DB")
return
}
ws.Cond.L.Lock()
@ -97,17 +101,14 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
// Loop forever and keep waiting for more events to send
for {
// Wait for more events if we've sent all the events in the database
if *ws.EventsReady <= 0 {
ws.Cond.L.Lock()
ws.Cond.Wait()
ws.Cond.L.Unlock()
}
ws.WaitForNewEvents()
// Batch events up into a transaction
eventsCount, txnID, maxEventID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to create transaction",
ws.AppService.ID)
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to create transaction")
return
}
@ -131,16 +132,18 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
if err != nil {
log.WithError(err).Fatalf("unable to remove appservice events from the database for %s",
ws.AppService.ID)
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("unable to remove appservice events from the database")
return
}
// Update transactionID
currentTransactionID++
if err = db.UpsertTxnIDWithAppServiceID(ctx, ws.AppService.ID, currentTransactionID); err != nil {
log.WithError(err).Fatalf("unable to update transaction ID for %s",
ws.AppService.ID)
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("unable to update transaction ID")
return
}
}
@ -152,8 +155,10 @@ func backoff(ws *types.ApplicationServiceWorkerState, err error) {
backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
backoffSeconds := time.Second * backoffDuration
log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds",
ws.AppService.ID, backoffDuration)
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds",
backoffDuration)
ws.Backoff++
if ws.Backoff > 6 {
@ -178,8 +183,9 @@ func createTransaction(
// Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB",
appserviceID)
log.WithFields(log.Fields{
"appservice": appserviceID,
}).WithError(err).Fatalf("appservice worker unable to read queued events from DB")
return
}
@ -225,14 +231,16 @@ func send(
defer func() {
err := resp.Body.Close()
if err != nil {
log.WithError(err).Errorf("Unable to close response body from application service %s", appservice.ID)
log.WithFields(log.Fields{
"appservice": appservice.ID,
}).WithError(err).Error("unable to close response body from application service")
}
}()
// Check the AS received the events correctly
if resp.StatusCode != http.StatusOK {
// TODO: Handle non-200 error codes from application services
return fmt.Errorf("Non-OK status code %d returned from AS", resp.StatusCode)
return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
}
return nil