Don't forget current state and EDU servers

This commit is contained in:
Neil Alexander 2020-07-28 17:43:53 +01:00
parent d8eb0e1b4f
commit 85375ef96b
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 12 additions and 9 deletions

View file

@ -34,13 +34,13 @@ func AddInternalRoutes(router *mux.Router, intAPI api.CurrentStateInternalAPI) {
// NewInternalAPI returns a concrete implementation of the internal API. Callers // NewInternalAPI returns a concrete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(cfg *config.Dendrite, consumer sarama.Consumer) api.CurrentStateInternalAPI { func NewInternalAPI(cfg *config.CurrentStateServer, consumer sarama.Consumer) api.CurrentStateInternalAPI {
csDB, err := storage.NewDatabase(string(cfg.Database.CurrentState), cfg.DbProperties()) csDB, err := storage.NewDatabase(string(cfg.Database), cfg.DatabaseOptions)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to open database") logrus.WithError(err).Panicf("failed to open database")
} }
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
string(cfg.Kafka.Topics.OutputRoomEvent), consumer, csDB, string(cfg.Matrix.Kafka.Topics.OutputRoomEvent), consumer, csDB,
) )
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer") logrus.WithError(err).Panicf("failed to start room server consumer")

View file

@ -92,9 +92,11 @@ func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *rooms
} }
func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.SyncProducer) { func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.SyncProducer) {
cfg := &config.Dendrite{} cfg := &config.CurrentStateServer{
cfg.Kafka.Topics.OutputRoomEvent = config.Topic(kafkaTopic) Matrix: &config.Global{},
cfg.Database.CurrentState = config.DataSource("file::memory:") }
cfg.Matrix.Kafka.Topics.OutputRoomEvent = config.Topic(kafkaTopic)
cfg.Database = config.DataSource("file::memory:")
db, err := sqlutil.Open(sqlutil.SQLiteDriverName(), "file::memory:", nil) db, err := sqlutil.Open(sqlutil.SQLiteDriverName(), "file::memory:", nil)
if err != nil { if err != nil {
t.Fatalf("Failed to open naffka database: %s", err) t.Fatalf("Failed to open naffka database: %s", err)

View file

@ -39,12 +39,13 @@ func NewInternalAPI(
eduCache *cache.EDUCache, eduCache *cache.EDUCache,
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
) api.EDUServerInputAPI { ) api.EDUServerInputAPI {
cfg := &base.Cfg.EDUServer
return &input.EDUServerInputAPI{ return &input.EDUServerInputAPI{
Cache: eduCache, Cache: eduCache,
UserAPI: userAPI, UserAPI: userAPI,
Producer: base.KafkaProducer, Producer: base.KafkaProducer,
OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), OutputTypingEventTopic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent),
OutputSendToDeviceEventTopic: string(base.Cfg.Kafka.Topics.OutputSendToDeviceEvent), OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent),
ServerName: base.Cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
} }
} }