Send Application Service Events (#477)

* Prevent sql scanning into nil value in accounts_table

Signed-off-by: Andrew Morgan <andrewm@matrix.org>

* Remove uneccessary logging, null checking

* Don't forget to set the localpart

* Simplify error checking

* Store And Send Application Service Events

* Modify INSTALL.md and dendrite-config.yaml for the new appservice database
* Correct all instances of casing on 'application service' to align with
spec
* Store incoming events that an app service is interested in in the
database to be later read by transaction workers.
* Retrieve these events from transaction workers, one per AS.
* Minimal transaction ID data is stored as well to recover after
server failure.
* Send events to AS and exponentially backoff on failure.

Signed-off-by: Andrew Morgan <andrewm@matrix.org>

* Finish my own sentences.

* Fix up database interaction

* Change to event-based AS sending

* Reduce cyclomatic complexity

* Appease the errcheck gods

* Delete by int ID instead of string.

This was causing some events to not be deleted, as < an eventID doesn't
really make much sense.

* Check if there are more events to send before sleeping

* Send same transaction if last send attempt failed

* Don't backoff on non-200s, tight send loop, 1 event query

* Remove tight send loop. Fix events not being deleted

* Additionally order by event id, track main.go

* Return the last txnID, which our events are using

* Remove old main.go file

* Prevent duplicate events from being sent...

* Strip event content if it doesn't contain anything

Signed-off-by: Andrew Morgan <andrewm@matrix.org>

* Update gomatrixserverlib and use Unsigned AS event prop

* Fixes

* Fix sync server comment
* Remove unnecessary printlns
* Use logrus Fields
* Worker state methods
* Remove sillyness

* Fix up event filtering

* Handle transaction event limit in loop

* Switch to using a sequence for transaction IDs

* Don't verify self-signed AS certificates

* Fix logging

* Use gmsl.Event instead of AS-only event in transactions

Also clear up the logic on lookupStateEvents a little bit.

* Change invalid_txn_id to global (for efficiency)

* Use a bool for EventsReady instead of an int
This commit is contained in:
Andrew Morgan 2018-07-05 09:34:59 -07:00 committed by GitHub
parent 8f5526763c
commit 7736e247b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 806 additions and 80 deletions

View file

@ -72,7 +72,7 @@ Dendrite requires a postgres database engine, version 9.5 or later.
``` ```
* Create databases: * Create databases:
```bash ```bash
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi naffka; do for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi appservice naffka; do
sudo -u postgres createdb -O dendrite dendrite_$i sudo -u postgres createdb -O dendrite dendrite_$i
done done
``` ```
@ -253,3 +253,14 @@ you want to support federation.
```bash ```bash
./bin/dendrite-federation-sender-server --config dendrite.yaml ./bin/dendrite-federation-sender-server --config dendrite.yaml
``` ```
### Run an appservice server
This sends events from the network to [application
services](https://matrix.org/docs/spec/application_service/unstable.html)
running locally. This is only required if you want to support running
application services on your homeserver.
```bash
./bin/dendrite-appservice-server --config dendrite.yaml
```

View file

@ -97,6 +97,7 @@ database:
room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable" room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable"
server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable" server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable"
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable" federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable" public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
# If using naffka you need to specify a naffka database # If using naffka you need to specify a naffka database
# naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable" # naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable"

View file

@ -2,9 +2,9 @@
This component interfaces with external [Application This component interfaces with external [Application
Services](https://matrix.org/docs/spec/application_service/unstable.html). Services](https://matrix.org/docs/spec/application_service/unstable.html).
This includes any HTTP endpoints that Application Services call, as well as talking This includes any HTTP endpoints that application services call, as well as talking
to any HTTP endpoints that Application Services provide themselves. to any HTTP endpoints that application services provide themselves.
## Consumers ## Consumers
This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing Application Services. This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services.

View file

@ -15,8 +15,13 @@
package appservice package appservice
import ( import (
"sync"
"github.com/matrix-org/dendrite/appservice/consumers" "github.com/matrix-org/dendrite/appservice/consumers"
"github.com/matrix-org/dendrite/appservice/routing" "github.com/matrix-org/dendrite/appservice/routing"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/appservice/workers"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/transactions" "github.com/matrix-org/dendrite/common/transactions"
@ -35,13 +40,38 @@ func SetupAppServiceAPIComponent(
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
transactionsCache *transactions.Cache, transactionsCache *transactions.Cache,
) { ) {
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService))
if err != nil {
logrus.WithError(err).Panicf("failed to connect to appservice db")
}
// Wrap application services in a type that relates the application service and
// a sync.Cond object that can be used to notify workers when there are new
// events to be sent out.
workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
for i, appservice := range base.Cfg.Derived.ApplicationServices {
m := sync.Mutex{}
ws := types.ApplicationServiceWorkerState{
AppService: appservice,
Cond: sync.NewCond(&m),
}
workerStates[i] = ws
}
consumer := consumers.NewOutputRoomEventConsumer( consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, aliasAPI, base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
queryAPI, aliasAPI, workerStates,
) )
if err := consumer.Start(); err != nil { if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start app service roomserver consumer") logrus.WithError(err).Panicf("failed to start app service roomserver consumer")
} }
// Create application service transaction workers
if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil {
logrus.WithError(err).Panicf("failed to start app service transaction workers")
}
// Set up HTTP Endpoints // Set up HTTP Endpoints
routing.Setup( routing.Setup(
base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB, base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB,

View file

@ -17,8 +17,9 @@ package consumers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
@ -29,29 +30,28 @@ import (
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
var (
appServices []config.ApplicationService
)
// OutputRoomEventConsumer consumes events that originated in the room server. // OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer roomServerConsumer *common.ContinualConsumer
db *accounts.Database db *accounts.Database
asDB *storage.Database
query api.RoomserverQueryAPI query api.RoomserverQueryAPI
alias api.RoomserverAliasAPI alias api.RoomserverAliasAPI
serverName string serverName string
workerStates []types.ApplicationServiceWorkerState
} }
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer( func NewOutputRoomEventConsumer(
cfg *config.Dendrite, cfg *config.Dendrite,
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
store *accounts.Database, store *accounts.Database,
appserviceDB *storage.Database,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI, aliasAPI api.RoomserverAliasAPI,
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
appServices = cfg.Derived.ApplicationServices
consumer := common.ContinualConsumer{ consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer, Consumer: kafkaConsumer,
@ -60,9 +60,11 @@ func NewOutputRoomEventConsumer(
s := &OutputRoomEventConsumer{ s := &OutputRoomEventConsumer{
roomServerConsumer: &consumer, roomServerConsumer: &consumer,
db: store, db: store,
asDB: appserviceDB,
query: queryAPI, query: queryAPI,
alias: aliasAPI, alias: aliasAPI,
serverName: string(cfg.Matrix.ServerName), serverName: string(cfg.Matrix.ServerName),
workerStates: workerStates,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -74,9 +76,8 @@ func (s *OutputRoomEventConsumer) Start() error {
return s.roomServerConsumer.Start() return s.roomServerConsumer.Start()
} }
// onMessage is called when the sync server receives a new event from the room server output log. // onMessage is called when the appservice component receives a new event from
// It is not safe for this function to be called from multiple goroutines, or else the // the room server output log.
// sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
@ -98,50 +99,37 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
"event_id": ev.EventID(), "event_id": ev.EventID(),
"room_id": ev.RoomID(), "room_id": ev.RoomID(),
"type": ev.Type(), "type": ev.Type(),
}).Info("appservice received event from roomserver") }).Info("appservice received an event from roomserver")
events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
if err != nil { if err != nil {
return err return err
} }
events := append(missingEvents, ev)
// Create a context to thread through the whole filtering process // Send event to any relevant application services
ctx := context.TODO() return s.filterRoomserverEvents(context.TODO(), events)
if err = s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs); err != nil {
return err
}
// Check if any events need to passed on to external application services
return s.filterRoomserverEvents(ctx, append(events, ev))
} }
// lookupStateEvents looks up the state events that are added by a new event. // lookupMissingStateEvents looks up the state events that are added by a new event,
func (s *OutputRoomEventConsumer) lookupStateEvents( // and returns any not already present.
func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event, addsStateEventIDs []string, event gomatrixserverlib.Event,
) ([]gomatrixserverlib.Event, error) { ) ([]gomatrixserverlib.Event, error) {
// Fast path if there aren't any new state events. // Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 { if len(addsStateEventIDs) == 0 {
// If the event is a membership update (e.g. for a profile update), it won't return []gomatrixserverlib.Event{}, nil
// show up in AddsStateEventIDs, so we need to add it manually
if event.Type() == "m.room.member" {
return []gomatrixserverlib.Event{event}, nil
}
return nil, nil
} }
// Fast path if the only state event added is the event itself. // Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []gomatrixserverlib.Event{event}, nil return []gomatrixserverlib.Event{}, nil
} }
result := []gomatrixserverlib.Event{} result := []gomatrixserverlib.Event{}
missing := []string{} missing := []string{}
for _, id := range addsStateEventIDs { for _, id := range addsStateEventIDs {
// Append the current event in the results if its ID is in the events list if id != event.EventID() {
if id == event.EventID() {
result = append(result, event)
} else {
// If the event isn't the current one, add it to the list of events // If the event isn't the current one, add it to the list of events
// to retrieve from the roomserver // to retrieve from the roomserver
missing = append(missing, id) missing = append(missing, id)
@ -165,13 +153,22 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// each namespace of each registered application service, and if there is a // each namespace of each registered application service, and if there is a
// match, adds the event to the queue for events to be sent to a particular // match, adds the event to the queue for events to be sent to a particular
// application service. // application service.
func (s *OutputRoomEventConsumer) filterRoomserverEvents(ctx context.Context, events []gomatrixserverlib.Event) error { func (s *OutputRoomEventConsumer) filterRoomserverEvents(
ctx context.Context,
events []gomatrixserverlib.Event,
) error {
for _, ws := range s.workerStates {
for _, event := range events { for _, event := range events {
for _, appservice := range appServices {
// Check if this event is interesting to this application service // Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, appservice) { if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
// TODO: Queue this event to be sent off to the application service // Queue this event to be sent off to the application service
fmt.Println(appservice.ID, "was interested in", event.Sender(), event.Type(), event.RoomID()) if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database")
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.NotifyNewEvents()
}
} }
} }
} }

View file

@ -0,0 +1,248 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
const appserviceEventsSchema = `
-- Stores events to be sent to application services
CREATE TABLE IF NOT EXISTS appservice_events (
-- An auto-incrementing id unique to each event in the table
id BIGSERIAL NOT NULL PRIMARY KEY,
-- The ID of the application service the event will be sent to
as_id TEXT NOT NULL,
-- JSON representation of the event
event_json TEXT NOT NULL,
-- The ID of the transaction that this event is a part of
txn_id BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
`
const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_json, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
const insertEventSQL = "" +
"INSERT INTO appservice_events(as_id, event_json, txn_id) " +
"VALUES ($1, $2, $3)"
const updateTxnIDForEventsSQL = "" +
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
const deleteEventsBeforeAndIncludingIDSQL = "" +
"DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
const (
// A transaction ID number that no transaction should ever have. Used for
// checking again the default value.
invalidTxnID = -2
)
type eventsStatements struct {
selectEventsByApplicationServiceIDStmt *sql.Stmt
countEventsByApplicationServiceIDStmt *sql.Stmt
insertEventStmt *sql.Stmt
updateTxnIDForEventsStmt *sql.Stmt
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
}
func (s *eventsStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(appserviceEventsSchema)
if err != nil {
return
}
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
return
}
if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
return
}
if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
return
}
return
}
// selectEventsByApplicationServiceID takes in an application service ID and
// returns a slice of events that need to be sent to that application service,
// as well as an int later used to remove these same events from the database
// once successfully sent to an application service.
func (s *eventsStatements) selectEventsByApplicationServiceID(
ctx context.Context,
applicationServiceID string,
limit int,
) (
txnID, maxID int,
events []gomatrixserverlib.Event,
eventsRemaining bool,
err error,
) {
// Retrieve events from the database. Unsuccessfully sent events first
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
if err != nil {
return
}
defer func() {
err = eventRows.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice": applicationServiceID,
}).WithError(err).Fatalf("appservice unable to select new events to send")
}
}()
events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
if err != nil {
return
}
return
}
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
// Iterate through each row and store event contents
// If txn_id changes dramatically, we've switched from collecting old events to
// new ones. Send back those events first.
lastTxnID := invalidTxnID
for eventsProcessed := 0; eventRows.Next(); {
var event gomatrixserverlib.Event
var eventJSON []byte
var id int
err = eventRows.Scan(
&id,
&eventJSON,
&txnID,
)
if err != nil {
return nil, 0, 0, false, err
}
// Unmarshal eventJSON
if err = json.Unmarshal(eventJSON, &event); err != nil {
return nil, 0, 0, false, err
}
// If txnID has changed on this event from the previous event, then we've
// reached the end of a transaction's events. Return only those events.
if lastTxnID > invalidTxnID && lastTxnID != txnID {
return events, maxID, lastTxnID, true, nil
}
lastTxnID = txnID
// Limit events that aren't part of an old transaction
if txnID == -1 {
// Return if we've hit the limit
if eventsProcessed++; eventsProcessed > limit {
return events, maxID, lastTxnID, true, nil
}
}
if id > maxID {
maxID = id
}
// Portion of the event that is unsigned due to rapid change
// TODO: Consider removing age as not many app services use it
if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
return nil, 0, 0, false, err
}
events = append(events, event)
}
return
}
// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) countEventsByApplicationServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
var count int
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return count, nil
}
// insertEvent inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) insertEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) (err error) {
// Convert event to JSON before inserting
eventJSON, err := json.Marshal(event)
if err != nil {
return err
}
_, err = s.insertEventStmt.ExecContext(
ctx,
appServiceID,
eventJSON,
-1, // No transaction ID yet
)
return
}
// updateTxnIDForEvents sets the transactionID for a collection of events. Done
// before sending them to an AppService. Referenced before sending to make sure
// we aren't constructing multiple transactions with the same events.
func (s *eventsStatements) updateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) (err error) {
_, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
return
}
// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) (err error) {
_, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
return
}

