mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 19:33:09 -06:00
Integrate the typing server component, create kafka topic
This commit is contained in:
parent
cc6c1f7f7e
commit
9e75165dc9
|
|
@ -86,6 +86,7 @@ kafka:
|
||||||
topics:
|
topics:
|
||||||
output_room_event: roomserverOutput
|
output_room_event: roomserverOutput
|
||||||
output_client_data: clientapiOutput
|
output_client_data: clientapiOutput
|
||||||
|
output_typing_event: typingServerOutput
|
||||||
user_updates: userUpdates
|
user_updates: userUpdates
|
||||||
|
|
||||||
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
|
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
|
||||||
|
|
|
||||||
|
|
@ -18,20 +18,20 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common/keydb"
|
|
||||||
"github.com/matrix-org/dendrite/common/transactions"
|
|
||||||
"github.com/matrix-org/dendrite/typingserver"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/appservice"
|
"github.com/matrix-org/dendrite/appservice"
|
||||||
"github.com/matrix-org/dendrite/clientapi"
|
"github.com/matrix-org/dendrite/clientapi"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||||
|
"github.com/matrix-org/dendrite/common/keydb"
|
||||||
|
"github.com/matrix-org/dendrite/common/transactions"
|
||||||
"github.com/matrix-org/dendrite/federationapi"
|
"github.com/matrix-org/dendrite/federationapi"
|
||||||
"github.com/matrix-org/dendrite/federationsender"
|
"github.com/matrix-org/dendrite/federationsender"
|
||||||
"github.com/matrix-org/dendrite/mediaapi"
|
"github.com/matrix-org/dendrite/mediaapi"
|
||||||
"github.com/matrix-org/dendrite/publicroomsapi"
|
"github.com/matrix-org/dendrite/publicroomsapi"
|
||||||
"github.com/matrix-org/dendrite/roomserver"
|
"github.com/matrix-org/dendrite/roomserver"
|
||||||
"github.com/matrix-org/dendrite/syncapi"
|
"github.com/matrix-org/dendrite/syncapi"
|
||||||
|
"github.com/matrix-org/dendrite/typingserver"
|
||||||
|
"github.com/matrix-org/dendrite/typingserver/cache"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
@ -56,7 +56,7 @@ func main() {
|
||||||
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
|
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
|
||||||
|
|
||||||
alias, input, query := roomserver.SetupRoomServerComponent(base)
|
alias, input, query := roomserver.SetupRoomServerComponent(base)
|
||||||
typingInputAPI := typingserver.SetupTypingServerComponent(base)
|
typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
|
||||||
|
|
||||||
clientapi.SetupClientAPIComponent(
|
clientapi.SetupClientAPIComponent(
|
||||||
base, deviceDB, accountDB,
|
base, deviceDB, accountDB,
|
||||||
|
|
|
||||||
|
|
@ -134,6 +134,8 @@ type Dendrite struct {
|
||||||
OutputRoomEvent Topic `yaml:"output_room_event"`
|
OutputRoomEvent Topic `yaml:"output_room_event"`
|
||||||
// Topic for sending account data from client API to sync API
|
// Topic for sending account data from client API to sync API
|
||||||
OutputClientData Topic `yaml:"output_client_data"`
|
OutputClientData Topic `yaml:"output_client_data"`
|
||||||
|
// Topic for typingserver/api.OutputTypingEvent events.
|
||||||
|
OutputTypingEvent Topic `yaml:"output_typing_event"`
|
||||||
// Topic for user updates (profile, presence)
|
// Topic for user updates (profile, presence)
|
||||||
UserUpdates Topic `yaml:"user_updates"`
|
UserUpdates Topic `yaml:"user_updates"`
|
||||||
}
|
}
|
||||||
|
|
@ -527,6 +529,7 @@ func (config *Dendrite) checkKafka(configErrs *configErrors, monolithic bool) {
|
||||||
}
|
}
|
||||||
checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
|
checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
|
||||||
checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
|
checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
|
||||||
|
checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent))
|
||||||
checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates))
|
checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,7 @@ kafka:
|
||||||
topics:
|
topics:
|
||||||
output_room_event: output.room
|
output_room_event: output.room
|
||||||
output_client_data: output.client
|
output_client_data: output.client
|
||||||
|
output_typing_event: output.typing
|
||||||
user_updates: output.user
|
user_updates: output.user
|
||||||
database:
|
database:
|
||||||
media_api: "postgresql:///media_api"
|
media_api: "postgresql:///media_api"
|
||||||
|
|
|
||||||
|
|
@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
|
||||||
// Make this configurable somehow?
|
// Make this configurable somehow?
|
||||||
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
|
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
|
||||||
cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
|
cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
|
||||||
|
cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output"
|
||||||
cfg.Kafka.Topics.UserUpdates = "test.user.output"
|
cfg.Kafka.Topics.UserUpdates = "test.user.output"
|
||||||
|
|
||||||
// TODO: Use different databases for the different schemas.
|
// TODO: Use different databases for the different schemas.
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,8 @@ package typingserver
|
||||||
import (
|
import (
|
||||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||||
"github.com/matrix-org/dendrite/typingserver/api"
|
"github.com/matrix-org/dendrite/typingserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/typingserver/cache"
|
||||||
|
"github.com/matrix-org/dendrite/typingserver/input"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetupTypingServerComponent sets up and registers HTTP handlers for the
|
// SetupTypingServerComponent sets up and registers HTTP handlers for the
|
||||||
|
|
@ -23,7 +25,13 @@ import (
|
||||||
// APIs directly instead of having to use HTTP.
|
// APIs directly instead of having to use HTTP.
|
||||||
func SetupTypingServerComponent(
|
func SetupTypingServerComponent(
|
||||||
base *basecomponent.BaseDendrite,
|
base *basecomponent.BaseDendrite,
|
||||||
|
typingCache *cache.TypingCache,
|
||||||
) api.TypingServerInputAPI {
|
) api.TypingServerInputAPI {
|
||||||
// TODO: implement typing server
|
inputAPI := &input.TypingServerInputAPI{
|
||||||
return base.CreateHTTPTypingServerAPIs()
|
Cache: typingCache,
|
||||||
|
Producer: base.KafkaProducer,
|
||||||
|
OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent),
|
||||||
|
}
|
||||||
|
|
||||||
|
return inputAPI
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue