diff --git a/INSTALL.md b/INSTALL.md index a7e2d835e..ee3c6f1e0 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -72,7 +72,7 @@ Dendrite requires a postgres database engine, version 9.5 or later. ``` * Create databases: ```bash - for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi naffka; do + for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi appservice naffka; do sudo -u postgres createdb -O dendrite dendrite_$i done ``` @@ -253,3 +253,14 @@ you want to support federation. ```bash ./bin/dendrite-federation-sender-server --config dendrite.yaml ``` + +### Run an appservice server + +This sends events from the network to [application +services](https://matrix.org/docs/spec/application_service/unstable.html) +running locally. This is only required if you want to support running +application services on your homeserver. + +```bash +./bin/dendrite-appservice-server --config dendrite.yaml +``` diff --git a/dendrite-config.yaml b/dendrite-config.yaml index ae926bab8..444417879 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -97,6 +97,7 @@ database: room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable" server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable" federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable" + appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable" public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable" # If using naffka you need to specify a naffka database # naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable" diff --git a/src/github.com/matrix-org/dendrite/appservice/README.md b/src/github.com/matrix-org/dendrite/appservice/README.md index 5b00386d3..d75557448 100644 --- a/src/github.com/matrix-org/dendrite/appservice/README.md +++ b/src/github.com/matrix-org/dendrite/appservice/README.md @@ -2,9 +2,9 @@ This component interfaces with external [Application Services](https://matrix.org/docs/spec/application_service/unstable.html). -This includes any HTTP endpoints that Application Services call, as well as talking -to any HTTP endpoints that Application Services provide themselves. +This includes any HTTP endpoints that application services call, as well as talking +to any HTTP endpoints that application services provide themselves. ## Consumers -This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing Application Services. \ No newline at end of file +This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services. \ No newline at end of file diff --git a/src/github.com/matrix-org/dendrite/appservice/appservice.go b/src/github.com/matrix-org/dendrite/appservice/appservice.go index 9caf70fba..57b127f27 100644 --- a/src/github.com/matrix-org/dendrite/appservice/appservice.go +++ b/src/github.com/matrix-org/dendrite/appservice/appservice.go @@ -15,8 +15,13 @@ package appservice import ( + "sync" + "github.com/matrix-org/dendrite/appservice/consumers" "github.com/matrix-org/dendrite/appservice/routing" + "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" + "github.com/matrix-org/dendrite/appservice/workers" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/transactions" @@ -35,13 +40,38 @@ func SetupAppServiceAPIComponent( queryAPI api.RoomserverQueryAPI, transactionsCache *transactions.Cache, ) { + // Create a connection to the appservice postgres DB + appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService)) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to appservice db") + } + + // Wrap application services in a type that relates the application service and + // a sync.Cond object that can be used to notify workers when there are new + // events to be sent out. + workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices)) + for i, appservice := range base.Cfg.Derived.ApplicationServices { + m := sync.Mutex{} + ws := types.ApplicationServiceWorkerState{ + AppService: appservice, + Cond: sync.NewCond(&m), + } + workerStates[i] = ws + } + consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, aliasAPI, + base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB, + queryAPI, aliasAPI, workerStates, ) if err := consumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start app service roomserver consumer") } + // Create application service transaction workers + if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil { + logrus.WithError(err).Panicf("failed to start app service transaction workers") + } + // Set up HTTP Endpoints routing.Setup( base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB, diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go index a934bf44b..bc1d3bf20 100644 --- a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -17,8 +17,9 @@ package consumers import ( "context" "encoding/json" - "fmt" + "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" @@ -29,29 +30,28 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) -var ( - appServices []config.ApplicationService -) - // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer db *accounts.Database + asDB *storage.Database query api.RoomserverQueryAPI alias api.RoomserverAliasAPI serverName string + workerStates []types.ApplicationServiceWorkerState } -// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call +// Start() to begin consuming from room servers. func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, store *accounts.Database, + appserviceDB *storage.Database, queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, + workerStates []types.ApplicationServiceWorkerState, ) *OutputRoomEventConsumer { - appServices = cfg.Derived.ApplicationServices - consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, @@ -60,9 +60,11 @@ func NewOutputRoomEventConsumer( s := &OutputRoomEventConsumer{ roomServerConsumer: &consumer, db: store, + asDB: appserviceDB, query: queryAPI, alias: aliasAPI, serverName: string(cfg.Matrix.ServerName), + workerStates: workerStates, } consumer.ProcessMessage = s.onMessage @@ -74,9 +76,8 @@ func (s *OutputRoomEventConsumer) Start() error { return s.roomServerConsumer.Start() } -// onMessage is called when the sync server receives a new event from the room server output log. -// It is not safe for this function to be called from multiple goroutines, or else the -// sync stream position may race and be incorrectly calculated. +// onMessage is called when the appservice component receives a new event from +// the room server output log. func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent @@ -98,50 +99,37 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { "event_id": ev.EventID(), "room_id": ev.RoomID(), "type": ev.Type(), - }).Info("appservice received event from roomserver") + }).Info("appservice received an event from roomserver") - events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) if err != nil { return err } + events := append(missingEvents, ev) - // Create a context to thread through the whole filtering process - ctx := context.TODO() - - if err = s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs); err != nil { - return err - } - - // Check if any events need to passed on to external application services - return s.filterRoomserverEvents(ctx, append(events, ev)) + // Send event to any relevant application services + return s.filterRoomserverEvents(context.TODO(), events) } -// lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEventConsumer) lookupStateEvents( +// lookupMissingStateEvents looks up the state events that are added by a new event, +// and returns any not already present. +func (s *OutputRoomEventConsumer) lookupMissingStateEvents( addsStateEventIDs []string, event gomatrixserverlib.Event, ) ([]gomatrixserverlib.Event, error) { // Fast path if there aren't any new state events. if len(addsStateEventIDs) == 0 { - // If the event is a membership update (e.g. for a profile update), it won't - // show up in AddsStateEventIDs, so we need to add it manually - if event.Type() == "m.room.member" { - return []gomatrixserverlib.Event{event}, nil - } - return nil, nil + return []gomatrixserverlib.Event{}, nil } // Fast path if the only state event added is the event itself. if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.Event{event}, nil + return []gomatrixserverlib.Event{}, nil } result := []gomatrixserverlib.Event{} missing := []string{} for _, id := range addsStateEventIDs { - // Append the current event in the results if its ID is in the events list - if id == event.EventID() { - result = append(result, event) - } else { + if id != event.EventID() { // If the event isn't the current one, add it to the list of events // to retrieve from the roomserver missing = append(missing, id) @@ -165,13 +153,22 @@ func (s *OutputRoomEventConsumer) lookupStateEvents( // each namespace of each registered application service, and if there is a // match, adds the event to the queue for events to be sent to a particular // application service. -func (s *OutputRoomEventConsumer) filterRoomserverEvents(ctx context.Context, events []gomatrixserverlib.Event) error { - for _, event := range events { - for _, appservice := range appServices { +func (s *OutputRoomEventConsumer) filterRoomserverEvents( + ctx context.Context, + events []gomatrixserverlib.Event, +) error { + for _, ws := range s.workerStates { + for _, event := range events { // Check if this event is interesting to this application service - if s.appserviceIsInterestedInEvent(ctx, event, appservice) { - // TODO: Queue this event to be sent off to the application service - fmt.Println(appservice.ID, "was interested in", event.Sender(), event.Type(), event.RoomID()) + if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { + // Queue this event to be sent off to the application service + if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil { + log.WithError(err).Warn("failed to insert incoming event into appservices database") + } else { + // Tell our worker to send out new messages by updating remaining message + // count and waking them up with a broadcast + ws.NotifyNewEvents() + } } } } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go new file mode 100644 index 000000000..285bbf483 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -0,0 +1,248 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + "encoding/json" + "time" + + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +const appserviceEventsSchema = ` +-- Stores events to be sent to application services +CREATE TABLE IF NOT EXISTS appservice_events ( + -- An auto-incrementing id unique to each event in the table + id BIGSERIAL NOT NULL PRIMARY KEY, + -- The ID of the application service the event will be sent to + as_id TEXT NOT NULL, + -- JSON representation of the event + event_json TEXT NOT NULL, + -- The ID of the transaction that this event is a part of + txn_id BIGINT NOT NULL +); + +CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); +` + +const selectEventsByApplicationServiceIDSQL = "" + + "SELECT id, event_json, txn_id " + + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" + +const countEventsByApplicationServiceIDSQL = "" + + "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" + +const insertEventSQL = "" + + "INSERT INTO appservice_events(as_id, event_json, txn_id) " + + "VALUES ($1, $2, $3)" + +const updateTxnIDForEventsSQL = "" + + "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" + +const deleteEventsBeforeAndIncludingIDSQL = "" + + "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2" + +const ( + // A transaction ID number that no transaction should ever have. Used for + // checking again the default value. + invalidTxnID = -2 +) + +type eventsStatements struct { + selectEventsByApplicationServiceIDStmt *sql.Stmt + countEventsByApplicationServiceIDStmt *sql.Stmt + insertEventStmt *sql.Stmt + updateTxnIDForEventsStmt *sql.Stmt + deleteEventsBeforeAndIncludingIDStmt *sql.Stmt +} + +func (s *eventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(appserviceEventsSchema) + if err != nil { + return + } + + if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { + return + } + if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil { + return + } + if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil { + return + } + + return +} + +// selectEventsByApplicationServiceID takes in an application service ID and +// returns a slice of events that need to be sent to that application service, +// as well as an int later used to remove these same events from the database +// once successfully sent to an application service. +func (s *eventsStatements) selectEventsByApplicationServiceID( + ctx context.Context, + applicationServiceID string, + limit int, +) ( + txnID, maxID int, + events []gomatrixserverlib.Event, + eventsRemaining bool, + err error, +) { + // Retrieve events from the database. Unsuccessfully sent events first + eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) + if err != nil { + return + } + defer func() { + err = eventRows.Close() + if err != nil { + log.WithFields(log.Fields{ + "appservice": applicationServiceID, + }).WithError(err).Fatalf("appservice unable to select new events to send") + } + }() + events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit) + if err != nil { + return + } + + return +} + +func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) { + // Get current time for use in calculating event age + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + + // Iterate through each row and store event contents + // If txn_id changes dramatically, we've switched from collecting old events to + // new ones. Send back those events first. + lastTxnID := invalidTxnID + for eventsProcessed := 0; eventRows.Next(); { + var event gomatrixserverlib.Event + var eventJSON []byte + var id int + err = eventRows.Scan( + &id, + &eventJSON, + &txnID, + ) + if err != nil { + return nil, 0, 0, false, err + } + + // Unmarshal eventJSON + if err = json.Unmarshal(eventJSON, &event); err != nil { + return nil, 0, 0, false, err + } + + // If txnID has changed on this event from the previous event, then we've + // reached the end of a transaction's events. Return only those events. + if lastTxnID > invalidTxnID && lastTxnID != txnID { + return events, maxID, lastTxnID, true, nil + } + lastTxnID = txnID + + // Limit events that aren't part of an old transaction + if txnID == -1 { + // Return if we've hit the limit + if eventsProcessed++; eventsProcessed > limit { + return events, maxID, lastTxnID, true, nil + } + } + + if id > maxID { + maxID = id + } + + // Portion of the event that is unsigned due to rapid change + // TODO: Consider removing age as not many app services use it + if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil { + return nil, 0, 0, false, err + } + + events = append(events, event) + } + + return +} + +// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) countEventsByApplicationServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + var count int + err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) + if err != nil && err != sql.ErrNoRows { + return 0, err + } + + return count, nil +} + +// insertEvent inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) insertEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) (err error) { + // Convert event to JSON before inserting + eventJSON, err := json.Marshal(event) + if err != nil { + return err + } + + _, err = s.insertEventStmt.ExecContext( + ctx, + appServiceID, + eventJSON, + -1, // No transaction ID yet + ) + return +} + +// updateTxnIDForEvents sets the transactionID for a collection of events. Done +// before sending them to an AppService. Referenced before sending to make sure +// we aren't constructing multiple transactions with the same events. +func (s *eventsStatements) updateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) (err error) { + _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID) + return +} + +// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database. +func (s *eventsStatements) deleteEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) (err error) { + _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID) + return +} diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go new file mode 100644 index 000000000..b68989fb1 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go @@ -0,0 +1,110 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + + // Import postgres database driver + _ "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" +) + +// Database stores events intended to be later sent to application services +type Database struct { + events eventsStatements + txnID txnStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + if err := d.events.prepare(d.db); err != nil { + return err + } + + return d.txnID.prepare(d.db) +} + +// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database +// for a transaction worker to pull and later send to an application service. +func (d *Database) StoreEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) error { + return d.events.insertEvent(ctx, appServiceID, event) +} + +// GetEventsWithAppServiceID returns a slice of events and their IDs intended to +// be sent to an application service given its ID. +func (d *Database) GetEventsWithAppServiceID( + ctx context.Context, + appServiceID string, + limit int, +) (int, int, []gomatrixserverlib.Event, bool, error) { + return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) +} + +// CountEventsWithAppServiceID returns the number of events destined for an +// application service given its ID. +func (d *Database) CountEventsWithAppServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + return d.events.countEventsByApplicationServiceID(ctx, appServiceID) +} + +// UpdateTxnIDForEvents takes in an application service ID and a +// and stores them in the DB, unless the pair already exists, in +// which case it updates them. +func (d *Database) UpdateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) error { + return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID) +} + +// RemoveEventsBeforeAndIncludingID removes all events from the database that +// are less than or equal to a given maximum ID. IDs here are implemented as a +// serial, thus this should always delete events in chronological order. +func (d *Database) RemoveEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) error { + return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) +} + +// GetLatestTxnID returns the latest available transaction id +func (d *Database) GetLatestTxnID( + ctx context.Context, +) (int, error) { + return d.txnID.selectTxnID(ctx) +} diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go new file mode 100644 index 000000000..7b0fa3786 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go @@ -0,0 +1,52 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" +) + +const txnIDSchema = ` +-- Keeps a count of the current transaction ID +CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1; +` + +const selectTxnIDSQL = "SELECT nextval('txn_id_counter')" + +type txnStatements struct { + selectTxnIDStmt *sql.Stmt +} + +func (s *txnStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(txnIDSchema) + if err != nil { + return + } + + if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil { + return + } + + return +} + +// selectTxnID selects the latest ascending transaction ID +func (s *txnStatements) selectTxnID( + ctx context.Context, +) (txnID int, err error) { + err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID) + return +} diff --git a/src/github.com/matrix-org/dendrite/appservice/types/types.go b/src/github.com/matrix-org/dendrite/appservice/types/types.go index 3e702165b..aac731550 100644 --- a/src/github.com/matrix-org/dendrite/appservice/types/types.go +++ b/src/github.com/matrix-org/dendrite/appservice/types/types.go @@ -12,7 +12,53 @@ package types +import ( + "sync" + + "github.com/matrix-org/dendrite/common/config" +) + const ( // AppServiceDeviceID is the AS dummy device ID AppServiceDeviceID = "AS_Device" ) + +// ApplicationServiceWorkerState is a type that couples an application service, +// a lockable condition as well as some other state variables, allowing the +// roomserver to notify appservice workers when there are events ready to send +// externally to application services. +type ApplicationServiceWorkerState struct { + AppService config.ApplicationService + Cond *sync.Cond + // Events ready to be sent + EventsReady bool + // Backoff exponent (2^x secs). Max 6, aka 64s. + Backoff int +} + +// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain +// in the event queue for this application service worker. +func (a *ApplicationServiceWorkerState) NotifyNewEvents() { + a.Cond.L.Lock() + a.EventsReady = true + a.Cond.Broadcast() + a.Cond.L.Unlock() +} + +// FinishEventProcessing marks all events of this worker as being sent to the +// application service. +func (a *ApplicationServiceWorkerState) FinishEventProcessing() { + a.Cond.L.Lock() + a.EventsReady = false + a.Cond.L.Unlock() +} + +// WaitForNewEvents causes the calling goroutine to wait on the worker state's +// condition for a broadcast or similar wakeup, if there are no events ready. +func (a *ApplicationServiceWorkerState) WaitForNewEvents() { + a.Cond.L.Lock() + if !a.EventsReady { + a.Cond.Wait() + } + a.Cond.L.Unlock() +} diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go new file mode 100644 index 000000000..8f966c949 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -0,0 +1,234 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workers + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "math" + "net/http" + "time" + + "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +var ( + // Maximum size of events sent in each transaction. + transactionBatchSize = 50 + // Timeout for sending a single transaction to an application service. + transactionTimeout = time.Second * 60 +) + +// SetupTransactionWorkers spawns a separate goroutine for each application +// service. Each of these "workers" handle taking all events intended for their +// app service, batch them up into a single transaction (up to a max transaction +// size), then send that off to the AS's /transactions/{txnID} endpoint. It also +// handles exponentially backing off in case the AS isn't currently available. +func SetupTransactionWorkers( + appserviceDB *storage.Database, + workerStates []types.ApplicationServiceWorkerState, +) error { + // Create a worker that handles transmitting events to a single homeserver + for _, workerState := range workerStates { + // Don't create a worker if this AS doesn't want to receive events + if workerState.AppService.URL != "" { + go worker(appserviceDB, workerState) + } + } + return nil +} + +// worker is a goroutine that sends any queued events to the application service +// it is given. +func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).Info("starting application service") + ctx := context.Background() + + // Grab the HTTP client for sending requests to app services + client := &http.Client{ + Timeout: transactionTimeout, + // TODO: Verify certificates + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, // nolint: gas + }, + }, + } + + // Initial check for any leftover events to send from last time + eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) + if err != nil { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("appservice worker unable to read queued events from DB") + return + } + if eventCount > 0 { + ws.NotifyNewEvents() + } + + // Loop forever and keep waiting for more events to send + for { + // Wait for more events if we've sent all the events in the database + ws.WaitForNewEvents() + + // Batch events up into a transaction + transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID) + if err != nil { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("appservice worker unable to create transaction") + + return + } + + // Send the events off to the application service + // Backoff if the application service does not respond + err = send(client, ws.AppService, txnID, transactionJSON) + if err != nil { + // Backoff + backoff(&ws, err) + continue + } + + // We sent successfully, hooray! + ws.Backoff = 0 + + // Transactions have a maximum event size, so there may still be some events + // left over to send. Keep sending until none are left + if !eventsRemaining { + ws.FinishEventProcessing() + } + + // Remove sent events from the DB + err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID) + if err != nil { + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Fatal("unable to remove appservice events from the database") + return + } + } +} + +// backoff pauses the calling goroutine for a 2^some backoff exponent seconds +func backoff(ws *types.ApplicationServiceWorkerState, err error) { + // Calculate how long to backoff for + backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff))) + backoffSeconds := time.Second * backoffDuration + + log.WithFields(log.Fields{ + "appservice": ws.AppService.ID, + }).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds", + backoffDuration) + + ws.Backoff++ + if ws.Backoff > 6 { + ws.Backoff = 6 + } + + // Backoff + time.Sleep(backoffSeconds) +} + +// createTransaction takes in a slice of AS events, stores them in an AS +// transaction, and JSON-encodes the results. +func createTransaction( + ctx context.Context, + db *storage.Database, + appserviceID string, +) ( + transactionJSON []byte, + txnID, maxID int, + eventsRemaining bool, + err error, +) { + // Retrieve the latest events from the DB (will return old events if they weren't successfully sent) + txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) + if err != nil { + log.WithFields(log.Fields{ + "appservice": appserviceID, + }).WithError(err).Fatalf("appservice worker unable to read queued events from DB") + + return + } + + // Check if these events do not already have a transaction ID + if txnID == -1 { + // If not, grab next available ID from the DB + txnID, err = db.GetLatestTxnID(ctx) + if err != nil { + return nil, 0, 0, false, err + } + + // Mark new events with current transactionID + if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil { + return nil, 0, 0, false, err + } + } + + // Create a transaction and store the events inside + transaction := gomatrixserverlib.ApplicationServiceTransaction{ + Events: events, + } + + transactionJSON, err = json.Marshal(transaction) + if err != nil { + return + } + + return +} + +// send sends events to an application service. Returns an error if an OK was not +// received back from the application service or the request timed out. +func send( + client *http.Client, + appservice config.ApplicationService, + txnID int, + transaction []byte, +) error { + // POST a transaction to our AS + address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID) + resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction)) + if err != nil { + return err + } + defer func() { + err := resp.Body.Close() + if err != nil { + log.WithFields(log.Fields{ + "appservice": appservice.ID, + }).WithError(err).Error("unable to close response body from application service") + } + }() + + // Check the AS received the events correctly + if resp.StatusCode != http.StatusOK { + // TODO: Handle non-200 error codes from application services + return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode) + } + + return nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go index 4ed54f952..e86654eca 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go @@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS account_accounts ( created_ts BIGINT NOT NULL, -- The password hash for this account. Can be NULL if this is a passwordless account. password_hash TEXT, - -- Identifies which Application Service this account belongs to, if any. + -- Identifies which application service this account belongs to, if any. appservice_id TEXT -- TODO: -- is_guest, is_admin, upgraded_ts, devices, any email reset stuff? diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go index 7683a427e..7032fe7bf 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go @@ -138,9 +138,9 @@ func (d *Database) UpdateDevice( } // RemoveDevice revokes a device by deleting the entry in the database -// matching with the given device ID and user ID localpart +// matching with the given device ID and user ID localpart. // If the device doesn't exist, it will not return an error -// If something went wrong during the deletion, it will return the SQL error +// If something went wrong during the deletion, it will return the SQL error. func (d *Database) RemoveDevice( ctx context.Context, deviceID, localpart string, ) error { diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/register.go b/src/github.com/matrix-org/dendrite/clientapi/routing/register.go index 63cb013d4..4949dc01e 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/register.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/register.go @@ -115,7 +115,7 @@ type registerRequest struct { InitialDisplayName *string `json:"initial_device_display_name"` - // Application Services place Type in the root of their registration + // Application services place Type in the root of their registration // request, whereas clients place it in the authDict struct. Type authtypes.LoginType `json:"type"` } @@ -281,16 +281,16 @@ func validateRecaptcha( } // UsernameIsWithinApplicationServiceNamespace checks to see if a username falls -// within any of the namespaces of a given Application Service. If no -// Application Service is given, it will check to see if it matches any -// Application Service's namespace. +// within any of the namespaces of a given application service. If no +// application service is given, it will check to see if it matches any +// application service's namespace. func UsernameIsWithinApplicationServiceNamespace( cfg *config.Dendrite, username string, appservice *config.ApplicationService, ) bool { if appservice != nil { - // Loop through given Application Service's namespaces and see if any match + // Loop through given application service's namespaces and see if any match for _, namespace := range appservice.NamespaceMap["users"] { // AS namespaces are checked for validity in config if namespace.RegexpObject.MatchString(username) { @@ -300,7 +300,7 @@ func UsernameIsWithinApplicationServiceNamespace( return false } - // Loop through all known Application Service's namespaces and see if any match + // Loop through all known application service's namespaces and see if any match for _, knownAppservice := range cfg.Derived.ApplicationServices { for _, namespace := range knownAppservice.NamespaceMap["users"] { // AS namespaces are checked for validity in config @@ -509,7 +509,7 @@ func handleRegistrationFlow( sessions.AddCompletedStage(sessionID, authtypes.LoginTypeSharedSecret) case authtypes.LoginTypeApplicationService: - // Check Application Service register user request is valid. + // Check application service register user request is valid. // The application service's ID is returned if so. appserviceID, err := validateApplicationService(cfg, req, r.Username) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-appservice-server/main.go similarity index 95% rename from src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go rename to src/github.com/matrix-org/dendrite/cmd/dendrite-appservice-server/main.go index 3c537bea0..347a04464 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-appservice-server/main.go @@ -22,7 +22,7 @@ import ( func main() { cfg := basecomponent.ParseFlags() - base := basecomponent.NewBaseDendrite(cfg, "AppService") + base := basecomponent.NewBaseDendrite(cfg, "AppServiceAPI") defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() diff --git a/src/github.com/matrix-org/dendrite/common/config/appservice.go b/src/github.com/matrix-org/dendrite/common/config/appservice.go index 86bc92c10..0333b3389 100644 --- a/src/github.com/matrix-org/dendrite/common/config/appservice.go +++ b/src/github.com/matrix-org/dendrite/common/config/appservice.go @@ -123,8 +123,6 @@ func setupRegexps(cfg *Dendrite) (err error) { } } - fmt.Println(exclusiveUsernameStrings, exclusiveAliasStrings) - // Join the regexes together into one big regex. // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)" // Later we can check if a username or alias matches any exclusive regex and @@ -194,13 +192,13 @@ func checkErrors(config *Dendrite) (err error) { // can have the same ID or token. if idMap[appservice.ID] { return configErrors([]string{fmt.Sprintf( - "Application Service ID %s must be unique", appservice.ID, + "Application service ID %s must be unique", appservice.ID, )}) } // Check if we've already seen this token if tokenMap[appservice.ASToken] { return configErrors([]string{fmt.Sprintf( - "Application Service Token %s must be unique", appservice.ASToken, + "Application service Token %s must be unique", appservice.ASToken, )}) } @@ -216,7 +214,7 @@ func checkErrors(config *Dendrite) (err error) { // namespace, which often ends up in an application service receiving events // it doesn't want, as an empty regex will match all events. return configErrors([]string{fmt.Sprintf( - "Application Service namespace can only contain a single regex tuple. Check your YAML.", + "Application service namespace can only contain a single regex tuple. Check your YAML.", )}) } } diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 8bbac80c6..bd6e361dc 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -162,6 +162,9 @@ type Dendrite struct { // The FederationSender database stores information used by the FederationSender // It is only accessed by the FederationSender. FederationSender DataSource `yaml:"federation_sender"` + // The AppServices database stores information used by the AppService component. + // It is only accessed by the AppService component. + AppService DataSource `yaml:"appservice"` // The PublicRoomsAPI database stores information used to compute the public // room directory. It is only accessed by the PublicRoomsAPI server. PublicRoomsAPI DataSource `yaml:"public_rooms_api"` @@ -231,15 +234,15 @@ type Dendrite struct { Params map[string]interface{} `json:"params"` } - // Application Services parsed from their config files + // Application services parsed from their config files // The paths of which were given above in the main config file ApplicationServices []ApplicationService - // Meta-regexes compiled from all exclusive Application Service + // Meta-regexes compiled from all exclusive application service // Regexes. // // When a user registers, we check that their username does not match any - // exclusive Application Service namespaces + // exclusive application service namespaces ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp // When a user creates a room alias, we check that it isn't already // reserved by an application service diff --git a/vendor/manifest b/vendor/manifest index 1bd43957d..71b834eed 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -135,7 +135,7 @@ { "importpath": "github.com/matrix-org/gomatrixserverlib", "repository": "https://github.com/matrix-org/gomatrixserverlib", - "revision": "38a4f0f648bf357adc4bdb601cdc0535cee14e21", + "revision": "929828872b51e6733166553d6b1a20155b6ab829", "branch": "master" }, { diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go index 18e51b462..a67527947 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go @@ -15,21 +15,8 @@ package gomatrixserverlib -// ApplicationServiceEvent is an event format that is sent off to an -// application service as part of a transaction. -type ApplicationServiceEvent struct { - Age int64 `json:"age,omitempty"` - Content RawJSON `json:"content,omitempty"` - EventID string `json:"event_id,omitempty"` - OriginServerTimestamp int64 `json:"origin_server_ts,omitempty"` - RoomID string `json:"room_id,omitempty"` - Sender string `json:"sender,omitempty"` - Type string `json:"type,omitempty"` - UserID string `json:"user_id,omitempty"` -} - // ApplicationServiceTransaction is the transaction that is sent off to an // application service. type ApplicationServiceTransaction struct { - Events []ApplicationServiceEvent `json:"events"` + Events []Event `json:"events"` } diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/eventcontent.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/eventcontent.go index ad4e77513..97b966b67 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/eventcontent.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/eventcontent.go @@ -261,6 +261,9 @@ func newPowerLevelContentFromAuthEvents(authEvents AuthEventProvider, creatorUse // If there is no power level event then the creator gets level 100 // https://github.com/matrix-org/synapse/blob/v0.18.5/synapse/api/auth.py#L569 c.userLevels = map[string]int64{creatorUserID: 100} + // If there is no power level event then the state_default is level 0 + // https://github.com/matrix-org/synapse/blob/v0.18.5/synapse/api/auth.py#L997 + c.stateDefaultLevel = 0 return } diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/federationtypes.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/federationtypes.go index c2adcc0ca..5d61521bb 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/federationtypes.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/federationtypes.go @@ -37,6 +37,12 @@ type RespState struct { AuthEvents []Event `json:"auth_chain"` } +// A RespEventAuth is the content of a response to GET /_matrix/federation/v1/event_auth/{roomID}/{eventID} +type RespEventAuth struct { + // A list of events needed to authenticate the state events. + AuthEvents []Event `json:"auth_chain"` +} + // Events combines the auth events and the state events and returns // them in an order where every event comes after its auth events. // Each event will only appear once in the output list.