View file

@ -0,0 +1,110 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
// Import postgres database driver
_ "github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
)
// Database stores events intended to be later sent to application services
type Database struct {
events eventsStatements
txnID txnStatements
db *sql.DB
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
var result Database
var err error
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = result.prepare(); err != nil {
return nil, err
}
return &result, nil
}
func (d *Database) prepare() error {
if err := d.events.prepare(d.db); err != nil {
return err
}
return d.txnID.prepare(d.db)
}
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
// for a transaction worker to pull and later send to an application service.
func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) error {
return d.events.insertEvent(ctx, appServiceID, event)
}
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
// be sent to an application service given its ID.
func (d *Database) GetEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
limit int,
) (int, int, []gomatrixserverlib.Event, bool, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
// CountEventsWithAppServiceID returns the number of events destined for an
// application service given its ID.
func (d *Database) CountEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
}
// UpdateTxnIDForEvents takes in an application service ID and a
// and stores them in the DB, unless the pair already exists, in
// which case it updates them.
func (d *Database) UpdateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) error {
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
}
// RemoveEventsBeforeAndIncludingID removes all events from the database that
// are less than or equal to a given maximum ID. IDs here are implemented as a
// serial, thus this should always delete events in chronological order.
func (d *Database) RemoveEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) error {
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
}
// GetLatestTxnID returns the latest available transaction id
func (d *Database) GetLatestTxnID(
ctx context.Context,
) (int, error) {
return d.txnID.selectTxnID(ctx)
}

