diff --git a/dendrite-config.yaml b/dendrite-config.yaml index d04707a61..b19add182 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -52,9 +52,13 @@ media: kafka: # Where the kafka servers are running. addresses: ["localhost:9092"] + # Whether to use naffka instead of kafka. + # Naffka can only be used when running dendrite as a single monolithic server. + # Kafka can be used both with a monolithic server and when running the + # components as separate servers. + use_naffka: false # The names of the kafka topics to use. topics: - input_room_event: roomserverInput output_room_event: roomserverOutput output_client_data: clientapiOutput user_updates: userUpdates @@ -71,6 +75,7 @@ database: # The TCP host:port pairs to bind the internal HTTP APIs to. # These shouldn't be exposed to the public internet. +# These aren't needed when running dendrite as a monolithic server. listen: room_server: "localhost:7770" client_api: "localhost:7771" diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go index 1e2320787..314ca42bc 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -17,12 +17,13 @@ package consumers import ( "encoding/json" - log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" + + log "github.com/Sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -35,12 +36,12 @@ type OutputRoomEvent struct { } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. 
-func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*OutputRoomEvent, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - return nil, err - } - roomServerURL := cfg.RoomServerURL() +func NewOutputRoomEvent( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + store *accounts.Database, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEvent { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), @@ -50,12 +51,12 @@ func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*Output s := &OutputRoomEvent{ roomServerConsumer: &consumer, db: store, - query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + query: queryAPI, serverName: string(cfg.Matrix.ServerName), } consumer.ProcessMessage = s.onMessage - return s, nil + return s } // Start consuming from room servers diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go index 2597089e3..dba104f90 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go @@ -28,18 +28,6 @@ type SyncAPIProducer struct { Producer sarama.SyncProducer } -// NewSyncAPIProducer creates a new SyncAPIProducer -func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) { - producer, err := sarama.NewSyncProducer(kafkaURIs, nil) - if err != nil { - return nil, err - } - return &SyncAPIProducer{ - Topic: topic, - Producer: producer, - }, nil -} - // SendData sends account data to the sync API server func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error { var m sarama.ProducerMessage diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go index f76be0d75..2f2ed7565 100644 --- 
a/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go @@ -34,18 +34,6 @@ type profileUpdate struct { NewValue string `json:"new_value"` // The attribute's value after the update } -// NewUserUpdateProducer creates a new UserUpdateProducer -func NewUserUpdateProducer(kafkaURIs []string, topic string) (*UserUpdateProducer, error) { - producer, err := sarama.NewSyncProducer(kafkaURIs, nil) - if err != nil { - return nil, err - } - return &UserUpdateProducer{ - Topic: topic, - Producer: producer, - }, nil -} - // SendUpdate sends an update using kafka to notify the roomserver of the // profile update. Returns an error if the update failed to send. func (p *UserUpdateProducer) SendUpdate( diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 5d195bee8..53ebdb934 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" log "github.com/Sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" ) var ( @@ -50,24 +51,28 @@ func main() { log.Fatalf("Invalid config file: %s", err) } - log.Info("config: ", cfg) - queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil) inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil) roomserverProducer := producers.NewRoomserverProducer(inputAPI) - userUpdateProducer, err := producers.NewUserUpdateProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), - ) + + kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) if err != nil { - log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) + log.WithFields(log.Fields{ + 
log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka producers") } - syncProducer, err := producers.NewSyncAPIProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData), - ) - if err != nil { - log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) + + userUpdateProducer := &producers.UserUpdateProducer{ + Producer: kafkaProducer, + Topic: string(cfg.Kafka.Topics.UserUpdates), + } + + syncProducer := &producers.SyncAPIProducer{ + Producer: kafkaProducer, + Topic: string(cfg.Kafka.Topics.OutputClientData), } federation := gomatrixserverlib.NewFederationClient( @@ -90,15 +95,20 @@ func main() { keyRing := gomatrixserverlib.KeyRing{ KeyFetchers: []gomatrixserverlib.KeyFetcher{ // TODO: Use perspective key fetchers for production. - &gomatrixserverlib.DirectKeyFetcher{federation.Client}, + &gomatrixserverlib.DirectKeyFetcher{Client: federation.Client}, }, KeyDatabase: keyDB, } - consumer, err := consumers.NewOutputRoomEvent(cfg, accountDB) + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) if err != nil { - log.Panicf("startup: failed to create room server consumer: %s", err) + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") } + + consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, accountDB, queryAPI) if err = consumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer") } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go index 9052c3f87..dfc2dc2ff 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go @@ -25,9 +25,11 @@ import ( "github.com/matrix-org/dendrite/federationsender/consumers" 
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/Sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" ) var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") @@ -45,7 +47,15 @@ func main() { log.Fatalf("Invalid config file: %s", err) } - log.Info("config: ", cfg) + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") + } + + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) db, err := storage.NewDatabase(string(cfg.Database.FederationSender)) if err != nil { @@ -58,10 +68,7 @@ func main() { queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation) - consumer, err := consumers.NewOutputRoomEvent(cfg, queues, db) - if err != nil { - log.WithError(err).Panicf("startup: failed to create room server consumer") - } + consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, queues, db, queryAPI) if err = consumer.Start(); err != nil { log.WithError(err).Panicf("startup: failed to start room server consumer") } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index 1a265df79..a856a7249 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/naffka" mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing" 
mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage" @@ -60,6 +61,8 @@ var ( configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") httpBindAddr = flag.String("http-bind-address", ":8008", "The HTTP listening port for the server") httpsBindAddr = flag.String("https-bind-address", ":8448", "The HTTPS listening port for the server") + certFile = flag.String("tls-cert", "", "The PEM formatted X509 certificate to use for TLS") + keyFile = flag.String("tls-key", "", "The PEM private key to use for TLS") ) func main() { @@ -70,7 +73,7 @@ func main() { if *configPath == "" { log.Fatal("--config must be supplied") } - cfg, err := config.Load(*configPath) + cfg, err := config.LoadMonolithic(*configPath) if err != nil { log.Fatalf("Invalid config file: %s", err) } @@ -78,6 +81,7 @@ func main() { m := newMonolith(cfg) m.setupDatabases() m.setupFederation() + m.setupKafka() m.setupRoomServer() m.setupProducers() m.setupNotifiers() @@ -85,7 +89,20 @@ func main() { m.setupAPIs() // Expose the matrix APIs directly rather than putting them under a /api path. - log.Fatal(http.ListenAndServe(*httpBindAddr, m.api)) + go func() { + log.Info("Listening on ", *httpBindAddr) + log.Fatal(http.ListenAndServe(*httpBindAddr, m.api)) + }() + // Handle HTTPS if certificate and key are provided + go func() { + if *certFile != "" && *keyFile != "" { + log.Info("Listening on ", *httpsBindAddr) + log.Fatal(http.ListenAndServeTLS(*httpsBindAddr, *certFile, *keyFile, m.api)) + } + }() + + // We want to block forever to let the HTTP and HTTPS handler serve the APIs + select {} } // A monolith contains all the dendrite components. 
@@ -110,6 +127,9 @@ type monolith struct { queryAPI *roomserver_query.RoomserverQueryAPI aliasAPI *roomserver_alias.RoomserverAliasAPI + naffka *naffka.Naffka + kafkaProducer sarama.SyncProducer + roomServerProducer *producers.RoomserverProducer userUpdateProducer *producers.UserUpdateProducer syncProducer *producers.SyncAPIProducer @@ -167,15 +187,46 @@ func (m *monolith) setupFederation() { } } -func (m *monolith) setupRoomServer() { - kafkaProducer, err := sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) - if err != nil { - panic(err) +func (m *monolith) setupKafka() { + var err error + if m.cfg.Kafka.UseNaffka { + naff, err := naffka.New(&naffka.MemoryDatabase{}) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + }).Panic("Failed to setup naffka") + } + m.naffka = naff + m.kafkaProducer = naff + } else { + m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": m.cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka producers") + } } +} +func (m *monolith) kafkaConsumer() sarama.Consumer { + if m.cfg.Kafka.UseNaffka { + return m.naffka + } + consumer, err := sarama.NewConsumer(m.cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": m.cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") + } + return consumer +} + +func (m *monolith) setupRoomServer() { m.inputAPI = &roomserver_input.RoomserverInputAPI{ DB: m.roomServerDB, - Producer: kafkaProducer, + Producer: m.kafkaProducer, OutputRoomEventTopic: string(m.cfg.Kafka.Topics.OutputRoomEvent), } @@ -192,19 +243,14 @@ func (m *monolith) setupRoomServer() { } func (m *monolith) setupProducers() { - var err error m.roomServerProducer = producers.NewRoomserverProducer(m.inputAPI) - m.userUpdateProducer, err = producers.NewUserUpdateProducer( - m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.UserUpdates), - ) - if err != nil { - 
log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err) + m.userUpdateProducer = &producers.UserUpdateProducer{ + Producer: m.kafkaProducer, + Topic: string(m.cfg.Kafka.Topics.UserUpdates), } - m.syncProducer, err = producers.NewSyncAPIProducer( - m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.OutputClientData), - ) - if err != nil { - log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err) + m.syncProducer = &producers.SyncAPIProducer{ + Producer: m.kafkaProducer, + Topic: string(m.cfg.Kafka.Topics.OutputClientData), } } @@ -221,42 +267,34 @@ func (m *monolith) setupNotifiers() { } func (m *monolith) setupConsumers() { - clientAPIConsumer, err := clientapi_consumers.NewOutputRoomEvent(m.cfg, m.accountDB) - if err != nil { - log.Panicf("startup: failed to create room server consumer: %s", err) - } + var err error + + clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent( + m.cfg, m.kafkaConsumer(), m.accountDB, m.queryAPI, + ) if err = clientAPIConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer") } - syncAPIRoomConsumer, err := syncapi_consumers.NewOutputRoomEvent( - m.cfg, m.syncAPINotifier, m.syncAPIDB, + syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent( + m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, m.queryAPI, ) - if err != nil { - log.Panicf("startup: failed to create room server consumer: %s", err) - } if err = syncAPIRoomConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer: %s", err) } - syncAPIClientConsumer, err := syncapi_consumers.NewOutputClientData( - m.cfg, m.syncAPINotifier, m.syncAPIDB, + syncAPIClientConsumer := syncapi_consumers.NewOutputClientData( + m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, ) - if err != nil { - log.Panicf("startup: failed to create client API server consumer: %s", err) - } if err = syncAPIClientConsumer.Start(); err != nil { log.Panicf("startup: failed to 
start client API server consumer: %s", err) } federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation) - federationSenderRoomConsumer, err := federationsender_consumers.NewOutputRoomEvent( - m.cfg, federationSenderQueues, m.federationSenderDB, + federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent( + m.cfg, m.kafkaConsumer(), federationSenderQueues, m.federationSenderDB, m.queryAPI, ) - if err != nil { - log.WithError(err).Panicf("startup: failed to create room server consumer") - } if err = federationSenderRoomConsumer.Start(); err != nil { log.WithError(err).Panicf("startup: failed to start room server consumer") } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index 7db4d4ca4..7e9e4c128 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/storage" @@ -31,6 +32,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" log "github.com/Sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" ) var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. 
For more information, see the config file in this repository.") @@ -48,7 +50,7 @@ func main() { log.Fatalf("Invalid config file: %s", err) } - log.Info("config: ", cfg) + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncAPI)) if err != nil { @@ -74,17 +76,20 @@ func main() { if err = n.Load(db); err != nil { log.Panicf("startup: failed to set up notifier: %s", err) } - roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db) + + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) if err != nil { - log.Panicf("startup: failed to create room server consumer: %s", err) + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") } + + roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, n, db, queryAPI) if err = roomConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer: %s", err) } - clientConsumer, err := consumers.NewOutputClientData(cfg, n, db) - if err != nil { - log.Panicf("startup: failed to create client API server consumer: %s", err) - } + clientConsumer := consumers.NewOutputClientData(cfg, kafkaConsumer, n, db) if err = clientConsumer.Start(); err != nil { log.Panicf("startup: failed to start client API server consumer: %s", err) } diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 324561f68..ae0fe62cd 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -94,6 +94,11 @@ type Dendrite struct { Kafka struct { // A list of kafka addresses to connect to. Addresses []string `yaml:"addresses"` + // Whether to use naffka instead of kafka. + // Naffka can only be used when running dendrite as a single monolithic server. 
+ // Kafka can be used both with a monolithic server and when running the + // components as separate servers. + UseNaffka bool `yaml:"use_naffka,omitempty"` // The names of the topics to use when reading and writing from kafka. Topics struct { // Topic for roomserver/api.OutputRoomEvent events. @@ -169,7 +174,10 @@ type ThumbnailSize struct { ResizeMethod string `yaml:"method,omitempty"` } -// Load a yaml config file +// Load a yaml config file for a server run as multiple processes. +// Checks the config to ensure that it is valid. +// The checks are different if the server is run as a monolithic process instead +// of being split into multiple components func Load(configPath string) (*Dendrite, error) { configData, err := ioutil.ReadFile(configPath) if err != nil { @@ -181,7 +189,27 @@ func Load(configPath string) (*Dendrite, error) { } // Pass the current working directory and ioutil.ReadFile so that they can // be mocked in the tests - return loadConfig(basePath, configData, ioutil.ReadFile) + monolithic := false + return loadConfig(basePath, configData, ioutil.ReadFile, monolithic) +} + +// LoadMonolithic loads a yaml config file for a server run as a single monolith. +// Checks the config to ensure that it is valid. +// The checks are different if the server is run as a monolithic process instead +// of being split into multiple components +func LoadMonolithic(configPath string) (*Dendrite, error) { + configData, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + basePath, err := filepath.Abs(".") + if err != nil { + return nil, err + } + // Pass the current working directory and ioutil.ReadFile so that they can + // be mocked in the tests + monolithic := true + return loadConfig(basePath, configData, ioutil.ReadFile, monolithic) } // An Error indicates a problem parsing the config. 
@@ -194,6 +222,7 @@ func loadConfig( basePath string, configData []byte, readFile func(string) ([]byte, error), + monolithic bool, ) (*Dendrite, error) { var config Dendrite var err error @@ -203,7 +232,7 @@ func loadConfig( config.setDefaults() - if err = config.check(); err != nil { + if err = config.check(monolithic); err != nil { return nil, err } @@ -259,7 +288,7 @@ func (e Error) Error() string { ) } -func (config *Dendrite) check() error { +func (config *Dendrite) check(monolithic bool) error { var problems []string if config.Version != Version { @@ -297,21 +326,32 @@ func (config *Dendrite) check() error { checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].width", i), int64(size.Width)) checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].height", i), int64(size.Height)) } - - checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) + if config.Kafka.UseNaffka { + if !monolithic { + problems = append(problems, fmt.Sprintf("naffka can only be used in a monolithic server")) + } + } else { + // If we aren't using naffka then we need to have at least one kafka + // server to talk to. 
+ checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) + } checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) + checkNotEmpty("kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates)) checkNotEmpty("database.account", string(config.Database.Account)) checkNotEmpty("database.device", string(config.Database.Device)) checkNotEmpty("database.server_key", string(config.Database.ServerKey)) checkNotEmpty("database.media_api", string(config.Database.MediaAPI)) checkNotEmpty("database.sync_api", string(config.Database.SyncAPI)) checkNotEmpty("database.room_server", string(config.Database.RoomServer)) - checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI)) - checkNotEmpty("listen.client_api", string(config.Listen.ClientAPI)) - checkNotEmpty("listen.federation_api", string(config.Listen.FederationAPI)) - checkNotEmpty("listen.sync_api", string(config.Listen.SyncAPI)) - checkNotEmpty("listen.room_server", string(config.Listen.RoomServer)) + + if !monolithic { + checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI)) + checkNotEmpty("listen.client_api", string(config.Listen.ClientAPI)) + checkNotEmpty("listen.federation_api", string(config.Listen.FederationAPI)) + checkNotEmpty("listen.sync_api", string(config.Listen.SyncAPI)) + checkNotEmpty("listen.room_server", string(config.Listen.RoomServer)) + } if problems != nil { return Error{problems} diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go index 4275e3d41..24b0dfc1f 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config_test.go +++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go @@ -25,6 +25,7 @@ func TestLoadConfigRelative(t *testing.T) { "/my/config/dir/matrix_key.pem": testKey, "/my/config/dir/tls_cert.pem": 
testCert, }.readFile, + false, ) if err != nil { t.Error("failed to load config:", err) @@ -42,9 +43,9 @@ media: kafka: addresses: ["localhost:9092"] topics: - input_room_event: input.room output_room_event: output.room output_client_data: output.client + user_updates: output.user database: media_api: "postgresql:///media_api" account: "postgresql:///account" diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go index a28a08d54..948c60f10 100644 --- a/src/github.com/matrix-org/dendrite/common/test/config.go +++ b/src/github.com/matrix-org/dendrite/common/test/config.go @@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con // Make this configurable somehow? cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" + cfg.Kafka.Topics.UserUpdates = "test.user.output" // TODO: Use different databases for the different schemas. // Using the same database for every schema currently works because diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go index 7f133d303..4ebc36c77 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -38,13 +38,13 @@ type OutputRoomEvent struct { } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. 
-func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, store *storage.Database) (*OutputRoomEvent, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - return nil, err - } - roomServerURL := cfg.RoomServerURL() - +func NewOutputRoomEvent( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + queues *queue.OutgoingQueues, + store *storage.Database, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEvent { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, @@ -54,11 +54,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, stor roomServerConsumer: &consumer, db: store, queues: queues, - query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + query: queryAPI, } consumer.ProcessMessage = s.onMessage - return s, nil + return s } // Start consuming from room servers diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index f1b402315..953fe3c8f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -21,8 +21,14 @@ import ( // An OutputType is a type of roomserver output. type OutputType string -// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent -const OutputTypeNewRoomEvent OutputType = "new_room_event" +const ( + // OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent + OutputTypeNewRoomEvent OutputType = "new_room_event" + // OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent + OutputTypeNewInviteEvent OutputType = "new_invite_event" + // OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent + OutputTypeRetireInviteEvent OutputType = "retire_invite_event" +) // An OutputEvent is an entry in the roomserver output kafka log. 
// Consumers should check the type field when consuming this event. @@ -31,6 +37,10 @@ type OutputEvent struct { Type OutputType `json:"type"` // The content of event with type OutputTypeNewRoomEvent NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"` + // The content of event with type OutputTypeNewInviteEvent + NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` + // The content of event with type OutputTypeRetireInviteEvent + RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"` } // An OutputNewRoomEvent is written when the roomserver receives a new event. @@ -98,3 +108,26 @@ type OutputNewRoomEvent struct { // future proof the API for virtual hosting. SendAsServer string `json:"send_as_server"` } + +// An OutputNewInviteEvent is written whenever an invite becomes active. +// Invite events can be received outside of an existing room so have to be +// tracked separately from the room events themselves. +type OutputNewInviteEvent struct { + // The "m.room.member" invite event. + Event gomatrixserverlib.Event `json:"event"` +} + +// An OutputRetireInviteEvent is written whenever an existing invite is no longer +// active. An invite stops being active if the user joins the room or if the +// invite is rejected by the user. +type OutputRetireInviteEvent struct { + // The ID of the "m.room.member" invite event. + EventID string + // Optional event ID of the event that replaced the invite. + // This can be empty if the invite was rejected locally and we were unable + // to reach the server that originally sent the invite. + RetiredByEventID string + // The "membership" of the user after retiring the invite. One of "join" + // "leave" or "ban". 
+ Membership string +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index f8acff476..c1eee4c96 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -43,8 +43,8 @@ type RoomEventDatabase interface { // OutputRoomEventWriter has the APIs needed to write an event to the output logs. type OutputRoomEventWriter interface { - // Write an event. - WriteOutputRoomEvent(output api.OutputNewRoomEvent) error + // Write a list of events for a room + WriteOutputEvents(roomID string, updates []api.OutputEvent) error } func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error { diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go index ffbebd0c7..c8ac58d3a 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -46,22 +46,21 @@ type RoomserverInputAPI struct { processed int64 } -// WriteOutputRoomEvent implements OutputRoomEventWriter -func (r *RoomserverInputAPI) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error { - var m sarama.ProducerMessage - oe := api.OutputEvent{ - Type: api.OutputTypeNewRoomEvent, - NewRoomEvent: &output, +// WriteOutputEvents implements OutputRoomEventWriter +func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { + messages := make([]*sarama.ProducerMessage, len(updates)) + for i := range updates { + value, err := json.Marshal(updates[i]) + if err != nil { + return err + } + messages[i] = &sarama.ProducerMessage{ + Topic: r.OutputRoomEventTopic, + Key: sarama.StringEncoder(roomID), + Value: sarama.ByteEncoder(value), + } } - value, err := json.Marshal(oe) - if err != nil { - return err - } - m.Topic = 
r.OutputRoomEventTopic - m.Key = sarama.StringEncoder("") - m.Value = sarama.ByteEncoder(value) - _, _, err = r.Producer.SendMessage(&m) - return err + return r.Producer.SendMessages(messages) } // InputRoomEvents implements api.RoomserverInputAPI diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 6b5f39679..9328ecf3b 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -66,69 +66,88 @@ func updateLatestEvents( } }() - err = doUpdateLatestEvents(db, updater, ow, roomNID, stateAtEvent, event, sendAsServer) - return + u := latestEventsUpdater{ + db: db, updater: updater, ow: ow, roomNID: roomNID, + stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer, + } + return u.doUpdateLatestEvents() } -func doUpdateLatestEvents( - db RoomEventDatabase, - updater types.RoomRecentEventsUpdater, - ow OutputRoomEventWriter, - roomNID types.RoomNID, - stateAtEvent types.StateAtEvent, - event gomatrixserverlib.Event, - sendAsServer string, -) error { +// latestEventsUpdater tracks the state used to update the latest events in the +// room. It mostly just ferries state between the various function calls. +// The state could be passed using function arguments, but it becomes impractical +// when there are so many variables to pass around. +type latestEventsUpdater struct { + db RoomEventDatabase + updater types.RoomRecentEventsUpdater + ow OutputRoomEventWriter + roomNID types.RoomNID + stateAtEvent types.StateAtEvent + event gomatrixserverlib.Event + // Which server to send this event as. + sendAsServer string + // The eventID of the event that was processed before this one. + lastEventIDSent string + // The latest events in the room after processing this event. 
+ latest []types.StateAtEventAndReference + // The state entries removed from and added to the current state of the + // room as a result of processing this event. They are sorted lists. + removed []types.StateEntry + added []types.StateEntry + // The state entries that are removed and added to recover the state before + // the event being processed. They are sorted lists. + stateBeforeEventRemoves []types.StateEntry + stateBeforeEventAdds []types.StateEntry + // The snapshots of current state before and after processing this event + oldStateNID types.StateSnapshotNID + newStateNID types.StateSnapshotNID +} + +func (u *latestEventsUpdater) doUpdateLatestEvents() error { var err error var prevEvents []gomatrixserverlib.EventReference - prevEvents = event.PrevEvents() - oldLatest := updater.LatestEvents() - lastEventIDSent := updater.LastEventIDSent() - oldStateNID := updater.CurrentStateSnapshotNID() + prevEvents = u.event.PrevEvents() + oldLatest := u.updater.LatestEvents() + u.lastEventIDSent = u.updater.LastEventIDSent() + u.oldStateNID = u.updater.CurrentStateSnapshotNID() - if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil { + if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil { return err } else if hasBeenSent { // Already sent this event so we can stop processing return nil } - if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil { + if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil { return err } - eventReference := event.EventReference() + eventReference := u.event.EventReference() // Check if this event is already referenced by another event in the room. 
var alreadyReferenced bool - if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil { + if alreadyReferenced, err = u.updater.IsReferenced(eventReference); err != nil { return err } - newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{ + u.latest = calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{ EventReference: eventReference, - StateAtEvent: stateAtEvent, + StateAtEvent: u.stateAtEvent, }) - latestStateAtEvents := make([]types.StateAtEvent, len(newLatest)) - for i := range newLatest { - latestStateAtEvents[i] = newLatest[i].StateAtEvent + if err = u.latestState(); err != nil { + return err } - newStateNID, err := state.CalculateAndStoreStateAfterEvents(db, roomNID, latestStateAtEvents) + + updates, err := updateMemberships(u.db, u.updater, u.removed, u.added) if err != nil { return err } - removed, added, err := state.DifferenceBetweeenStateSnapshots(db, oldStateNID, newStateNID) - if err != nil { - return err - } - - stateBeforeEventRemoves, stateBeforeEventAdds, err := state.DifferenceBetweeenStateSnapshots( - db, newStateNID, stateAtEvent.BeforeStateSnapshotNID, - ) + update, err := u.makeOutputNewRoomEvent() if err != nil { return err } + updates = append(updates, *update) // Send the event to the output logs. // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. @@ -138,24 +157,47 @@ func doUpdateLatestEvents( // send the event asynchronously but we would need to ensure that 1) the events are written to the log in // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // necessary bookkeeping we'll keep the event sending synchronous for now. 
- if err = writeEvent( - db, ow, lastEventIDSent, event, newLatest, removed, added, - stateBeforeEventRemoves, stateBeforeEventAdds, sendAsServer, - ); err != nil { + if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil { return err } - if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID, newStateNID); err != nil { + if err = u.updater.SetLatestEvents(u.roomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil { return err } - if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil { + if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { return err } return nil } +func (u *latestEventsUpdater) latestState() error { + var err error + + latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)) + for i := range u.latest { + latestStateAtEvents[i] = u.latest[i].StateAtEvent + } + u.newStateNID, err = state.CalculateAndStoreStateAfterEvents(u.db, u.roomNID, latestStateAtEvents) + if err != nil { + return err + } + + u.removed, u.added, err = state.DifferenceBetweeenStateSnapshots(u.db, u.oldStateNID, u.newStateNID) + if err != nil { + return err + } + + u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = state.DifferenceBetweeenStateSnapshots( + u.db, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID, + ) + if err != nil { + return err + } + return nil +} + func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference { var alreadyInLatest bool var newLatest []types.StateAtEventAndReference @@ -189,57 +231,55 @@ func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenc return newLatest } -func writeEvent( - db RoomEventDatabase, ow OutputRoomEventWriter, lastEventIDSent string, - event gomatrixserverlib.Event, latest []types.StateAtEventAndReference, - removed, added []types.StateEntry, - 
stateBeforeEventRemoves, stateBeforeEventAdds []types.StateEntry, - sendAsServer string, -) error { +func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { - latestEventIDs := make([]string, len(latest)) - for i := range latest { - latestEventIDs[i] = latest[i].EventID + latestEventIDs := make([]string, len(u.latest)) + for i := range u.latest { + latestEventIDs[i] = u.latest[i].EventID } ore := api.OutputNewRoomEvent{ - Event: event, - LastSentEventID: lastEventIDSent, + Event: u.event, + LastSentEventID: u.lastEventIDSent, LatestEventIDs: latestEventIDs, } var stateEventNIDs []types.EventNID - for _, entry := range added { + for _, entry := range u.added { stateEventNIDs = append(stateEventNIDs, entry.EventNID) } - for _, entry := range removed { + for _, entry := range u.removed { stateEventNIDs = append(stateEventNIDs, entry.EventNID) } - for _, entry := range stateBeforeEventRemoves { + for _, entry := range u.stateBeforeEventRemoves { stateEventNIDs = append(stateEventNIDs, entry.EventNID) } - for _, entry := range stateBeforeEventAdds { + for _, entry := range u.stateBeforeEventAdds { stateEventNIDs = append(stateEventNIDs, entry.EventNID) } stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] - eventIDMap, err := db.EventIDs(stateEventNIDs) + eventIDMap, err := u.db.EventIDs(stateEventNIDs) if err != nil { - return err + return nil, err } - for _, entry := range added { + for _, entry := range u.added { ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) } - for _, entry := range removed { + for _, entry := range u.removed { ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID]) } - for _, entry := range stateBeforeEventRemoves { + for _, entry := range u.stateBeforeEventRemoves { ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID]) } - for _, entry := range stateBeforeEventAdds { + for _, 
entry := range u.stateBeforeEventAdds { ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID]) } - ore.SendAsServer = sendAsServer - return ow.WriteOutputRoomEvent(ore) + ore.SendAsServer = u.sendAsServer + + return &api.OutputEvent{ + Type: api.OutputTypeNewRoomEvent, + NewRoomEvent: &ore, + }, nil } type eventNIDSorter []types.EventNID diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go new file mode 100644 index 000000000..f306697ff --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go @@ -0,0 +1,297 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package input + +import ( + "fmt" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" +) + +// updateMembership updates the current membership and the invites for each +// user affected by a change in the current state of the room. +// Returns a list of output events to write to the kafka log to inform the +// consumers about the invites added or retired by the change in current state. 
+func updateMemberships(
+	db RoomEventDatabase, updater types.RoomRecentEventsUpdater, removed, added []types.StateEntry,
+) ([]api.OutputEvent, error) {
+	changes := membershipChanges(removed, added)
+	var eventNIDs []types.EventNID
+	for _, change := range changes {
+		if change.addedEventNID != 0 {
+			eventNIDs = append(eventNIDs, change.addedEventNID)
+		}
+		if change.removedEventNID != 0 {
+			eventNIDs = append(eventNIDs, change.removedEventNID)
+		}
+	}
+
+	// Load the event JSON so we can look up the "membership" key.
+	// TODO: Maybe add a membership key to the events table so we can load that
+	// key without having to load the entire event JSON?
+	events, err := db.Events(eventNIDs)
+	if err != nil {
+		return nil, err
+	}
+
+	var updates []api.OutputEvent
+
+	for _, change := range changes {
+		var ae *gomatrixserverlib.Event
+		var re *gomatrixserverlib.Event
+		targetUserNID := change.EventStateKeyNID
+		if change.removedEventNID != 0 {
+			ev, _ := eventMap(events).lookup(change.removedEventNID)
+			if ev != nil {
+				re = &ev.Event
+			}
+		}
+		if change.addedEventNID != 0 {
+			ev, _ := eventMap(events).lookup(change.addedEventNID)
+			if ev != nil {
+				ae = &ev.Event
+			}
+		}
+		if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil {
+			return nil, err
+		}
+	}
+	// Return the accumulated output events: returning nil here would silently
+	// drop every invite/retire update collected in the loop above.
+	return updates, nil
+}
+
+func updateMembership(
+	updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID,
+	remove, add *gomatrixserverlib.Event,
+	updates []api.OutputEvent,
+) ([]api.OutputEvent, error) {
+	var err error
+	// Default the membership to "leave" if no event was added or removed.
+	old := "leave"
+	new := "leave"
+
+	if remove != nil {
+		old, err = remove.Membership()
+		if err != nil {
+			return nil, err
+		}
+	}
+	if add != nil {
+		new, err = add.Membership()
+		if err != nil {
+			return nil, err
+		}
+	}
+	if old == new {
+		// If the membership is the same then nothing changed and we can return
+		// immediately.
This should help speed up processing for display name + // changes where the membership is "join" both before and after. + return updates, nil + } + + mu, err := updater.MembershipUpdater(targetUserNID) + if err != nil { + return nil, err + } + + switch new { + case "invite": + return updateToInviteMembership(mu, add, updates) + case "join": + return updateToJoinMembership(mu, add, updates) + case "leave", "ban": + return updateToLeaveMembership(mu, add, new, updates) + default: + panic(fmt.Errorf( + "input: membership %q is not one of the allowed values", new, + )) + } +} + +func updateToInviteMembership( + mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, +) ([]api.OutputEvent, error) { + // We may have already sent the invite to the user, either because we are + // reprocessing this event, or because the we received this invite from a + // remote server via the federation invite API. In those cases we don't need + // to send the event. + needsSending, err := mu.SetToInvite(*add) + if err != nil { + return nil, err + } + if needsSending { + // We notify the consumers using a special event even though we will + // notify them about the change in current state as part of the normal + // room event stream. This ensures that the consumers only have to + // consider a single stream of events when determining whether a user + // is invited, rather than having to combine multiple streams themselves. + onie := api.OutputNewInviteEvent{ + Event: *add, + } + updates = append(updates, api.OutputEvent{ + Type: api.OutputTypeNewInviteEvent, + NewInviteEvent: &onie, + }) + } + return updates, nil +} + +func updateToJoinMembership( + mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, +) ([]api.OutputEvent, error) { + // If the user is already marked as being joined then we can return immediately. + // TODO: Is this code reachable given the "old != new" guard in updateMembership? 
+	if mu.IsJoin() {
+		return updates, nil
+	}
+	// When we mark a user as being joined we will invalidate any invites that
+	// are active for that user. We notify the consumers that the invites have
+	// been retired using a special event, even though they could infer this
+	// by studying the state changes in the room event stream.
+	retired, err := mu.SetToJoin(add.Sender())
+	if err != nil {
+		return nil, err
+	}
+	for _, eventID := range retired {
+		orie := api.OutputRetireInviteEvent{
+			EventID:    eventID,
+			Membership: "join",
+		}
+		if add != nil {
+			orie.RetiredByEventID = add.EventID()
+		}
+		updates = append(updates, api.OutputEvent{
+			Type:              api.OutputTypeRetireInviteEvent,
+			RetireInviteEvent: &orie,
+		})
+	}
+	return updates, nil
+}
+
+func updateToLeaveMembership(
+	mu types.MembershipUpdater, add *gomatrixserverlib.Event,
+	newMembership string, updates []api.OutputEvent,
+) ([]api.OutputEvent, error) {
+	// If the user is already neither joined, nor invited to the room then we
+	// can return immediately.
+	if mu.IsLeave() {
+		return updates, nil
+	}
+	// When we mark a user as having left we will invalidate any invites that
+	// are active for that user. We notify the consumers that the invites have
+	// been retired using a special event, even though they could infer this
+	// by studying the state changes in the room event stream.
+	// NOTE(review): "add" may be nil here — the membership defaults to "leave"
+	// when the member event was removed from the current state without a
+	// replacement — so guard the Sender() call to avoid a nil dereference.
+	senderUserID := ""
+	if add != nil {
+		senderUserID = add.Sender()
+	}
+	retired, err := mu.SetToLeave(senderUserID)
+	if err != nil {
+		return nil, err
+	}
+	for _, eventID := range retired {
+		orie := api.OutputRetireInviteEvent{
+			EventID:    eventID,
+			Membership: newMembership,
+		}
+		if add != nil {
+			orie.RetiredByEventID = add.EventID()
+		}
+		updates = append(updates, api.OutputEvent{
+			Type:              api.OutputTypeRetireInviteEvent,
+			RetireInviteEvent: &orie,
+		})
+	}
+	return updates, nil
+}
+
+// membershipChanges pairs up the membership state changes from a sorted list
+// of state removed and a sorted list of state added.
+func membershipChanges(removed, added []types.StateEntry) []stateChange { + changes := pairUpChanges(removed, added) + var result []stateChange + for _, c := range changes { + if c.EventTypeNID == types.MRoomMemberNID { + result = append(result, c) + } + } + return result +} + +type stateChange struct { + types.StateKeyTuple + removedEventNID types.EventNID + addedEventNID types.EventNID +} + +// pairUpChanges pairs up the state events added and removed for each type, +// state key tuple. Assumes that removed and added are sorted. +func pairUpChanges(removed, added []types.StateEntry) []stateChange { + var ai int + var ri int + var result []stateChange + for { + switch { + case ai == len(added): + // We've reached the end of the added entries. + // The rest of the removed list are events that were removed without + // an event with the same state key being added. + for _, s := range removed[ri:] { + result = append(result, stateChange{ + StateKeyTuple: s.StateKeyTuple, + removedEventNID: s.EventNID, + }) + } + return result + case ri == len(removed): + // We've reached the end of the removed entries. + // The rest of the added list are events that were added without + // an event with the same state key being removed. + for _, s := range added[ai:] { + result = append(result, stateChange{ + StateKeyTuple: s.StateKeyTuple, + addedEventNID: s.EventNID, + }) + } + return result + case added[ai].StateKeyTuple == removed[ri].StateKeyTuple: + // The tuple is in both lists so an event with that key is being + // removed and another event with the same key is being added. 
+			result = append(result, stateChange{
+				StateKeyTuple:   added[ai].StateKeyTuple,
+				removedEventNID: removed[ri].EventNID,
+				addedEventNID:   added[ai].EventNID,
+			})
+			ai++
+			ri++
+		case added[ai].StateKeyTuple.LessThan(removed[ri].StateKeyTuple):
+			// The lists are sorted so the added entry being less than the
+			// removed entry means that the added event was added without an
+			// event with the same key being removed.
+			result = append(result, stateChange{
+				StateKeyTuple: added[ai].StateKeyTuple,
+				addedEventNID: added[ai].EventNID,
+			})
+			ai++
+		default:
+			// Reaching the default case implies that the removed entry is less
+			// than the added entry. Since the lists are sorted this means that
+			// the removed event was removed without an event with the same
+			// key being added.
+			// Index with ri (not ai): ai indexes the added list and may be out
+			// of range for removed, besides pairing the wrong tuple.
+			result = append(result, stateChange{
+				StateKeyTuple:   removed[ri].StateKeyTuple,
+				removedEventNID: removed[ri].EventNID,
+			})
+			ri++
+		}
+	}
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go
index da5b8a4e5..d30e45815 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go
@@ -76,15 +76,23 @@ func (s *eventStateKeyStatements) prepare(db *sql.DB) (err error) {
 	}.prepare(db)
 }
 
-func (s *eventStateKeyStatements) insertEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) {
+func (s *eventStateKeyStatements) insertEventStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) {
 	var eventStateKeyNID int64
-	err := s.insertEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
+	stmt := s.insertEventStateKeyNIDStmt
+	if txn != nil {
+		stmt = txn.Stmt(stmt)
+	}
+	err := stmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
 	return types.EventStateKeyNID(eventStateKeyNID), err
 }
 
-func (s 
*eventStateKeyStatements) selectEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) { +func (s *eventStateKeyStatements) selectEventStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) { var eventStateKeyNID int64 - err := s.selectEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID) + stmt := s.selectEventStateKeyNIDStmt + if txn != nil { + stmt = txn.Stmt(stmt) + } + err := stmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID) return types.EventStateKeyNID(eventStateKeyNID), err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go new file mode 100644 index 000000000..9e0860b42 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go @@ -0,0 +1,149 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/roomserver/types" +) + +const inviteSchema = ` +CREATE TABLE IF NOT EXISTS roomserver_invites ( + -- The string ID of the invite event itself. + -- We can't use a numeric event ID here because we don't always have + -- enough information to store an invite in the event table. + -- In particular we don't always have a chain of auth_events for invites + -- received over federation. 
+ invite_event_id TEXT PRIMARY KEY, + -- The numeric ID of the room the invite m.room.member event is in. + room_nid BIGINT NOT NULL, + -- The numeric ID for the state key of the invite m.room.member event. + -- This tells us who the invite is for. + -- This is used to query the active invites for a user. + target_nid BIGINT NOT NULL, + -- The numeric ID for the sender of the invite m.room.member event. + -- This tells us who sent the invite. + -- This is used to work out which matrix server we should talk to when + -- we try to join the room. + sender_nid BIGINT NOT NULL DEFAULT 0, + -- This is used to track whether the invite is still active. + -- This is set implicitly when processing new join and leave events and + -- explicitly when rejecting events over federation. + retired BOOLEAN NOT NULL DEFAULT FALSE, + -- The invite event JSON. + invite_event_json TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS roomserver_invites_active_idx ON roomserver_invites (target_nid, room_nid) + WHERE NOT retired; +` +const insertInviteEventSQL = "" + + "INSERT INTO roomserver_invites (invite_event_id, room_nid, target_nid," + + " sender_nid, invite_event_json) VALUES ($1, $2, $3, $4, $5)" + + " ON CONFLICT DO NOTHING" + +const selectInviteActiveForUserInRoomSQL = "" + + "SELECT sender_nid FROM roomserver_invites" + + " WHERE target_nid = $1 AND room_nid = $2" + + " AND NOT retired" + +// Retire every active invite for a user in a room. +// Ideally we'd know which invite events were retired by a given update so we +// wouldn't need to remove every active invite. +// However the matrix protocol doesn't give us a way to reliably identify the +// invites that were retired, so we are forced to retire all of them. 
+const updateInviteRetiredSQL = "" + + "UPDATE roomserver_invites SET retired = TRUE" + + " WHERE room_nid = $1 AND target_nid = $2 AND NOT retired" + + " RETURNING invite_event_id" + +type inviteStatements struct { + insertInviteEventStmt *sql.Stmt + selectInviteActiveForUserInRoomStmt *sql.Stmt + updateInviteRetiredStmt *sql.Stmt +} + +func (s *inviteStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(inviteSchema) + if err != nil { + return + } + + return statementList{ + {&s.insertInviteEventStmt, insertInviteEventSQL}, + {&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL}, + {&s.updateInviteRetiredStmt, updateInviteRetiredSQL}, + }.prepare(db) +} + +func (s *inviteStatements) insertInviteEvent( + txn *sql.Tx, inviteEventID string, roomNID types.RoomNID, + targetUserNID, senderUserNID types.EventStateKeyNID, + inviteEventJSON []byte, +) (bool, error) { + result, err := txn.Stmt(s.insertInviteEventStmt).Exec( + inviteEventID, roomNID, targetUserNID, senderUserNID, inviteEventJSON, + ) + if err != nil { + return false, err + } + count, err := result.RowsAffected() + if err != nil { + return false, err + } + return count != 0, nil +} + +func (s *inviteStatements) updateInviteRetired( + txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, +) ([]string, error) { + rows, err := txn.Stmt(s.updateInviteRetiredStmt).Query(roomNID, targetUserNID) + if err != nil { + return nil, err + } + defer rows.Close() + var result []string + for rows.Next() { + var inviteEventID string + if err := rows.Scan(&inviteEventID); err != nil { + return nil, err + } + result = append(result, inviteEventID) + } + return result, nil +} + +// selectInviteActiveForUserInRoom returns a list of sender state key NIDs +func (s *inviteStatements) selectInviteActiveForUserInRoom( + targetUserNID types.EventStateKeyNID, roomNID types.RoomNID, +) ([]types.EventStateKeyNID, error) { + rows, err := s.selectInviteActiveForUserInRoomStmt.Query( + 
targetUserNID, roomNID, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var result []types.EventStateKeyNID + for rows.Next() { + var senderUserNID int64 + if err := rows.Scan(&senderUserNID); err != nil { + return nil, err + } + result = append(result, types.EventStateKeyNID(senderUserNID)) + } + return result, nil +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go new file mode 100644 index 000000000..725e5b8d9 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go @@ -0,0 +1,111 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/roomserver/types" +) + +type membershipState int64 + +const ( + membershipStateLeaveOrBan membershipState = 1 + membershipStateInvite membershipState = 2 + membershipStateJoin membershipState = 3 +) + +const membershipSchema = ` +-- The membership table is used to coordinate updates between the invite table +-- and the room state tables. +-- This table is updated in one of 3 ways: +-- 1) The membership of a user changes within the current state of the room. +-- 2) An invite is received outside of a room over federation. +-- 3) An invite is rejected outside of a room over federation. 
+CREATE TABLE IF NOT EXISTS roomserver_membership ( + room_nid BIGINT NOT NULL, + -- Numeric state key ID for the user ID this state is for. + target_nid BIGINT NOT NULL, + -- Numeric state key ID for the user ID who changed the state. + -- This may be 0 since it is not always possible to identify the user that + -- changed the state. + sender_nid BIGINT NOT NULL DEFAULT 0, + -- The state the user is in within this room. + -- Default value is "membershipStateLeaveOrBan" + membership_nid BIGINT NOT NULL DEFAULT 1, + UNIQUE (room_nid, target_nid) +); +` + +// Insert a row in to membership table so that it can be locked by the +// SELECT FOR UPDATE +const insertMembershipSQL = "" + + "INSERT INTO roomserver_membership (room_nid, target_nid)" + + " VALUES ($1, $2)" + + " ON CONFLICT DO NOTHING" + +const selectMembershipForUpdateSQL = "" + + "SELECT membership_nid FROM roomserver_membership" + + " WHERE room_nid = $1 AND target_nid = $2 FOR UPDATE" + +const updateMembershipSQL = "" + + "UPDATE roomserver_membership SET sender_nid = $3, membership_nid = $4" + + " WHERE room_nid = $1 AND target_nid = $2" + +type membershipStatements struct { + insertMembershipStmt *sql.Stmt + selectMembershipForUpdateStmt *sql.Stmt + updateMembershipStmt *sql.Stmt +} + +func (s *membershipStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(membershipSchema) + if err != nil { + return + } + + return statementList{ + {&s.insertMembershipStmt, insertMembershipSQL}, + {&s.selectMembershipForUpdateStmt, selectMembershipForUpdateSQL}, + {&s.updateMembershipStmt, updateMembershipSQL}, + }.prepare(db) +} + +func (s *membershipStatements) insertMembership( + txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, +) error { + _, err := txn.Stmt(s.insertMembershipStmt).Exec(roomNID, targetUserNID) + return err +} + +func (s *membershipStatements) selectMembershipForUpdate( + txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, +) (membership 
membershipState, err error) { + err = txn.Stmt(s.selectMembershipForUpdateStmt).QueryRow( + roomNID, targetUserNID, + ).Scan(&membership) + return +} + +func (s *membershipStatements) updateMembership( + txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, + senderUserNID types.EventStateKeyNID, membership membershipState, +) error { + _, err := txn.Stmt(s.updateMembershipStmt).Exec( + roomNID, targetUserNID, senderUserNID, membership, + ) + return err +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index ddab2356b..a24dbb1d3 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -28,6 +28,8 @@ type statements struct { stateBlockStatements previousEventStatements roomAliasesStatements + inviteStatements + membershipStatements } func (s *statements) prepare(db *sql.DB) error { @@ -43,6 +45,8 @@ func (s *statements) prepare(db *sql.DB) error { s.stateBlockStatements.prepare, s.previousEventStatements.prepare, s.roomAliasesStatements.prepare, + s.inviteStatements.prepare, + s.membershipStatements.prepare, } { if err = prepare(db); err != nil { return err diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 3f99e7d85..d323fd139 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -16,6 +16,7 @@ package storage import ( "database/sql" + // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/roomserver/types" @@ -64,7 +65,7 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ // Assigned a numeric ID for the state_key if there is one present. // Otherwise set the numeric ID for the state_key to 0. 
if eventStateKey != nil { - if eventStateKeyNID, err = d.assignStateKeyNID(*eventStateKey); err != nil { + if eventStateKeyNID, err = d.assignStateKeyNID(nil, *eventStateKey); err != nil { return 0, types.StateAtEvent{}, err } } @@ -131,15 +132,15 @@ func (d *Database) assignEventTypeNID(eventType string) (types.EventTypeNID, err return eventTypeNID, err } -func (d *Database) assignStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) { +func (d *Database) assignStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) { // Check if we already have a numeric ID in the database. - eventStateKeyNID, err := d.statements.selectEventStateKeyNID(eventStateKey) + eventStateKeyNID, err := d.statements.selectEventStateKeyNID(txn, eventStateKey) if err == sql.ErrNoRows { // We don't have a numeric ID so insert one into the database. - eventStateKeyNID, err = d.statements.insertEventStateKeyNID(eventStateKey) + eventStateKeyNID, err = d.statements.insertEventStateKeyNID(txn, eventStateKey) if err == sql.ErrNoRows { // We raced with another insert so run the select again. 
- eventStateKeyNID, err = d.statements.selectEventStateKeyNID(eventStateKey) + eventStateKeyNID, err = d.statements.selectEventStateKeyNID(txn, eventStateKey) } } return eventStateKeyNID, err @@ -249,12 +250,15 @@ func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) (types.RoomRe return nil, err } } - return &roomRecentEventsUpdater{txn, d, stateAndRefs, lastEventIDSent, currentStateSnapshotNID}, nil + return &roomRecentEventsUpdater{ + transaction{txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID, + }, nil } type roomRecentEventsUpdater struct { - txn *sql.Tx + transaction d *Database + roomNID types.RoomNID latestEvents []types.StateAtEventAndReference lastEventIDSent string currentStateSnapshotNID types.StateSnapshotNID @@ -319,14 +323,8 @@ func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error return u.d.statements.updateEventSentToOutput(u.txn, eventNID) } -// Commit implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) Commit() error { - return u.txn.Commit() -} - -// Rollback implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) Rollback() error { - return u.txn.Rollback() +func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID) (types.MembershipUpdater, error) { + return u.d.membershipUpdaterTxn(u.txn, u.roomNID, targetUserNID) } // RoomNID implements query.RoomserverQueryAPIDB @@ -381,3 +379,124 @@ func (d *Database) StateEntriesForTuples( ) ([]types.StateEntryList, error) { return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples) } + +type membershipUpdater struct { + transaction + d *Database + roomNID types.RoomNID + targetUserNID types.EventStateKeyNID + membership membershipState +} + +func (d *Database) membershipUpdaterTxn( + txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, +) (types.MembershipUpdater, error) { + + if err := 
d.statements.insertMembership(txn, roomNID, targetUserNID); err != nil { + return nil, err + } + + membership, err := d.statements.selectMembershipForUpdate(txn, roomNID, targetUserNID) + if err != nil { + return nil, err + } + + return &membershipUpdater{ + transaction{txn}, d, roomNID, targetUserNID, membership, + }, nil +} + +// IsInvite implements types.MembershipUpdater +func (u *membershipUpdater) IsInvite() bool { + return u.membership == membershipStateInvite +} + +// IsJoin implements types.MembershipUpdater +func (u *membershipUpdater) IsJoin() bool { + return u.membership == membershipStateJoin +} + +// IsLeave implements types.MembershipUpdater +func (u *membershipUpdater) IsLeave() bool { + return u.membership == membershipStateLeaveOrBan +} + +// SetToInvite implements types.MembershipUpdater +func (u *membershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, error) { + senderUserNID, err := u.d.assignStateKeyNID(u.txn, event.Sender()) + if err != nil { + return false, err + } + inserted, err := u.d.statements.insertInviteEvent( + u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(), + ) + if err != nil { + return false, err + } + if u.membership != membershipStateInvite { + if err = u.d.statements.updateMembership( + u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateInvite, + ); err != nil { + return false, err + } + } + return inserted, nil +} + +// SetToJoin implements types.MembershipUpdater +func (u *membershipUpdater) SetToJoin(senderUserID string) ([]string, error) { + senderUserNID, err := u.d.assignStateKeyNID(u.txn, senderUserID) + if err != nil { + return nil, err + } + inviteEventIDs, err := u.d.statements.updateInviteRetired( + u.txn, u.roomNID, u.targetUserNID, + ) + if err != nil { + return nil, err + } + if u.membership != membershipStateJoin { + if err = u.d.statements.updateMembership( + u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateJoin, + ); err != nil { + 
return nil, err + } + } + return inviteEventIDs, nil +} + +// SetToLeave implements types.MembershipUpdater +func (u *membershipUpdater) SetToLeave(senderUserID string) ([]string, error) { + senderUserNID, err := u.d.assignStateKeyNID(u.txn, senderUserID) + if err != nil { + return nil, err + } + inviteEventIDs, err := u.d.statements.updateInviteRetired( + u.txn, u.roomNID, u.targetUserNID, + ) + if err != nil { + return nil, err + } + if u.membership != membershipStateLeaveOrBan { + if err = u.d.statements.updateMembership( + u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateLeaveOrBan, + ); err != nil { + return nil, err + } + } + return inviteEventIDs, nil +} + +type transaction struct { + txn *sql.Tx +} + +// Commit implements types.Transaction +func (t *transaction) Commit() error { + return t.txn.Commit() +} + +// Rollback implements types.Transaction +func (t *transaction) Rollback() error { + return t.txn.Rollback() +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index b255b64b9..809b6e574 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -135,9 +135,17 @@ type StateEntryList struct { StateEntries []StateEntry } +// A Transaction is something that can be committed or rolledback. +type Transaction interface { + // Commit the transaction + Commit() error + // Rollback the transaction. + Rollback() error +} + // A RoomRecentEventsUpdater is used to update the recent events in a room. // (On postgresql this wraps a database transaction that holds a "FOR UPDATE" -// lock on the row holding the latest events for the room.) +// lock on the row in the rooms table holding the latest events for the room.) type RoomRecentEventsUpdater interface { // The latest event IDs and state in the room. 
LatestEvents() []StateAtEventAndReference @@ -163,10 +171,36 @@ type RoomRecentEventsUpdater interface { HasEventBeenSent(eventNID EventNID) (bool, error) // Mark the event as having been sent to the output logs. MarkEventAsSent(eventNID EventNID) error - // Commit the transaction - Commit() error - // Rollback the transaction. - Rollback() error + // Build a membership updater for the target user in this room. + // It will share the same transaction as this updater. + MembershipUpdater(targetUserNID EventStateKeyNID) (MembershipUpdater, error) + // Implements Transaction so it can be committed or rolledback + Transaction +} + +// A MembershipUpdater is used to update the membership of a user in a room. +// (On postgresql this wraps a database transaction that holds a "FOR UPDATE" +// lock on the row in the membership table for this user in the room) +// The caller should call one of SetToInvite, SetToJoin or SetToLeave once to +// make the update, or none of them if no update is required. +type MembershipUpdater interface { + // True if the target user is invited to the room before updating. + IsInvite() bool + // True if the target user is joined to the room before updating. + IsJoin() bool + // True if the target user is not invited or joined to the room before updating. + IsLeave() bool + // Set the state to invite. + // Returns whether this invite needs to be sent + SetToInvite(event gomatrixserverlib.Event) (needsSending bool, err error) + // Set the state to join. + // Returns a list of invite event IDs that this state change retired. + SetToJoin(senderUserID string) (inviteEventIDs []string, err error) + // Set the state to leave. + // Returns a list of invite event IDs that this state change retired. + SetToLeave(senderUserID string) (inviteEventIDs []string, err error) + // Implements Transaction so it can be committed or rolledback. 
+ Transaction } // A MissingEventError is an error that happened because the roomserver was diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go index a2a240ff9..7cc38b815 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go @@ -33,11 +33,12 @@ type OutputClientData struct { } // NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. -func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - return nil, err - } +func NewOutputClientData( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + n *sync.Notifier, + store *storage.SyncServerDatabase, +) *OutputClientData { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputClientData), @@ -51,7 +52,7 @@ func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage. } consumer.ProcessMessage = s.onMessage - return s, nil + return s } // Start consuming from room servers diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index c846705fc..373577894 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -44,12 +44,13 @@ type prevEventRef struct { } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. 
-func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - return nil, err - } - roomServerURL := cfg.RoomServerURL() +func NewOutputRoomEvent( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + n *sync.Notifier, + store *storage.SyncServerDatabase, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEvent { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), @@ -60,11 +61,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S roomServerConsumer: &consumer, db: store, notifier: n, - query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + query: queryAPI, } consumer.ProcessMessage = s.onMessage - return s, nil + return s } // Start consuming from room servers diff --git a/vendor/manifest b/vendor/manifest index 425cc8f1f..99bb98dae 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -101,6 +101,12 @@ "revision": "768a8767051a4aca7f5e41f912954ae04d5f1efb", "branch": "master" }, + { + "importpath": "github.com/matrix-org/naffka", + "repository": "https://github.com/matrix-org/naffka", + "revision": "d28656e34f96a8eeaab53e3b7678c9ce14af5786", + "branch": "master" + }, { "importpath": "github.com/matrix-org/util", "repository": "https://github.com/matrix-org/util", diff --git a/vendor/src/github.com/matrix-org/naffka/README.md b/vendor/src/github.com/matrix-org/naffka/README.md new file mode 100644 index 000000000..8a7dd2591 --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/README.md @@ -0,0 +1,5 @@ +# naffka + +Single in-process implementation of the [sarama golang kafka](https://github.com/Shopify/sarama) APIs. + +It's like Kafka, but a bit [naff](https://www.collinsdictionary.com/dictionary/english/naff). 
diff --git a/vendor/src/github.com/matrix-org/naffka/hooks/install.sh b/vendor/src/github.com/matrix-org/naffka/hooks/install.sh new file mode 100644 index 000000000..f8aa331ff --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/hooks/install.sh @@ -0,0 +1,5 @@ +#! /bin/bash + +DOT_GIT="$(dirname $0)/../.git" + +ln -s "../../hooks/pre-commit" "$DOT_GIT/hooks/pre-commit" \ No newline at end of file diff --git a/vendor/src/github.com/matrix-org/naffka/hooks/pre-commit b/vendor/src/github.com/matrix-org/naffka/hooks/pre-commit new file mode 100644 index 000000000..a7ec4d015 --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/hooks/pre-commit @@ -0,0 +1,24 @@ +#! /bin/bash + +set -eu + +golint ./... +misspell -error . + +# gofmt doesn't exit with an error code if the files don't match the expected +# format. So we have to run it and see if it outputs anything. +if gofmt -l -s . 2>&1 | read +then + echo "Error: not all code had been formatted with gofmt." + echo "Fixing the following files" + gofmt -s -w -l . + echo + echo "Please add them to the commit" + git status --short + exit 1 +fi + +ineffassign . +go tool vet --all --shadow . +gocyclo -over 16 . +go test -timeout 5s . ./... diff --git a/vendor/src/github.com/matrix-org/naffka/memorydatabase.go b/vendor/src/github.com/matrix-org/naffka/memorydatabase.go new file mode 100644 index 000000000..05d1f3ee6 --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/memorydatabase.go @@ -0,0 +1,91 @@ +package naffka + +import ( + "fmt" + "sync" +) + +// A MemoryDatabase stores the message history as arrays in memory. +// It can be used to run unit tests. +// If the process is stopped then any messages that haven't been +// processed by a consumer are lost forever. 
+type MemoryDatabase struct { + topicsMutex sync.Mutex + topics map[string]*memoryDatabaseTopic +} + +type memoryDatabaseTopic struct { + messagesMutex sync.Mutex + messages []Message +} + +func (t *memoryDatabaseTopic) addMessages(msgs []Message) error { + t.messagesMutex.Lock() + defer t.messagesMutex.Unlock() + if int64(len(t.messages)) != msgs[0].Offset { + return fmt.Errorf("message offset %d is not immediately after the previous offset %d", msgs[0].Offset, len(t.messages)) + } + t.messages = append(t.messages, msgs...) + return nil +} + +// getMessages returns the current messages as a slice. +// This slice will have its own copy of the length field so won't be affected +// by adding more messages in addMessages. +// The slice will share the same backing array with the slice we append new +// messages to. It is safe to read the messages in the backing array since we +// only append to the slice. It is not safe to write or append to the returned +// slice. +func (t *memoryDatabaseTopic) getMessages() []Message { + t.messagesMutex.Lock() + defer t.messagesMutex.Unlock() + return t.messages +} + +func (m *MemoryDatabase) getTopic(topicName string) *memoryDatabaseTopic { + m.topicsMutex.Lock() + defer m.topicsMutex.Unlock() + result := m.topics[topicName] + if result == nil { + result = &memoryDatabaseTopic{} + if m.topics == nil { + m.topics = map[string]*memoryDatabaseTopic{} + } + m.topics[topicName] = result + } + return result +} + +// StoreMessages implements Database +func (m *MemoryDatabase) StoreMessages(topic string, messages []Message) error { + if err := m.getTopic(topic).addMessages(messages); err != nil { + return err + } + return nil +} + +// FetchMessages implements Database +func (m *MemoryDatabase) FetchMessages(topic string, startOffset, endOffset int64) ([]Message, error) { + messages := m.getTopic(topic).getMessages() + if endOffset > int64(len(messages)) { + return nil, fmt.Errorf("end offset %d out of range %d", endOffset, len(messages)) + 
} + if startOffset >= endOffset { + return nil, fmt.Errorf("start offset %d greater than or equal to end offset %d", startOffset, endOffset) + } + if startOffset < -1 { + return nil, fmt.Errorf("start offset %d less than -1", startOffset) + } + return messages[startOffset+1 : endOffset], nil +} + +// MaxOffsets implements Database +func (m *MemoryDatabase) MaxOffsets() (map[string]int64, error) { + m.topicsMutex.Lock() + defer m.topicsMutex.Unlock() + result := map[string]int64{} + for name, t := range m.topics { + result[name] = int64(len(t.getMessages())) - 1 + } + return result, nil +} diff --git a/vendor/src/github.com/matrix-org/naffka/naffka.go b/vendor/src/github.com/matrix-org/naffka/naffka.go new file mode 100644 index 000000000..d429ffdaa --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/naffka.go @@ -0,0 +1,360 @@ +package naffka + +import ( + "fmt" + "log" + "sync" + "time" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +// Naffka is an implementation of the sarama kafka API designed to run within a +// single go process. It implements both the sarama.SyncProducer and the +// sarama.Consumer interfaces. This means it can act as a drop in replacement +// for kafka for testing or single instance deployment. +type Naffka struct { + db Database + topicsMutex sync.Mutex + topics map[string]*topic +} + +// New creates a new Naffka instance. +func New(db Database) (*Naffka, error) { + n := &Naffka{db: db, topics: map[string]*topic{}} + maxOffsets, err := db.MaxOffsets() + if err != nil { + return nil, err + } + for topicName, offset := range maxOffsets { + n.topics[topicName] = &topic{ + topicName: topicName, + nextOffset: offset + 1, + } + } + return n, nil +} + +// A Message is used internally within naffka to store messages. +// It is converted to a sarama.ConsumerMessage when exposed to the +// public APIs to maintain API compatibility with sarama. 
+type Message struct { + Offset int64 + Key []byte + Value []byte + Timestamp time.Time +} + +func (m *Message) consumerMessage(topic string) *sarama.ConsumerMessage { + return &sarama.ConsumerMessage{ + Topic: topic, + Offset: m.Offset, + Key: m.Key, + Value: m.Value, + Timestamp: m.Timestamp, + } +} + +// A Database is used to store naffka messages. +// Messages are stored so that new consumers can access the full message history. +type Database interface { + // StoreMessages stores a list of messages. + // Every message offset must be unique within each topic. + // Messages must be stored monotonically and contiguously for each topic. + // So for a given topic the message with offset n+1 is stored after the + // message with offset n. + StoreMessages(topic string, messages []Message) error + // FetchMessages fetches all messages with an offset greater than but not + // including startOffset and less than but not including endOffset. + // The range of offsets requested must not overlap with those stored by a + // concurrent StoreMessages. The message offsets within the requested range + // are contiguous. That is FetchMessage("foo", n, m) will only be called + // once the messages between n and m have been stored by StoreMessages. + // Every call must return at least one message. That is there must be at + // least one message between the start and end offsets. + FetchMessages(topic string, startOffset, endOffset int64) ([]Message, error) + // MaxOffsets returns the maximum offset for each topic. 
+ MaxOffsets() (map[string]int64, error) +} + +// SendMessage implements sarama.SyncProducer +func (n *Naffka) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + err = n.SendMessages([]*sarama.ProducerMessage{msg}) + return msg.Partition, msg.Offset, err +} + +// SendMessages implements sarama.SyncProducer +func (n *Naffka) SendMessages(msgs []*sarama.ProducerMessage) error { + byTopic := map[string][]*sarama.ProducerMessage{} + for _, msg := range msgs { + byTopic[msg.Topic] = append(byTopic[msg.Topic], msg) + } + var topicNames []string + for topicName := range byTopic { + topicNames = append(topicNames, topicName) + } + + now := time.Now() + topics := n.getTopics(topicNames) + for topicName := range byTopic { + if err := topics[topicName].send(now, byTopic[topicName]); err != nil { + return err + } + } + return nil +} + +func (n *Naffka) getTopics(topicNames []string) map[string]*topic { + n.topicsMutex.Lock() + defer n.topicsMutex.Unlock() + result := map[string]*topic{} + for _, topicName := range topicNames { + t := n.topics[topicName] + if t == nil { + // If the topic doesn't already exist then create it. + t = &topic{db: n.db, topicName: topicName} + n.topics[topicName] = t + } + result[topicName] = t + } + return result +} + +// Topics implements sarama.Consumer +func (n *Naffka) Topics() ([]string, error) { + n.topicsMutex.Lock() + defer n.topicsMutex.Unlock() + var result []string + for topic := range n.topics { + result = append(result, topic) + } + return result, nil +} + +// Partitions implements sarama.Consumer +func (n *Naffka) Partitions(topic string) ([]int32, error) { + // Naffka stores a single partition per topic, so this always returns a single partition ID. 
+ return []int32{0}, nil +} + +// ConsumePartition implements sarama.Consumer +func (n *Naffka) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { + if partition != 0 { + return nil, fmt.Errorf("Unknown partition ID %d", partition) + } + topics := n.getTopics([]string{topic}) + return topics[topic].consume(offset), nil +} + +// HighWaterMarks implements sarama.Consumer +func (n *Naffka) HighWaterMarks() map[string]map[int32]int64 { + n.topicsMutex.Lock() + defer n.topicsMutex.Unlock() + result := map[string]map[int32]int64{} + for topicName, topic := range n.topics { + result[topicName] = map[int32]int64{ + 0: topic.highwaterMark(), + } + } + return result +} + +// Close implements sarama.SyncProducer and sarama.Consumer +func (n *Naffka) Close() error { + return nil +} + +const channelSize = 1024 + +type partitionConsumer struct { + topic *topic + messages chan *sarama.ConsumerMessage + // Whether the consumer is ready for new messages or whether it + // is catching up on historic messages. + // Reads and writes to this field are protected by the topic mutex. + ready bool +} + +// AsyncClose implements sarama.PartitionConsumer +func (c *partitionConsumer) AsyncClose() { +} + +// Close implements sarama.PartitionConsumer +func (c *partitionConsumer) Close() error { + // TODO: Add support for performing a clean shutdown of the consumer. + return nil +} + +// Messages implements sarama.PartitionConsumer +func (c *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage { + return c.messages +} + +// Errors implements sarama.PartitionConsumer +func (c *partitionConsumer) Errors() <-chan *sarama.ConsumerError { + // TODO: Add option to pass consumer errors to an errors channel. 
+ return nil +} + +// HighWaterMarkOffset implements sarama.PartitionConsumer +func (c *partitionConsumer) HighWaterMarkOffset() int64 { + return c.topic.highwaterMark() +} + +// block writes the message to the consumer blocking until the consumer is ready +// to add the message to the channel. Once the message is successfully added to +// the channel it will catch up by pulling historic messages from the database. +func (c *partitionConsumer) block(cmsg *sarama.ConsumerMessage) { + c.messages <- cmsg + c.catchup(cmsg.Offset) +} + +// catchup reads historic messages from the database until the consumer has caught +// up on all the historic messages. +func (c *partitionConsumer) catchup(fromOffset int64) { + for { + // First check if we have caught up. + caughtUp, nextOffset := c.topic.hasCaughtUp(c, fromOffset) + if caughtUp { + return + } + // Limit the number of messages we request from the database to be the + // capacity of the channel. + if nextOffset > fromOffset+int64(cap(c.messages)) { + nextOffset = fromOffset + int64(cap(c.messages)) + } + // Fetch the messages from the database. + msgs, err := c.topic.db.FetchMessages(c.topic.topicName, fromOffset, nextOffset) + if err != nil { + // TODO: Add option to write consumer errors to an errors channel + // as an alternative to logging the errors. + log.Print("Error reading messages: ", err) + // Wait before retrying. + // TODO: Maybe use an exponential backoff scheme here. + // TODO: This timeout should take account of all the other goroutines + // that might be doing the same thing. (If there are 10000 consumers + // then we don't want to end up retrying every millisecond) + time.Sleep(10 * time.Second) + continue + } + if len(msgs) == 0 { + // This should only happen if the database is corrupted and has lost the + // messages between the requested offsets. 
+ log.Fatalf("Corrupt database returned no messages between %d and %d", fromOffset, nextOffset) + } + + // Pass the messages into the consumer channel. + // Blocking each write until the channel has enough space for the message. + for i := range msgs { + c.messages <- msgs[i].consumerMessage(c.topic.topicName) + } + // Update the offset for the next loop iteration. + fromOffset = msgs[len(msgs)-1].Offset + } +} + +type topic struct { + db Database + topicName string + mutex sync.Mutex + consumers []*partitionConsumer + nextOffset int64 +} + +func (t *topic) send(now time.Time, pmsgs []*sarama.ProducerMessage) error { + var err error + // Encode the message keys and values. + msgs := make([]Message, len(pmsgs)) + for i := range msgs { + if pmsgs[i].Key != nil { + msgs[i].Key, err = pmsgs[i].Key.Encode() + if err != nil { + return err + } + } + if pmsgs[i].Value != nil { + msgs[i].Value, err = pmsgs[i].Value.Encode() + if err != nil { + return err + } + } + pmsgs[i].Timestamp = now + msgs[i].Timestamp = now + } + // Take the lock before assigning the offsets. + t.mutex.Lock() + defer t.mutex.Unlock() + offset := t.nextOffset + for i := range msgs { + pmsgs[i].Offset = offset + msgs[i].Offset = offset + offset++ + } + // Store the messages while we hold the lock. + err = t.db.StoreMessages(t.topicName, msgs) + if err != nil { + return err + } + t.nextOffset = offset + + // Now notify the consumers about the messages. + for i := range msgs { + cmsg := msgs[i].consumerMessage(t.topicName) + for _, c := range t.consumers { + if c.ready { + select { + case c.messages <- cmsg: + default: + // The consumer wasn't ready to receive a message because + // the channel buffer was full. + // Fork a goroutine to send the message so that we don't + // block sending messages to the other consumers. 
+ c.ready = false + go c.block(cmsg) + } + } + } + } + + return nil +} + +func (t *topic) consume(offset int64) *partitionConsumer { + t.mutex.Lock() + defer t.mutex.Unlock() + c := &partitionConsumer{ + topic: t, + } + // Handle special offsets. + if offset == sarama.OffsetNewest { + offset = t.nextOffset + } + if offset == sarama.OffsetOldest { + offset = -1 + } + c.messages = make(chan *sarama.ConsumerMessage, channelSize) + t.consumers = append(t.consumers, c) + // Start catching up on historic messages in the background. + go c.catchup(offset) + return c +} + +func (t *topic) hasCaughtUp(c *partitionConsumer, offset int64) (bool, int64) { + t.mutex.Lock() + defer t.mutex.Unlock() + // Check if we have caught up while holding a lock on the topic so there + // isn't a way for our check to race with a new message being sent on the topic. + if offset+1 == t.nextOffset { + // We've caught up, the consumer can now receive messages as they are + // sent rather than fetching them from the database. 
+ c.ready = true + return true, t.nextOffset + } + return false, t.nextOffset +} + +func (t *topic) highwaterMark() int64 { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.nextOffset +} diff --git a/vendor/src/github.com/matrix-org/naffka/naffka_test.go b/vendor/src/github.com/matrix-org/naffka/naffka_test.go new file mode 100644 index 000000000..d1a267102 --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/naffka_test.go @@ -0,0 +1,86 @@ +package naffka + +import ( + "testing" + "time" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +func TestSendAndReceive(t *testing.T) { + naffka, err := New(&MemoryDatabase{}) + if err != nil { + t.Fatal(err) + } + producer := sarama.SyncProducer(naffka) + consumer := sarama.Consumer(naffka) + const topic = "testTopic" + const value = "Hello, World" + + c, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest) + if err != nil { + t.Fatal(err) + } + + message := sarama.ProducerMessage{ + Value: sarama.StringEncoder(value), + Topic: topic, + } + + if _, _, err = producer.SendMessage(&message); err != nil { + t.Fatal(err) + } + + var result *sarama.ConsumerMessage + select { + case result = <-c.Messages(): + case _ = <-time.NewTimer(10 * time.Second).C: + t.Fatal("expected to receive a message") + } + + if string(result.Value) != value { + t.Fatalf("wrong value: wanted %q got %q", value, string(result.Value)) + } + + select { + case result = <-c.Messages(): + t.Fatal("expected to only receive one message") + default: + } +} + +func TestDelayedReceive(t *testing.T) { + naffka, err := New(&MemoryDatabase{}) + if err != nil { + t.Fatal(err) + } + producer := sarama.SyncProducer(naffka) + consumer := sarama.Consumer(naffka) + const topic = "testTopic" + const value = "Hello, World" + + message := sarama.ProducerMessage{ + Value: sarama.StringEncoder(value), + Topic: topic, + } + + if _, _, err = producer.SendMessage(&message); err != nil { + t.Fatal(err) + } + + c, err := consumer.ConsumePartition(topic, 0, 
sarama.OffsetOldest) + if err != nil { + t.Fatal(err) + } + + var result *sarama.ConsumerMessage + select { + case result = <-c.Messages(): + case _ = <-time.NewTimer(10 * time.Second).C: + t.Fatal("expected to receive a message") + } + + if string(result.Value) != value { + t.Fatalf("wrong value: wanted %q got %q", value, string(result.Value)) + } +}