Rename the typing server to EDU server (#948)

* Blunt move and sed rename

* Rename common/ refs to typing

* Rename internal stuff in eduserver

* Rename docs and scripts

* Rename constants/filenames, goimports everything to re-order imports
This commit is contained in:
Kegsay 2020-03-30 15:02:20 +01:00 committed by GitHub
parent f72b759426
commit 11a8059bba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 153 additions and 153 deletions

View file

@ -72,7 +72,7 @@ Diagram:
| | | | | | | |
| | +---+ | | | | +---+ | |
| | +----------| S | | | | | +----------| S | | |
| | | Typing +---+ | | | | | EDU +---+ | |
| |>=========================================>| Server |>=====================>| | | |>=========================================>| Server |>=====================>| |
+------------+ | | +----------+ +------------+ | | +----------+
+---+ | | +---+ | |
@ -156,7 +156,7 @@ choke-point to implement ratelimiting and backoff correctly.
* It may be impossible to implement without folding it into the Room Server * It may be impossible to implement without folding it into the Room Server
forever coupling the components together. forever coupling the components together.
## Typing Server ## EDU Server
* Reads new updates to typing from the logs written by the FS and CTS. * Reads new updates to typing from the logs written by the FS and CTS.
* Updates the current list of people typing in a room. * Updates the current list of people typing in a room.
@ -179,7 +179,7 @@ choke-point to implement ratelimiting and backoff correctly.
* Reads new events and the current state of the rooms from logs written by the Room Server. * Reads new events and the current state of the rooms from logs written by the Room Server.
* Reads new receipts positions from the logs written by the Receipts Server. * Reads new receipts positions from the logs written by the Receipts Server.
* Reads changes to presence from the logs written by the Presence Server. * Reads changes to presence from the logs written by the Presence Server.
* Reads changes to typing from the logs written by the Typing Server. * Reads changes to typing from the logs written by the EDU Server.
* Writes when a client starts and stops syncing to the logs. * Writes when a client starts and stops syncing to the logs.
## Client Search ## Client Search

View file

@ -23,9 +23,9 @@ import (
"github.com/matrix-org/dendrite/clientapi/routing" "github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/transactions" "github.com/matrix-org/dendrite/common/transactions"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
typingServerAPI "github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -41,13 +41,13 @@ func SetupClientAPIComponent(
aliasAPI roomserverAPI.RoomserverAliasAPI, aliasAPI roomserverAPI.RoomserverAliasAPI,
inputAPI roomserverAPI.RoomserverInputAPI, inputAPI roomserverAPI.RoomserverInputAPI,
queryAPI roomserverAPI.RoomserverQueryAPI, queryAPI roomserverAPI.RoomserverQueryAPI,
typingInputAPI typingServerAPI.TypingServerInputAPI, eduInputAPI eduServerAPI.EDUServerInputAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
transactionsCache *transactions.Cache, transactionsCache *transactions.Cache,
fedSenderAPI federationSenderAPI.FederationSenderQueryAPI, fedSenderAPI federationSenderAPI.FederationSenderQueryAPI,
) { ) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI) roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI)
typingProducer := producers.NewTypingServerProducer(typingInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
userUpdateProducer := &producers.UserUpdateProducer{ userUpdateProducer := &producers.UserUpdateProducer{
Producer: base.KafkaProducer, Producer: base.KafkaProducer,
@ -69,6 +69,6 @@ func SetupClientAPIComponent(
routing.Setup( routing.Setup(
base.APIMux, base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI, base.APIMux, base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI,
accountsDB, deviceDB, federation, *keyRing, userUpdateProducer, accountsDB, deviceDB, federation, *keyRing, userUpdateProducer,
syncProducer, typingProducer, transactionsCache, fedSenderAPI, syncProducer, eduProducer, transactionsCache, fedSenderAPI,
) )
} }

View file

@ -16,24 +16,24 @@ import (
"context" "context"
"time" "time"
"github.com/matrix-org/dendrite/typingserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
// TypingServerProducer produces events for the typing server to consume // EDUServerProducer produces events for the typing server to consume
type TypingServerProducer struct { type EDUServerProducer struct {
InputAPI api.TypingServerInputAPI InputAPI api.EDUServerInputAPI
} }
// NewTypingServerProducer creates a new TypingServerProducer // NewEDUServerProducer creates a new EDUServerProducer
func NewTypingServerProducer(inputAPI api.TypingServerInputAPI) *TypingServerProducer { func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer {
return &TypingServerProducer{ return &EDUServerProducer{
InputAPI: inputAPI, InputAPI: inputAPI,
} }
} }
// Send typing event to typing server // SendTyping sends a typing event to EDU server
func (p *TypingServerProducer) Send( func (p *EDUServerProducer) SendTyping(
ctx context.Context, userID, roomID string, ctx context.Context, userID, roomID string,
typing bool, timeout int64, typing bool, timeout int64,
) error { ) error {

View file

@ -58,7 +58,7 @@ func Setup(
keyRing gomatrixserverlib.KeyRing, keyRing gomatrixserverlib.KeyRing,
userUpdateProducer *producers.UserUpdateProducer, userUpdateProducer *producers.UserUpdateProducer,
syncProducer *producers.SyncAPIProducer, syncProducer *producers.SyncAPIProducer,
typingProducer *producers.TypingServerProducer, eduProducer *producers.EDUServerProducer,
transactionsCache *transactions.Cache, transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderQueryAPI, federationSender federationSenderAPI.FederationSenderQueryAPI,
) { ) {
@ -235,7 +235,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, typingProducer) return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)

View file

@ -35,7 +35,7 @@ type typingContentJSON struct {
func SendTyping( func SendTyping(
req *http.Request, device *authtypes.Device, roomID string, req *http.Request, device *authtypes.Device, roomID string,
userID string, accountDB accounts.Database, userID string, accountDB accounts.Database,
typingProducer *producers.TypingServerProducer, eduProducer *producers.EDUServerProducer,
) util.JSONResponse { ) util.JSONResponse {
if device.UserID != userID { if device.UserID != userID {
return util.JSONResponse{ return util.JSONResponse{
@ -69,10 +69,10 @@ func SendTyping(
return *resErr return *resErr
} }
if err = typingProducer.Send( if err = eduProducer.SendTyping(
req.Context(), userID, roomID, r.Typing, r.Timeout, req.Context(), userID, roomID, r.Typing, r.Timeout,
); err != nil { ); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("typingProducer.Send failed") util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -19,8 +19,8 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions" "github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/typingserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
) )
func main() { func main() {
@ -38,11 +38,11 @@ func main() {
asQuery := base.CreateHTTPAppServiceAPIs() asQuery := base.CreateHTTPAppServiceAPIs()
alias, input, query := base.CreateHTTPRoomserverAPIs() alias, input, query := base.CreateHTTPRoomserverAPIs()
fedSenderAPI := base.CreateHTTPFederationSenderAPIs() fedSenderAPI := base.CreateHTTPFederationSenderAPIs()
typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, federation, &keyRing, base, deviceDB, accountDB, federation, &keyRing,
alias, input, query, typingInputAPI, asQuery, transactions.New(), fedSenderAPI, alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI))

View file

@ -16,22 +16,22 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/typingserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func main() { func main() {
cfg := basecomponent.ParseFlags() cfg := basecomponent.ParseFlags()
base := basecomponent.NewBaseDendrite(cfg, "TypingServerAPI") base := basecomponent.NewBaseDendrite(cfg, "EDUServerAPI")
defer func() { defer func() {
if err := base.Close(); err != nil { if err := base.Close(); err != nil {
logrus.WithError(err).Warn("BaseDendrite close failed") logrus.WithError(err).Warn("BaseDendrite close failed")
} }
}() }()
typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) eduserver.SetupEDUServerComponent(base, cache.New())
base.SetupAndServeHTTP(string(base.Cfg.Bind.TypingServer), string(base.Cfg.Listen.TypingServer)) base.SetupAndServeHTTP(string(base.Cfg.Bind.EDUServer), string(base.Cfg.Listen.EDUServer))
} }

View file

@ -24,14 +24,14 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions" "github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi" "github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -56,7 +56,7 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB) keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := roomserver.SetupRoomServerComponent(base) alias, input, query := roomserver.SetupRoomServerComponent(base)
typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
@ -65,7 +65,7 @@ func main() {
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, base, deviceDB, accountDB,
federation, &keyRing, alias, input, query, federation, &keyRing, alias, input, query,
typingInputAPI, asQuery, transactions.New(), fedSenderAPI, eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
) )
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
mediaapi.SetupMediaAPIComponent(base, deviceDB) mediaapi.SetupMediaAPIComponent(base, deviceDB)

View file

@ -27,14 +27,14 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/common/transactions" "github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi" "github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/go-http-js-libp2p/go_http_js_libp2p" "github.com/matrix-org/go-http-js-libp2p/go_http_js_libp2p"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -122,7 +122,7 @@ func main() {
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node)
alias, input, query := roomserver.SetupRoomServerComponent(base) alias, input, query := roomserver.SetupRoomServerComponent(base)
typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
@ -131,7 +131,7 @@ func main() {
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, base, deviceDB, accountDB,
federation, &keyRing, alias, input, query, federation, &keyRing, alias, input, query,
typingInputAPI, asQuery, transactions.New(), fedSenderAPI, eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
) )
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
mediaapi.SetupMediaAPIComponent(base, deviceDB) mediaapi.SetupMediaAPIComponent(base, deviceDB)

View file

@ -35,9 +35,9 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api" appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
typingServerAPI "github.com/matrix-org/dendrite/typingserver/api"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -111,10 +111,10 @@ func (b *BaseDendrite) CreateHTTPRoomserverAPIs() (
return alias, input, query return alias, input, query
} }
// CreateHTTPTypingServerAPIs returns typingInputAPI for hitting the typing // CreateHTTPEDUServerAPIs returns eduInputAPI for hitting the EDU
// server over HTTP // server over HTTP
func (b *BaseDendrite) CreateHTTPTypingServerAPIs() typingServerAPI.TypingServerInputAPI { func (b *BaseDendrite) CreateHTTPEDUServerAPIs() eduServerAPI.EDUServerInputAPI {
return typingServerAPI.NewTypingServerInputAPIHTTP(b.Cfg.TypingServerURL(), nil) return eduServerAPI.NewEDUServerInputAPIHTTP(b.Cfg.EDUServerURL(), nil)
} }
// CreateHTTPFederationSenderAPIs returns FederationSenderQueryAPI for hitting // CreateHTTPFederationSenderAPIs returns FederationSenderQueryAPI for hitting

View file

@ -134,7 +134,7 @@ type Dendrite struct {
OutputRoomEvent Topic `yaml:"output_room_event"` OutputRoomEvent Topic `yaml:"output_room_event"`
// Topic for sending account data from client API to sync API // Topic for sending account data from client API to sync API
OutputClientData Topic `yaml:"output_client_data"` OutputClientData Topic `yaml:"output_client_data"`
// Topic for typingserver/api.OutputTypingEvent events. // Topic for eduserver/api.OutputTypingEvent events.
OutputTypingEvent Topic `yaml:"output_typing_event"` OutputTypingEvent Topic `yaml:"output_typing_event"`
// Topic for user updates (profile, presence) // Topic for user updates (profile, presence)
UserUpdates Topic `yaml:"user_updates"` UserUpdates Topic `yaml:"user_updates"`
@ -206,7 +206,7 @@ type Dendrite struct {
RoomServer Address `yaml:"room_server"` RoomServer Address `yaml:"room_server"`
FederationSender Address `yaml:"federation_sender"` FederationSender Address `yaml:"federation_sender"`
PublicRoomsAPI Address `yaml:"public_rooms_api"` PublicRoomsAPI Address `yaml:"public_rooms_api"`
TypingServer Address `yaml:"typing_server"` EDUServer Address `yaml:"edu_server"`
} `yaml:"bind"` } `yaml:"bind"`
// The addresses for talking to other microservices. // The addresses for talking to other microservices.
@ -219,7 +219,7 @@ type Dendrite struct {
RoomServer Address `yaml:"room_server"` RoomServer Address `yaml:"room_server"`
FederationSender Address `yaml:"federation_sender"` FederationSender Address `yaml:"federation_sender"`
PublicRoomsAPI Address `yaml:"public_rooms_api"` PublicRoomsAPI Address `yaml:"public_rooms_api"`
TypingServer Address `yaml:"typing_server"` EDUServer Address `yaml:"edu_server"`
} `yaml:"listen"` } `yaml:"listen"`
// The config for tracing the dendrite servers. // The config for tracing the dendrite servers.
@ -571,7 +571,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) {
checkNotEmpty(configErrs, "listen.federation_api", string(config.Listen.FederationAPI)) checkNotEmpty(configErrs, "listen.federation_api", string(config.Listen.FederationAPI))
checkNotEmpty(configErrs, "listen.sync_api", string(config.Listen.SyncAPI)) checkNotEmpty(configErrs, "listen.sync_api", string(config.Listen.SyncAPI))
checkNotEmpty(configErrs, "listen.room_server", string(config.Listen.RoomServer)) checkNotEmpty(configErrs, "listen.room_server", string(config.Listen.RoomServer))
checkNotEmpty(configErrs, "listen.typing_server", string(config.Listen.TypingServer)) checkNotEmpty(configErrs, "listen.edu_server", string(config.Listen.EDUServer))
} }
// checkLogging verifies the parameters logging.* are valid. // checkLogging verifies the parameters logging.* are valid.
@ -669,7 +669,7 @@ func fingerprintPEM(data []byte) *gomatrixserverlib.TLSFingerprint {
// AppServiceURL returns a HTTP URL for where the appservice component is listening. // AppServiceURL returns a HTTP URL for where the appservice component is listening.
func (config *Dendrite) AppServiceURL() string { func (config *Dendrite) AppServiceURL() string {
// Hard code the roomserver to talk HTTP for now. // Hard code the appservice server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation. // If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public // People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API. // internet for an internal API.
@ -685,18 +685,18 @@ func (config *Dendrite) RoomServerURL() string {
return "http://" + string(config.Listen.RoomServer) return "http://" + string(config.Listen.RoomServer)
} }
// TypingServerURL returns an HTTP URL for where the typing server is listening. // EDUServerURL returns an HTTP URL for where the EDU server is listening.
func (config *Dendrite) TypingServerURL() string { func (config *Dendrite) EDUServerURL() string {
// Hard code the typing server to talk HTTP for now. // Hard code the EDU server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation. // If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public // People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API. // internet for an internal API.
return "http://" + string(config.Listen.TypingServer) return "http://" + string(config.Listen.EDUServer)
} }
// FederationSenderURL returns an HTTP URL for where the federation sender is listening. // FederationSenderURL returns an HTTP URL for where the federation sender is listening.
func (config *Dendrite) FederationSenderURL() string { func (config *Dendrite) FederationSenderURL() string {
// Hard code the typing server to talk HTTP for now. // Hard code the federation sender server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation. // If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public // People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API. // internet for an internal API.

View file

@ -62,7 +62,7 @@ listen:
sync_api: "localhost:7773" sync_api: "localhost:7773"
media_api: "localhost:7774" media_api: "localhost:7774"
appservice_api: "localhost:7777" appservice_api: "localhost:7777"
typing_server: "localhost:7778" edu_server: "localhost:7778"
logging: logging:
- type: "file" - type: "file"
level: "info" level: "info"

View file

@ -106,7 +106,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.Listen.RoomServer = assignAddress() cfg.Listen.RoomServer = assignAddress()
cfg.Listen.SyncAPI = assignAddress() cfg.Listen.SyncAPI = assignAddress()
cfg.Listen.PublicRoomsAPI = assignAddress() cfg.Listen.PublicRoomsAPI = assignAddress()
cfg.Listen.TypingServer = assignAddress() cfg.Listen.EDUServer = assignAddress()
// Bind to the same address as the listen address // Bind to the same address as the listen address
// All microservices are run on the same host in testing // All microservices are run on the same host in testing
@ -117,7 +117,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.Bind.RoomServer = cfg.Listen.RoomServer cfg.Bind.RoomServer = cfg.Listen.RoomServer
cfg.Bind.SyncAPI = cfg.Listen.SyncAPI cfg.Bind.SyncAPI = cfg.Listen.SyncAPI
cfg.Bind.PublicRoomsAPI = cfg.Listen.PublicRoomsAPI cfg.Bind.PublicRoomsAPI = cfg.Listen.PublicRoomsAPI
cfg.Bind.TypingServer = cfg.Listen.TypingServer cfg.Bind.EDUServer = cfg.Listen.EDUServer
return &cfg, port, nil return &cfg, port, nil
} }

View file

@ -85,7 +85,7 @@ kafka:
topics: topics:
output_room_event: roomserverOutput output_room_event: roomserverOutput
output_client_data: clientapiOutput output_client_data: clientapiOutput
output_typing_event: typingServerOutput output_typing_event: eduServerOutput
user_updates: userUpdates user_updates: userUpdates
# The postgres connection configs for connecting to the databases e.g a postgres:// URI # The postgres connection configs for connecting to the databases e.g a postgres:// URI
@ -114,7 +114,7 @@ listen:
public_rooms_api: "localhost:7775" public_rooms_api: "localhost:7775"
federation_sender: "localhost:7776" federation_sender: "localhost:7776"
appservice_api: "localhost:7777" appservice_api: "localhost:7777"
typing_server: "localhost:7778" edu_server: "localhost:7778"
# The configuration for tracing the dendrite components. # The configuration for tracing the dendrite components.
tracing: tracing:

View file

@ -58,7 +58,7 @@ docker-compose up kafka zookeeper postgres
and the following dendrite components and the following dendrite components
``` ```
docker-compose up client_api media_api sync_api room_server public_rooms_api typing_server docker-compose up client_api media_api sync_api room_server public_rooms_api edu_server
docker-compose up client_api_proxy docker-compose up client_api_proxy
``` ```

View file

@ -85,7 +85,7 @@ kafka:
topics: topics:
output_room_event: roomserverOutput output_room_event: roomserverOutput
output_client_data: clientapiOutput output_client_data: clientapiOutput
output_typing_event: typingServerOutput output_typing_event: eduServerOutput
user_updates: userUpdates user_updates: userUpdates
@ -114,7 +114,7 @@ listen:
media_api: "media_api:7774" media_api: "media_api:7774"
public_rooms_api: "public_rooms_api:7775" public_rooms_api: "public_rooms_api:7775"
federation_sender: "federation_sender:7776" federation_sender: "federation_sender:7776"
typing_server: "typing_server:7777" edu_server: "typing_server:7777"
# The configuration for tracing the dendrite components. # The configuration for tracing the dendrite components.
tracing: tracing:

View file

@ -103,10 +103,10 @@ services:
networks: networks:
- internal - internal
typing_server: edu_server:
container_name: dendrite_typing_server container_name: dendrite_edu_server
hostname: typing_server hostname: edu_server
entrypoint: ["bash", "./docker/services/typing-server.sh"] entrypoint: ["bash", "./docker/services/edu-server.sh"]
build: ./ build: ./
volumes: volumes:
- ..:/build - ..:/build

View file

@ -0,0 +1,5 @@
#!/bin/bash
bash ./docker/build.sh
./bin/dendrite-edu-server --config=dendrite.yaml

View file

@ -1,5 +0,0 @@
#!/bin/bash
bash ./docker/build.sh
./bin/dendrite-typing-server --config=dendrite.yaml

View file

@ -36,7 +36,7 @@ type InputTypingEvent struct {
OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
} }
// InputTypingEventRequest is a request to TypingServerInputAPI // InputTypingEventRequest is a request to EDUServerInputAPI
type InputTypingEventRequest struct { type InputTypingEventRequest struct {
InputTypingEvent InputTypingEvent `json:"input_typing_event"` InputTypingEvent InputTypingEvent `json:"input_typing_event"`
} }
@ -44,8 +44,8 @@ type InputTypingEventRequest struct {
// InputTypingEventResponse is a response to InputTypingEvents // InputTypingEventResponse is a response to InputTypingEvents
type InputTypingEventResponse struct{} type InputTypingEventResponse struct{}
// TypingServerInputAPI is used to write events to the typing server. // EDUServerInputAPI is used to write events to the typing server.
type TypingServerInputAPI interface { type EDUServerInputAPI interface {
InputTypingEvent( InputTypingEvent(
ctx context.Context, ctx context.Context,
request *InputTypingEventRequest, request *InputTypingEventRequest,
@ -53,24 +53,24 @@ type TypingServerInputAPI interface {
) error ) error
} }
// TypingServerInputTypingEventPath is the HTTP path for the InputTypingEvent API. // EDUServerInputTypingEventPath is the HTTP path for the InputTypingEvent API.
const TypingServerInputTypingEventPath = "/api/typingserver/input" const EDUServerInputTypingEventPath = "/api/eduserver/input"
// NewTypingServerInputAPIHTTP creates a TypingServerInputAPI implemented by talking to a HTTP POST API. // NewEDUServerInputAPIHTTP creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
func NewTypingServerInputAPIHTTP(typingServerURL string, httpClient *http.Client) TypingServerInputAPI { func NewEDUServerInputAPIHTTP(eduServerURL string, httpClient *http.Client) EDUServerInputAPI {
if httpClient == nil { if httpClient == nil {
httpClient = http.DefaultClient httpClient = http.DefaultClient
} }
return &httpTypingServerInputAPI{typingServerURL, httpClient} return &httpEDUServerInputAPI{eduServerURL, httpClient}
} }
type httpTypingServerInputAPI struct { type httpEDUServerInputAPI struct {
typingServerURL string eduServerURL string
httpClient *http.Client httpClient *http.Client
} }
// InputRoomEvents implements TypingServerInputAPI // InputRoomEvents implements EDUServerInputAPI
func (h *httpTypingServerInputAPI) InputTypingEvent( func (h *httpEDUServerInputAPI) InputTypingEvent(
ctx context.Context, ctx context.Context,
request *InputTypingEventRequest, request *InputTypingEventRequest,
response *InputTypingEventResponse, response *InputTypingEventResponse,
@ -78,6 +78,6 @@ func (h *httpTypingServerInputAPI) InputTypingEvent(
span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent") span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent")
defer span.Finish() defer span.Finish()
apiURL := h.typingServerURL + TypingServerInputTypingEventPath apiURL := h.eduServerURL + EDUServerInputTypingEventPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }

View file

@ -32,8 +32,8 @@ type roomData struct {
userSet userSet userSet userSet
} }
// TypingCache maintains a list of users typing in each room. // EDUCache maintains a list of users typing in each room.
type TypingCache struct { type EDUCache struct {
sync.RWMutex sync.RWMutex
latestSyncPosition int64 latestSyncPosition int64
data map[string]*roomData data map[string]*roomData
@ -42,26 +42,26 @@ type TypingCache struct {
// Create a roomData with its sync position set to the latest sync position. // Create a roomData with its sync position set to the latest sync position.
// Must only be called after locking the cache. // Must only be called after locking the cache.
func (t *TypingCache) newRoomData() *roomData { func (t *EDUCache) newRoomData() *roomData {
return &roomData{ return &roomData{
syncPosition: t.latestSyncPosition, syncPosition: t.latestSyncPosition,
userSet: make(userSet), userSet: make(userSet),
} }
} }
// NewTypingCache returns a new TypingCache initialised for use. // New returns a new EDUCache initialised for use.
func NewTypingCache() *TypingCache { func New() *EDUCache {
return &TypingCache{data: make(map[string]*roomData)} return &EDUCache{data: make(map[string]*roomData)}
} }
// SetTimeoutCallback sets a callback function that is called right after // SetTimeoutCallback sets a callback function that is called right after
// a user is removed from the typing user list due to timeout. // a user is removed from the typing user list due to timeout.
func (t *TypingCache) SetTimeoutCallback(fn TimeoutCallbackFn) { func (t *EDUCache) SetTimeoutCallback(fn TimeoutCallbackFn) {
t.timeoutCallback = fn t.timeoutCallback = fn
} }
// GetTypingUsers returns the list of users typing in a room. // GetTypingUsers returns the list of users typing in a room.
func (t *TypingCache) GetTypingUsers(roomID string) []string { func (t *EDUCache) GetTypingUsers(roomID string) []string {
users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0) users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0)
// 0 should work above because the first position used will be 1. // 0 should work above because the first position used will be 1.
return users return users
@ -70,7 +70,7 @@ func (t *TypingCache) GetTypingUsers(roomID string) []string {
// GetTypingUsersIfUpdatedAfter returns all users typing in this room with // GetTypingUsersIfUpdatedAfter returns all users typing in this room with
// updated == true if the typing sync position of the room is after the given // updated == true if the typing sync position of the room is after the given
// position. Otherwise, returns an empty slice with updated == false. // position. Otherwise, returns an empty slice with updated == false.
func (t *TypingCache) GetTypingUsersIfUpdatedAfter( func (t *EDUCache) GetTypingUsersIfUpdatedAfter(
roomID string, position int64, roomID string, position int64,
) (users []string, updated bool) { ) (users []string, updated bool) {
t.RLock() t.RLock()
@ -93,7 +93,7 @@ func (t *TypingCache) GetTypingUsersIfUpdatedAfter(
// expire is the time when the user typing should time out. // expire is the time when the user typing should time out.
// if expire is nil, defaultTypingTimeout is assumed. // if expire is nil, defaultTypingTimeout is assumed.
// Returns the latest sync position for typing after update. // Returns the latest sync position for typing after update.
func (t *TypingCache) AddTypingUser( func (t *EDUCache) AddTypingUser(
userID, roomID string, expire *time.Time, userID, roomID string, expire *time.Time,
) int64 { ) int64 {
expireTime := getExpireTime(expire) expireTime := getExpireTime(expire)
@ -111,7 +111,7 @@ func (t *TypingCache) AddTypingUser(
// addUser with mutex lock & replace the previous timer. // addUser with mutex lock & replace the previous timer.
// Returns the latest typing sync position after update. // Returns the latest typing sync position after update.
func (t *TypingCache) addUser( func (t *EDUCache) addUser(
userID, roomID string, expiryTimer *time.Timer, userID, roomID string, expiryTimer *time.Timer,
) int64 { ) int64 {
t.Lock() t.Lock()
@ -143,7 +143,7 @@ func (t *TypingCache) addUser(
// RemoveUser with mutex lock & stop the timer. // RemoveUser with mutex lock & stop the timer.
// Returns the latest sync position for typing after update. // Returns the latest sync position for typing after update.
func (t *TypingCache) RemoveUser(userID, roomID string) int64 { func (t *EDUCache) RemoveUser(userID, roomID string) int64 {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@ -166,7 +166,7 @@ func (t *TypingCache) RemoveUser(userID, roomID string) int64 {
return t.latestSyncPosition return t.latestSyncPosition
} }
func (t *TypingCache) GetLatestSyncPosition() int64 { func (t *EDUCache) GetLatestSyncPosition() int64 {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
return t.latestSyncPosition return t.latestSyncPosition

View file

@ -19,10 +19,10 @@ import (
"github.com/matrix-org/dendrite/common/test" "github.com/matrix-org/dendrite/common/test"
) )
func TestTypingCache(t *testing.T) { func TestEDUCache(t *testing.T) {
tCache := NewTypingCache() tCache := New()
if tCache == nil { if tCache == nil {
t.Fatal("NewTypingCache failed") t.Fatal("New failed")
} }
t.Run("AddTypingUser", func(t *testing.T) { t.Run("AddTypingUser", func(t *testing.T) {
@ -38,7 +38,7 @@ func TestTypingCache(t *testing.T) {
}) })
} }
func testAddTypingUser(t *testing.T, tCache *TypingCache) { // nolint: unparam func testAddTypingUser(t *testing.T, tCache *EDUCache) { // nolint: unparam
present := time.Now() present := time.Now()
tests := []struct { tests := []struct {
userID string userID string
@ -58,7 +58,7 @@ func testAddTypingUser(t *testing.T, tCache *TypingCache) { // nolint: unparam
} }
} }
func testGetTypingUsers(t *testing.T, tCache *TypingCache) { func testGetTypingUsers(t *testing.T, tCache *EDUCache) {
tests := []struct { tests := []struct {
roomID string roomID string
wantUsers []string wantUsers []string
@ -75,7 +75,7 @@ func testGetTypingUsers(t *testing.T, tCache *TypingCache) {
} }
} }
func testRemoveUser(t *testing.T, tCache *TypingCache) { func testRemoveUser(t *testing.T, tCache *EDUCache) {
tests := []struct { tests := []struct {
roomID string roomID string
userIDs []string userIDs []string

View file

@ -10,27 +10,27 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package typingserver package eduserver
import ( import (
"net/http" "net/http"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/typingserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/typingserver/input" "github.com/matrix-org/dendrite/eduserver/input"
) )
// SetupTypingServerComponent sets up and registers HTTP handlers for the // SetupEDUServerComponent sets up and registers HTTP handlers for the
// TypingServer component. Returns instances of the various roomserver APIs, // EDUServer component. Returns instances of the various roomserver APIs,
// allowing other components running in the same process to hit the query the // allowing other components running in the same process to hit the query the
// APIs directly instead of having to use HTTP. // APIs directly instead of having to use HTTP.
func SetupTypingServerComponent( func SetupEDUServerComponent(
base *basecomponent.BaseDendrite, base *basecomponent.BaseDendrite,
typingCache *cache.TypingCache, eduCache *cache.EDUCache,
) api.TypingServerInputAPI { ) api.EDUServerInputAPI {
inputAPI := &input.TypingServerInputAPI{ inputAPI := &input.EDUServerInputAPI{
Cache: typingCache, Cache: eduCache,
Producer: base.KafkaProducer, Producer: base.KafkaProducer,
OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent),
} }

View file

@ -19,25 +19,25 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/typingserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"gopkg.in/Shopify/sarama.v1" "gopkg.in/Shopify/sarama.v1"
) )
// TypingServerInputAPI implements api.TypingServerInputAPI // EDUServerInputAPI implements api.EDUServerInputAPI
type TypingServerInputAPI struct { type EDUServerInputAPI struct {
// Cache to store the current typing members in each room. // Cache to store the current typing members in each room.
Cache *cache.TypingCache Cache *cache.EDUCache
// The kafka topic to output new typing events to. // The kafka topic to output new typing events to.
OutputTypingEventTopic string OutputTypingEventTopic string
// kafka producer // kafka producer
Producer sarama.SyncProducer Producer sarama.SyncProducer
} }
// InputTypingEvent implements api.TypingServerInputAPI // InputTypingEvent implements api.EDUServerInputAPI
func (t *TypingServerInputAPI) InputTypingEvent( func (t *EDUServerInputAPI) InputTypingEvent(
ctx context.Context, ctx context.Context,
request *api.InputTypingEventRequest, request *api.InputTypingEventRequest,
response *api.InputTypingEventResponse, response *api.InputTypingEventResponse,
@ -56,7 +56,7 @@ func (t *TypingServerInputAPI) InputTypingEvent(
return t.sendEvent(ite) return t.sendEvent(ite)
} }
func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
ev := &api.TypingEvent{ ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping, Type: gomatrixserverlib.MTyping,
RoomID: ite.RoomID, RoomID: ite.RoomID,
@ -89,9 +89,9 @@ func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
return err return err
} }
// SetupHTTP adds the TypingServerInputAPI handlers to the http.ServeMux. // SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux.
func (t *TypingServerInputAPI) SetupHTTP(servMux *http.ServeMux) { func (t *EDUServerInputAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(api.TypingServerInputTypingEventPath, servMux.Handle(api.EDUServerInputTypingEventPath,
common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse { common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse {
var request api.InputTypingEventRequest var request api.InputTypingEventRequest
var response api.InputTypingEventResponse var response api.InputTypingEventResponse

View file

@ -18,15 +18,15 @@ import (
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"gopkg.in/Shopify/sarama.v1" "gopkg.in/Shopify/sarama.v1"
) )
// OutputTypingEventConsumer consumes events that originate in typing server. // OutputTypingEventConsumer consumes events that originate in EDU server.
type OutputTypingEventConsumer struct { type OutputTypingEventConsumer struct {
consumer *common.ContinualConsumer consumer *common.ContinualConsumer
db storage.Database db storage.Database
@ -34,7 +34,7 @@ type OutputTypingEventConsumer struct {
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
} }
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers. // NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputTypingEventConsumer( func NewOutputTypingEventConsumer(
cfg *config.Dendrite, cfg *config.Dendrite,
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
@ -57,19 +57,19 @@ func NewOutputTypingEventConsumer(
return c return c
} }
// Start consuming from typing servers // Start consuming from EDU servers
func (t *OutputTypingEventConsumer) Start() error { func (t *OutputTypingEventConsumer) Start() error {
return t.consumer.Start() return t.consumer.Start()
} }
// onMessage is called for OutputTypingEvent received from the typing servers. // onMessage is called for OutputTypingEvent received from the EDU servers.
// Parses the msg, creates a matrix federation EDU and sends it to joined hosts. // Parses the msg, creates a matrix federation EDU and sends it to joined hosts.
func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Extract the typing event from msg. // Extract the typing event from msg.
var ote api.OutputTypingEvent var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil { if err := json.Unmarshal(msg.Value, &ote); err != nil {
// Skip this msg but continue processing messages. // Skip this msg but continue processing messages.
log.WithError(err).Errorf("typingserver output log: message parse failed") log.WithError(err).Errorf("eduserver output log: message parse failed")
return nil return nil
} }

View file

@ -19,15 +19,15 @@ import (
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/api"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
// OutputTypingEventConsumer consumes events that originated in the typing server. // OutputTypingEventConsumer consumes events that originated in the EDU server.
type OutputTypingEventConsumer struct { type OutputTypingEventConsumer struct {
typingConsumer *common.ContinualConsumer typingConsumer *common.ContinualConsumer
db storage.Database db storage.Database
@ -35,7 +35,7 @@ type OutputTypingEventConsumer struct {
} }
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. // NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
// Call Start() to begin consuming from the typing server. // Call Start() to begin consuming from the EDU server.
func NewOutputTypingEventConsumer( func NewOutputTypingEventConsumer(
cfg *config.Dendrite, cfg *config.Dendrite,
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
@ -60,7 +60,7 @@ func NewOutputTypingEventConsumer(
return s return s
} }
// Start consuming from typing api // Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error { func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewEvent( s.notifier.OnNewEvent(
@ -78,7 +78,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
var output api.OutputTypingEvent var output api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("typing server output log: message parse failure") log.WithError(err).Errorf("EDU server output log: message parse failure")
return nil return nil
} }
@ -86,7 +86,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.Event.RoomID, "room_id": output.Event.RoomID,
"user_id": output.Event.UserID, "user_id": output.Event.UserID,
"typing": output.Event.Typing, "typing": output.Event.Typing,
}).Debug("received data from typing server") }).Debug("received data from EDU server")
var typingPos types.StreamPosition var typingPos types.StreamPosition
typingEvent := output.Event typingEvent := output.Event

View file

@ -20,9 +20,9 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )

View file

@ -30,8 +30,8 @@ import (
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -53,7 +53,7 @@ type SyncServerDatasource struct {
events outputRoomEventsStatements events outputRoomEventsStatements
roomstate currentRoomStateStatements roomstate currentRoomStateStatements
invites inviteEventsStatements invites inviteEventsStatements
typingCache *cache.TypingCache eduCache *cache.EDUCache
topology outputRoomEventsTopologyStatements topology outputRoomEventsTopologyStatements
backwardExtremities backwardExtremitiesStatements backwardExtremities backwardExtremitiesStatements
} }
@ -86,7 +86,7 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er
if err := d.backwardExtremities.prepare(d.db); err != nil { if err := d.backwardExtremities.prepare(d.db); err != nil {
return nil, err return nil, err
} }
d.typingCache = cache.NewTypingCache() d.eduCache = cache.New()
return &d, nil return &d, nil
} }
@ -395,7 +395,7 @@ func (d *SyncServerDatasource) syncPositionTx(
maxEventID = maxInviteID maxEventID = maxInviteID
} }
sp.PDUPosition = types.StreamPosition(maxEventID) sp.PDUPosition = types.StreamPosition(maxEventID)
sp.EDUTypingPosition = types.StreamPosition(d.typingCache.GetLatestSyncPosition()) sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition())
return return
} }
@ -468,7 +468,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse(
var ok bool var ok bool
var err error var err error
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter( if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUTypingPosition), roomID, int64(since.EDUTypingPosition),
); updated { ); updated {
ev := gomatrixserverlib.ClientEvent{ ev := gomatrixserverlib.ClientEvent{
@ -719,7 +719,7 @@ func (d *SyncServerDatasource) RetireInviteEvent(
} }
func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.typingCache.SetTimeoutCallback(fn) d.eduCache.SetTimeoutCallback(fn)
} }
// AddTypingUser adds a typing user to the typing cache. // AddTypingUser adds a typing user to the typing cache.
@ -727,7 +727,7 @@ func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallback
func (d *SyncServerDatasource) AddTypingUser( func (d *SyncServerDatasource) AddTypingUser(
userID, roomID string, expireTime *time.Time, userID, roomID string, expireTime *time.Time,
) types.StreamPosition { ) types.StreamPosition {
return types.StreamPosition(d.typingCache.AddTypingUser(userID, roomID, expireTime)) return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime))
} }
// RemoveTypingUser removes a typing user from the typing cache. // RemoveTypingUser removes a typing user from the typing cache.
@ -735,7 +735,7 @@ func (d *SyncServerDatasource) AddTypingUser(
func (d *SyncServerDatasource) RemoveTypingUser( func (d *SyncServerDatasource) RemoveTypingUser(
userID, roomID string, userID, roomID string,
) types.StreamPosition { ) types.StreamPosition {
return types.StreamPosition(d.typingCache.RemoveUser(userID, roomID)) return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID))
} }
func (d *SyncServerDatasource) addInvitesToResponse( func (d *SyncServerDatasource) addInvitesToResponse(

View file

@ -33,8 +33,8 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -57,7 +57,7 @@ type SyncServerDatasource struct {
events outputRoomEventsStatements events outputRoomEventsStatements
roomstate currentRoomStateStatements roomstate currentRoomStateStatements
invites inviteEventsStatements invites inviteEventsStatements
typingCache *cache.TypingCache eduCache *cache.EDUCache
topology outputRoomEventsTopologyStatements topology outputRoomEventsTopologyStatements
backwardExtremities backwardExtremitiesStatements backwardExtremities backwardExtremitiesStatements
} }
@ -84,7 +84,7 @@ func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, erro
if err = d.prepare(); err != nil { if err = d.prepare(); err != nil {
return nil, err return nil, err
} }
d.typingCache = cache.NewTypingCache() d.eduCache = cache.New()
return &d, nil return &d, nil
} }
@ -429,7 +429,7 @@ func (d *SyncServerDatasource) syncPositionTx(
maxEventID = maxInviteID maxEventID = maxInviteID
} }
sp.PDUPosition = types.StreamPosition(maxEventID) sp.PDUPosition = types.StreamPosition(maxEventID)
sp.EDUTypingPosition = types.StreamPosition(d.typingCache.GetLatestSyncPosition()) sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition())
return return
} }
@ -502,7 +502,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse(
var ok bool var ok bool
var err error var err error
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter( if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUTypingPosition), roomID, int64(since.EDUTypingPosition),
); updated { ); updated {
ev := gomatrixserverlib.ClientEvent{ ev := gomatrixserverlib.ClientEvent{
@ -766,7 +766,7 @@ func (d *SyncServerDatasource) RetireInviteEvent(
} }
func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.typingCache.SetTimeoutCallback(fn) d.eduCache.SetTimeoutCallback(fn)
} }
// AddTypingUser adds a typing user to the typing cache. // AddTypingUser adds a typing user to the typing cache.
@ -774,7 +774,7 @@ func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallback
func (d *SyncServerDatasource) AddTypingUser( func (d *SyncServerDatasource) AddTypingUser(
userID, roomID string, expireTime *time.Time, userID, roomID string, expireTime *time.Time,
) types.StreamPosition { ) types.StreamPosition {
return types.StreamPosition(d.typingCache.AddTypingUser(userID, roomID, expireTime)) return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime))
} }
// RemoveTypingUser removes a typing user from the typing cache. // RemoveTypingUser removes a typing user from the typing cache.
@ -782,7 +782,7 @@ func (d *SyncServerDatasource) AddTypingUser(
func (d *SyncServerDatasource) RemoveTypingUser( func (d *SyncServerDatasource) RemoveTypingUser(
userID, roomID string, userID, roomID string,
) types.StreamPosition { ) types.StreamPosition {
return types.StreamPosition(d.typingCache.RemoveUser(userID, roomID)) return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID))
} }
func (d *SyncServerDatasource) addInvitesToResponse( func (d *SyncServerDatasource) addInvitesToResponse(