View file

@ -0,0 +1,52 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
)
const txnIDSchema = `
-- Keeps a count of the current transaction ID
CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1;
`
const selectTxnIDSQL = "SELECT nextval('txn_id_counter')"
type txnStatements struct {
selectTxnIDStmt *sql.Stmt
}
func (s *txnStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(txnIDSchema)
if err != nil {
return
}
if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
return
}
return
}
// selectTxnID selects the latest ascending transaction ID
func (s *txnStatements) selectTxnID(
ctx context.Context,
) (txnID int, err error) {
err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
return
}

View file

@ -12,7 +12,53 @@
package types package types
import (
"sync"
"github.com/matrix-org/dendrite/common/config"
)
const ( const (
// AppServiceDeviceID is the AS dummy device ID // AppServiceDeviceID is the AS dummy device ID
AppServiceDeviceID = "AS_Device" AppServiceDeviceID = "AS_Device"
) )
// ApplicationServiceWorkerState is a type that couples an application service,
// a lockable condition as well as some other state variables, allowing the
// roomserver to notify appservice workers when there are events ready to send
// externally to application services.
type ApplicationServiceWorkerState struct {
AppService config.ApplicationService
Cond *sync.Cond
// Events ready to be sent
EventsReady bool
// Backoff exponent (2^x secs). Max 6, aka 64s.
Backoff int
}
// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain
// in the event queue for this application service worker.
func (a *ApplicationServiceWorkerState) NotifyNewEvents() {
a.Cond.L.Lock()
a.EventsReady = true
a.Cond.Broadcast()
a.Cond.L.Unlock()
}
// FinishEventProcessing marks all events of this worker as being sent to the
// application service.
func (a *ApplicationServiceWorkerState) FinishEventProcessing() {
a.Cond.L.Lock()
a.EventsReady = false
a.Cond.L.Unlock()
}
// WaitForNewEvents causes the calling goroutine to wait on the worker state's
// condition for a broadcast or similar wakeup, if there are no events ready.
func (a *ApplicationServiceWorkerState) WaitForNewEvents() {
a.Cond.L.Lock()
if !a.EventsReady {
a.Cond.Wait()
}
a.Cond.L.Unlock()
}

View file

@ -0,0 +1,234 @@
// Copyright 2018 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package workers
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"math"
"net/http"
"time"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
var (
// Maximum size of events sent in each transaction.
transactionBatchSize = 50
// Timeout for sending a single transaction to an application service.
transactionTimeout = time.Second * 60
)
// SetupTransactionWorkers spawns a separate goroutine for each application
// service. Each of these "workers" handle taking all events intended for their
// app service, batch them up into a single transaction (up to a max transaction
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
// handles exponentially backing off in case the AS isn't currently available.
func SetupTransactionWorkers(
appserviceDB *storage.Database,
workerStates []types.ApplicationServiceWorkerState,
) error {
// Create a worker that handles transmitting events to a single homeserver
for _, workerState := range workerStates {
// Don't create a worker if this AS doesn't want to receive events
if workerState.AppService.URL != "" {
go worker(appserviceDB, workerState)
}
}
return nil
}
// worker is a goroutine that sends any queued events to the application service
// it is given.
func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).Info("starting application service")
ctx := context.Background()
// Grab the HTTP client for sending requests to app services
client := &http.Client{
Timeout: transactionTimeout,
// TODO: Verify certificates
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // nolint: gas
},
},
}
// Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to read queued events from DB")
return
}
if eventCount > 0 {
ws.NotifyNewEvents()
}
// Loop forever and keep waiting for more events to send
for {
// Wait for more events if we've sent all the events in the database
ws.WaitForNewEvents()
// Batch events up into a transaction
transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to create transaction")
return
}
// Send the events off to the application service
// Backoff if the application service does not respond
err = send(client, ws.AppService, txnID, transactionJSON)
if err != nil {
// Backoff
backoff(&ws, err)
continue
}
// We sent successfully, hooray!
ws.Backoff = 0
// Transactions have a maximum event size, so there may still be some events
// left over to send. Keep sending until none are left
if !eventsRemaining {
ws.FinishEventProcessing()
}
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("unable to remove appservice events from the database")
return
}
}
}
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
func backoff(ws *types.ApplicationServiceWorkerState, err error) {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
backoffSeconds := time.Second * backoffDuration
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds",
backoffDuration)
ws.Backoff++
if ws.Backoff > 6 {
ws.Backoff = 6
}
// Backoff
time.Sleep(backoffSeconds)
}
// createTransaction takes in a slice of AS events, stores them in an AS
// transaction, and JSON-encodes the results.
func createTransaction(
ctx context.Context,
db *storage.Database,
appserviceID string,
) (
transactionJSON []byte,
txnID, maxID int,
eventsRemaining bool,
err error,
) {
// Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
if err != nil {
log.WithFields(log.Fields{
"appservice": appserviceID,
}).WithError(err).Fatalf("appservice worker unable to read queued events from DB")
return
}
// Check if these events do not already have a transaction ID
if txnID == -1 {
// If not, grab next available ID from the DB
txnID, err = db.GetLatestTxnID(ctx)
if err != nil {
return nil, 0, 0, false, err
}
// Mark new events with current transactionID
if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
return nil, 0, 0, false, err
}
}
// Create a transaction and store the events inside
transaction := gomatrixserverlib.ApplicationServiceTransaction{
Events: events,
}
transactionJSON, err = json.Marshal(transaction)
if err != nil {
return
}
return
}
// send sends events to an application service. Returns an error if an OK was not
// received back from the application service or the request timed out.
func send(
client *http.Client,
appservice config.ApplicationService,
txnID int,
transaction []byte,
) error {
// POST a transaction to our AS
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID)
resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
if err != nil {
return err
}
defer func() {
err := resp.Body.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice": appservice.ID,
}).WithError(err).Error("unable to close response body from application service")
}
}()
// Check the AS received the events correctly
if resp.StatusCode != http.StatusOK {
// TODO: Handle non-200 error codes from application services
return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
}
return nil
}

View file

@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS account_accounts (
created_ts BIGINT NOT NULL, created_ts BIGINT NOT NULL,
-- The password hash for this account. Can be NULL if this is a passwordless account. -- The password hash for this account. Can be NULL if this is a passwordless account.
password_hash TEXT, password_hash TEXT,
-- Identifies which Application Service this account belongs to, if any. -- Identifies which application service this account belongs to, if any.
appservice_id TEXT appservice_id TEXT
-- TODO: -- TODO:
-- is_guest, is_admin, upgraded_ts, devices, any email reset stuff? -- is_guest, is_admin, upgraded_ts, devices, any email reset stuff?

View file

@ -138,9 +138,9 @@ func (d *Database) UpdateDevice(
} }
// RemoveDevice revokes a device by deleting the entry in the database // RemoveDevice revokes a device by deleting the entry in the database
// matching with the given device ID and user ID localpart // matching with the given device ID and user ID localpart.
// If the device doesn't exist, it will not return an error // If the device doesn't exist, it will not return an error
// If something went wrong during the deletion, it will return the SQL error // If something went wrong during the deletion, it will return the SQL error.
func (d *Database) RemoveDevice( func (d *Database) RemoveDevice(
ctx context.Context, deviceID, localpart string, ctx context.Context, deviceID, localpart string,
) error { ) error {

View file

@ -115,7 +115,7 @@ type registerRequest struct {
InitialDisplayName *string `json:"initial_device_display_name"` InitialDisplayName *string `json:"initial_device_display_name"`
// Application Services place Type in the root of their registration // Application services place Type in the root of their registration
// request, whereas clients place it in the authDict struct. // request, whereas clients place it in the authDict struct.
Type authtypes.LoginType `json:"type"` Type authtypes.LoginType `json:"type"`
} }
@ -281,16 +281,16 @@ func validateRecaptcha(
} }
// UsernameIsWithinApplicationServiceNamespace checks to see if a username falls // UsernameIsWithinApplicationServiceNamespace checks to see if a username falls
// within any of the namespaces of a given Application Service. If no // within any of the namespaces of a given application service. If no
// Application Service is given, it will check to see if it matches any // application service is given, it will check to see if it matches any
// Application Service's namespace. // application service's namespace.
func UsernameIsWithinApplicationServiceNamespace( func UsernameIsWithinApplicationServiceNamespace(
cfg *config.Dendrite, cfg *config.Dendrite,
username string, username string,
appservice *config.ApplicationService, appservice *config.ApplicationService,
) bool { ) bool {
if appservice != nil { if appservice != nil {
// Loop through given Application Service's namespaces and see if any match // Loop through given application service's namespaces and see if any match
for _, namespace := range appservice.NamespaceMap["users"] { for _, namespace := range appservice.NamespaceMap["users"] {
// AS namespaces are checked for validity in config // AS namespaces are checked for validity in config
if namespace.RegexpObject.MatchString(username) { if namespace.RegexpObject.MatchString(username) {
@ -300,7 +300,7 @@ func UsernameIsWithinApplicationServiceNamespace(
return false return false
} }
// Loop through all known Application Service's namespaces and see if any match // Loop through all known application service's namespaces and see if any match
for _, knownAppservice := range cfg.Derived.ApplicationServices { for _, knownAppservice := range cfg.Derived.ApplicationServices {
for _, namespace := range knownAppservice.NamespaceMap["users"] { for _, namespace := range knownAppservice.NamespaceMap["users"] {
// AS namespaces are checked for validity in config // AS namespaces are checked for validity in config
@ -509,7 +509,7 @@ func handleRegistrationFlow(
sessions.AddCompletedStage(sessionID, authtypes.LoginTypeSharedSecret) sessions.AddCompletedStage(sessionID, authtypes.LoginTypeSharedSecret)
case authtypes.LoginTypeApplicationService: case authtypes.LoginTypeApplicationService:
// Check Application Service register user request is valid. // Check application service register user request is valid.
// The application service's ID is returned if so. // The application service's ID is returned if so.
appserviceID, err := validateApplicationService(cfg, req, r.Username) appserviceID, err := validateApplicationService(cfg, req, r.Username)

View file

@ -22,7 +22,7 @@ import (
func main() { func main() {
cfg := basecomponent.ParseFlags() cfg := basecomponent.ParseFlags()
base := basecomponent.NewBaseDendrite(cfg, "AppService") base := basecomponent.NewBaseDendrite(cfg, "AppServiceAPI")
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()

View file

@ -123,8 +123,6 @@ func setupRegexps(cfg *Dendrite) (err error) {
} }
} }
fmt.Println(exclusiveUsernameStrings, exclusiveAliasStrings)
// Join the regexes together into one big regex. // Join the regexes together into one big regex.
// i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)" // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
// Later we can check if a username or alias matches any exclusive regex and // Later we can check if a username or alias matches any exclusive regex and
@ -194,13 +192,13 @@ func checkErrors(config *Dendrite) (err error) {
// can have the same ID or token. // can have the same ID or token.
if idMap[appservice.ID] { if idMap[appservice.ID] {
return configErrors([]string{fmt.Sprintf( return configErrors([]string{fmt.Sprintf(
"Application Service ID %s must be unique", appservice.ID, "Application service ID %s must be unique", appservice.ID,
)}) )})
} }
// Check if we've already seen this token // Check if we've already seen this token
if tokenMap[appservice.ASToken] { if tokenMap[appservice.ASToken] {
return configErrors([]string{fmt.Sprintf( return configErrors([]string{fmt.Sprintf(
"Application Service Token %s must be unique", appservice.ASToken, "Application service Token %s must be unique", appservice.ASToken,
)}) )})
} }
@ -216,7 +214,7 @@ func checkErrors(config *Dendrite) (err error) {
// namespace, which often ends up in an application service receiving events // namespace, which often ends up in an application service receiving events
// it doesn't want, as an empty regex will match all events. // it doesn't want, as an empty regex will match all events.
return configErrors([]string{fmt.Sprintf( return configErrors([]string{fmt.Sprintf(
"Application Service namespace can only contain a single regex tuple. Check your YAML.", "Application service namespace can only contain a single regex tuple. Check your YAML.",
)}) )})
} }
} }

View file

@ -162,6 +162,9 @@ type Dendrite struct {
// The FederationSender database stores information used by the FederationSender // The FederationSender database stores information used by the FederationSender
// It is only accessed by the FederationSender. // It is only accessed by the FederationSender.
FederationSender DataSource `yaml:"federation_sender"` FederationSender DataSource `yaml:"federation_sender"`
// The AppServices database stores information used by the AppService component.
// It is only accessed by the AppService component.
AppService DataSource `yaml:"appservice"`
// The PublicRoomsAPI database stores information used to compute the public // The PublicRoomsAPI database stores information used to compute the public
// room directory. It is only accessed by the PublicRoomsAPI server. // room directory. It is only accessed by the PublicRoomsAPI server.
PublicRoomsAPI DataSource `yaml:"public_rooms_api"` PublicRoomsAPI DataSource `yaml:"public_rooms_api"`
@ -231,15 +234,15 @@ type Dendrite struct {
Params map[string]interface{} `json:"params"` Params map[string]interface{} `json:"params"`
} }
// Application Services parsed from their config files // Application services parsed from their config files
// The paths of which were given above in the main config file // The paths of which were given above in the main config file
ApplicationServices []ApplicationService ApplicationServices []ApplicationService
// Meta-regexes compiled from all exclusive Application Service // Meta-regexes compiled from all exclusive application service
// Regexes. // Regexes.
// //
// When a user registers, we check that their username does not match any // When a user registers, we check that their username does not match any
// exclusive Application Service namespaces // exclusive application service namespaces
ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp
// When a user creates a room alias, we check that it isn't already // When a user creates a room alias, we check that it isn't already
// reserved by an application service // reserved by an application service

2
vendor/manifest vendored
View file

@ -135,7 +135,7 @@
{ {
"importpath": "github.com/matrix-org/gomatrixserverlib", "importpath": "github.com/matrix-org/gomatrixserverlib",
"repository": "https://github.com/matrix-org/gomatrixserverlib", "repository": "https://github.com/matrix-org/gomatrixserverlib",
"revision": "38a4f0f648bf357adc4bdb601cdc0535cee14e21", "revision": "929828872b51e6733166553d6b1a20155b6ab829",
"branch": "master" "branch": "master"
}, },
{ {

View file

@ -15,21 +15,8 @@
package gomatrixserverlib package gomatrixserverlib
// ApplicationServiceEvent is an event format that is sent off to an
// application service as part of a transaction.
type ApplicationServiceEvent struct {
Age int64 `json:"age,omitempty"`
Content RawJSON `json:"content,omitempty"`
EventID string `json:"event_id,omitempty"`
OriginServerTimestamp int64 `json:"origin_server_ts,omitempty"`
RoomID string `json:"room_id,omitempty"`
Sender string `json:"sender,omitempty"`
Type string `json:"type,omitempty"`
UserID string `json:"user_id,omitempty"`
}
// ApplicationServiceTransaction is the transaction that is sent off to an // ApplicationServiceTransaction is the transaction that is sent off to an
// application service. // application service.
type ApplicationServiceTransaction struct { type ApplicationServiceTransaction struct {
Events []ApplicationServiceEvent `json:"events"` Events []Event `json:"events"`
} }

View file

@ -261,6 +261,9 @@ func newPowerLevelContentFromAuthEvents(authEvents AuthEventProvider, creatorUse
// If there is no power level event then the creator gets level 100 // If there is no power level event then the creator gets level 100
// https://github.com/matrix-org/synapse/blob/v0.18.5/synapse/api/auth.py#L569 // https://github.com/matrix-org/synapse/blob/v0.18.5/synapse/api/auth.py#L569
c.userLevels = map[string]int64{creatorUserID: 100} c.userLevels = map[string]int64{creatorUserID: 100}
// If there is no power level event then the state_default is level 0
// https://github.com/matrix-org/synapse/blob/v0.18.5/synapse/api/auth.py#L997
c.stateDefaultLevel = 0
return return
} }

View file

@ -37,6 +37,12 @@ type RespState struct {
AuthEvents []Event `json:"auth_chain"` AuthEvents []Event `json:"auth_chain"`
} }
// A RespEventAuth is the content of a response to GET /_matrix/federation/v1/event_auth/{roomID}/{eventID}
type RespEventAuth struct {
// A list of events needed to authenticate the state events.
AuthEvents []Event `json:"auth_chain"`
}
// Events combines the auth events and the state events and returns // Events combines the auth events and the state events and returns
// them in an order where every event comes after its auth events. // them in an order where every event comes after its auth events.
// Each event will only appear once in the output list. // Each event will only appear once in the output list.