diff --git a/WIRING.md b/WIRING.md index bddb1614c..8ec5b0432 100644 --- a/WIRING.md +++ b/WIRING.md @@ -72,7 +72,7 @@ Diagram: | | | | | | +---+ | | | | +----------| S | | | - | | | Typing +---+ | | + | | | EDU +---+ | | | |>=========================================>| Server |>=====================>| | +------------+ | | +----------+ +---+ | | @@ -156,7 +156,7 @@ choke-point to implement ratelimiting and backoff correctly. * It may be impossible to implement without folding it into the Room Server forever coupling the components together. -## Typing Server +## EDU Server * Reads new updates to typing from the logs written by the FS and CTS. * Updates the current list of people typing in a room. @@ -179,7 +179,7 @@ choke-point to implement ratelimiting and backoff correctly. * Reads new events and the current state of the rooms from logs written by the Room Server. * Reads new receipts positions from the logs written by the Receipts Server. * Reads changes to presence from the logs written by the Presence Server. - * Reads changes to typing from the logs written by the Typing Server. + * Reads changes to typing from the logs written by the EDU Server. * Writes when a client starts and stops syncing to the logs. ## Client Search diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index e608b69f3..1339f7c8c 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -23,9 +23,9 @@ import ( "github.com/matrix-org/dendrite/clientapi/routing" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/transactions" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - typingServerAPI "github.com/matrix-org/dendrite/typingserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -41,13 +41,13 @@ func SetupClientAPIComponent( aliasAPI roomserverAPI.RoomserverAliasAPI, inputAPI roomserverAPI.RoomserverInputAPI, queryAPI roomserverAPI.RoomserverQueryAPI, - typingInputAPI typingServerAPI.TypingServerInputAPI, + eduInputAPI eduServerAPI.EDUServerInputAPI, asAPI appserviceAPI.AppServiceQueryAPI, transactionsCache *transactions.Cache, fedSenderAPI federationSenderAPI.FederationSenderQueryAPI, ) { roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI) - typingProducer := producers.NewTypingServerProducer(typingInputAPI) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) userUpdateProducer := &producers.UserUpdateProducer{ Producer: base.KafkaProducer, @@ -69,6 +69,6 @@ func SetupClientAPIComponent( routing.Setup( base.APIMux, base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI, accountsDB, deviceDB, federation, *keyRing, userUpdateProducer, - syncProducer, typingProducer, transactionsCache, fedSenderAPI, + syncProducer, eduProducer, transactionsCache, fedSenderAPI, ) } diff --git a/clientapi/producers/typingserver.go b/clientapi/producers/eduserver.go similarity index 68% rename from clientapi/producers/typingserver.go rename to clientapi/producers/eduserver.go index f4d0bcba7..30c40fb7f 100644 --- a/clientapi/producers/typingserver.go +++ b/clientapi/producers/eduserver.go @@ -16,32 +16,32 @@ import ( "context" "time" - "github.com/matrix-org/dendrite/typingserver/api" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/gomatrixserverlib" ) -// TypingServerProducer produces events for the typing server to consume -type TypingServerProducer struct { - InputAPI api.TypingServerInputAPI +// EDUServerProducer produces events for the EDU server to consume +type EDUServerProducer struct { + InputAPI api.EDUServerInputAPI } -// NewTypingServerProducer creates a new TypingServerProducer -func NewTypingServerProducer(inputAPI api.TypingServerInputAPI) *TypingServerProducer { - return &TypingServerProducer{ +// NewEDUServerProducer creates a new EDUServerProducer +func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer { + return &EDUServerProducer{ InputAPI: inputAPI, } } -// Send typing event to typing server -func (p *TypingServerProducer) Send( +// SendTyping sends a typing event to EDU server +func (p *EDUServerProducer) SendTyping( ctx context.Context, userID, roomID string, - typing bool, timeout int64, + typing bool, timeoutMS int64, ) error { requestData := api.InputTypingEvent{ UserID: userID, RoomID: roomID, Typing: typing, - Timeout: timeout, + TimeoutMS: timeoutMS, OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 22ff12b02..91a1588cb 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -58,7 +58,7 @@ func Setup( keyRing gomatrixserverlib.KeyRing, userUpdateProducer *producers.UserUpdateProducer, syncProducer *producers.SyncAPIProducer, - typingProducer *producers.TypingServerProducer, + eduProducer *producers.EDUServerProducer, transactionsCache *transactions.Cache, federationSender federationSenderAPI.FederationSenderQueryAPI, ) { @@ -235,7 +235,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, typingProducer) + return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer) }), ).Methods(http.MethodPut, http.MethodOptions) diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go index 29953c32d..ffaa0e662 100644 --- a/clientapi/routing/sendtyping.go +++ b/clientapi/routing/sendtyping.go @@ -35,7 +35,7 @@ type typingContentJSON struct { func SendTyping( req *http.Request, device *authtypes.Device, roomID string, userID string, accountDB accounts.Database, - typingProducer *producers.TypingServerProducer, + eduProducer *producers.EDUServerProducer, ) util.JSONResponse { if device.UserID != userID { return util.JSONResponse{ @@ -69,10 +69,10 @@ func SendTyping( return *resErr } - if err = typingProducer.Send( + if err = eduProducer.SendTyping( req.Context(), userID, roomID, r.Typing, r.Timeout, ); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("typingProducer.Send failed") + util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed") return jsonerror.InternalServerError() } diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index 2bde0f4cf..a7e241b13 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -19,8 +19,8 @@ import ( "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/transactions" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" ) func main() { @@ -38,11 +38,11 @@ func main() { asQuery := base.CreateHTTPAppServiceAPIs() alias, input, query := base.CreateHTTPRoomserverAPIs() fedSenderAPI := base.CreateHTTPFederationSenderAPIs() - typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, - alias, input, query, typingInputAPI, asQuery, transactions.New(), fedSenderAPI, + alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderAPI, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI)) diff --git a/cmd/dendrite-typing-server/main.go b/cmd/dendrite-edu-server/main.go similarity index 72% rename from cmd/dendrite-typing-server/main.go rename to cmd/dendrite-edu-server/main.go index 461eb7144..a4511f1ba 100644 --- a/cmd/dendrite-typing-server/main.go +++ b/cmd/dendrite-edu-server/main.go @@ -16,22 +16,22 @@ import ( _ "net/http/pprof" "github.com/matrix-org/dendrite/common/basecomponent" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/sirupsen/logrus" ) func main() { cfg := basecomponent.ParseFlags() - base := basecomponent.NewBaseDendrite(cfg, "TypingServerAPI") + base := basecomponent.NewBaseDendrite(cfg, "EDUServerAPI") defer func() { if err := base.Close(); err != nil { logrus.WithError(err).Warn("BaseDendrite close failed") } }() - typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) + eduserver.SetupEDUServerComponent(base, cache.New()) - base.SetupAndServeHTTP(string(base.Cfg.Bind.TypingServer), string(base.Cfg.Listen.TypingServer)) + base.SetupAndServeHTTP(string(base.Cfg.Bind.EDUServer), string(base.Cfg.Listen.EDUServer)) } diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go index 367f5dc0c..d18926a68 100644 --- a/cmd/dendrite-federation-api-server/main.go +++ b/cmd/dendrite-federation-api-server/main.go @@ -15,8 +15,11 @@ package main import ( + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/keydb" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" ) @@ -34,10 +37,12 @@ func main() { alias, input, query := base.CreateHTTPRoomserverAPIs() asQuery := base.CreateHTTPAppServiceAPIs() + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) federationapi.SetupFederationAPIComponent( base, accountDB, deviceDB, federation, &keyRing, - alias, input, query, asQuery, federationSender, + alias, input, query, asQuery, federationSender, eduProducer, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 27c3054b8..9f6531ed3 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -20,18 +20,19 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/syncapi" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -56,7 +57,7 @@ func main() { keyRing := keydb.CreateKeyRing(federation.Client, keyDB) alias, input, query := roomserver.SetupRoomServerComponent(base) - typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) @@ -65,9 +66,10 @@ func main() { clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, alias, input, query, - typingInputAPI, asQuery, transactions.New(), fedSenderAPI, + eduInputAPI, asQuery, transactions.New(), fedSenderAPI, ) - federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) + federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, nil) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 7c8526715..05802725d 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -23,18 +23,19 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/syncapi" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/go-http-js-libp2p/go_http_js_libp2p" "github.com/matrix-org/gomatrixserverlib" @@ -122,7 +123,7 @@ func main() { p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node) alias, input, query := roomserver.SetupRoomServerComponent(base) - typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) @@ -131,9 +132,10 @@ func main() { clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, alias, input, query, - typingInputAPI, asQuery, transactions.New(), fedSenderAPI, + eduInputAPI, asQuery, transactions.New(), fedSenderAPI, ) - federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) + federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, p2pPublicRoomProvider) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) diff --git a/common/basecomponent/base.go b/common/basecomponent/base.go index d1d953f7b..8d559f4dc 100644 --- a/common/basecomponent/base.go +++ b/common/basecomponent/base.go @@ -35,9 +35,9 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/common/config" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - typingServerAPI "github.com/matrix-org/dendrite/typingserver/api" "github.com/sirupsen/logrus" ) @@ -111,10 +111,10 @@ func (b *BaseDendrite) CreateHTTPRoomserverAPIs() ( return alias, input, query } -// CreateHTTPTypingServerAPIs returns typingInputAPI for hitting the typing +// CreateHTTPEDUServerAPIs returns eduInputAPI for hitting the EDU // server over HTTP -func (b *BaseDendrite) CreateHTTPTypingServerAPIs() typingServerAPI.TypingServerInputAPI { - return typingServerAPI.NewTypingServerInputAPIHTTP(b.Cfg.TypingServerURL(), nil) +func (b *BaseDendrite) CreateHTTPEDUServerAPIs() eduServerAPI.EDUServerInputAPI { + return eduServerAPI.NewEDUServerInputAPIHTTP(b.Cfg.EDUServerURL(), nil) } // CreateHTTPFederationSenderAPIs returns FederationSenderQueryAPI for hitting diff --git a/common/config/config.go b/common/config/config.go index bd83cbf8b..e2f5e6635 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -134,7 +134,7 @@ type Dendrite struct { OutputRoomEvent Topic `yaml:"output_room_event"` // Topic for sending account data from client API to sync API OutputClientData Topic `yaml:"output_client_data"` - // Topic for typingserver/api.OutputTypingEvent events. + // Topic for eduserver/api.OutputTypingEvent events. OutputTypingEvent Topic `yaml:"output_typing_event"` // Topic for user updates (profile, presence) UserUpdates Topic `yaml:"user_updates"` @@ -206,7 +206,7 @@ type Dendrite struct { RoomServer Address `yaml:"room_server"` FederationSender Address `yaml:"federation_sender"` PublicRoomsAPI Address `yaml:"public_rooms_api"` - TypingServer Address `yaml:"typing_server"` + EDUServer Address `yaml:"edu_server"` } `yaml:"bind"` // The addresses for talking to other microservices. @@ -219,7 +219,7 @@ type Dendrite struct { RoomServer Address `yaml:"room_server"` FederationSender Address `yaml:"federation_sender"` PublicRoomsAPI Address `yaml:"public_rooms_api"` - TypingServer Address `yaml:"typing_server"` + EDUServer Address `yaml:"edu_server"` } `yaml:"listen"` // The config for tracing the dendrite servers. @@ -571,7 +571,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) { checkNotEmpty(configErrs, "listen.federation_api", string(config.Listen.FederationAPI)) checkNotEmpty(configErrs, "listen.sync_api", string(config.Listen.SyncAPI)) checkNotEmpty(configErrs, "listen.room_server", string(config.Listen.RoomServer)) - checkNotEmpty(configErrs, "listen.typing_server", string(config.Listen.TypingServer)) + checkNotEmpty(configErrs, "listen.edu_server", string(config.Listen.EDUServer)) } // checkLogging verifies the parameters logging.* are valid. @@ -669,7 +669,7 @@ func fingerprintPEM(data []byte) *gomatrixserverlib.TLSFingerprint { // AppServiceURL returns a HTTP URL for where the appservice component is listening. func (config *Dendrite) AppServiceURL() string { - // Hard code the roomserver to talk HTTP for now. + // Hard code the appservice server to talk HTTP for now. // If we support HTTPS we need to think of a practical way to do certificate validation. // People setting up servers shouldn't need to get a certificate valid for the public // internet for an internal API. @@ -685,18 +685,18 @@ func (config *Dendrite) RoomServerURL() string { return "http://" + string(config.Listen.RoomServer) } -// TypingServerURL returns an HTTP URL for where the typing server is listening. -func (config *Dendrite) TypingServerURL() string { - // Hard code the typing server to talk HTTP for now. +// EDUServerURL returns an HTTP URL for where the EDU server is listening. +func (config *Dendrite) EDUServerURL() string { + // Hard code the EDU server to talk HTTP for now. // If we support HTTPS we need to think of a practical way to do certificate validation. // People setting up servers shouldn't need to get a certificate valid for the public // internet for an internal API. - return "http://" + string(config.Listen.TypingServer) + return "http://" + string(config.Listen.EDUServer) } // FederationSenderURL returns an HTTP URL for where the federation sender is listening. func (config *Dendrite) FederationSenderURL() string { - // Hard code the typing server to talk HTTP for now. + // Hard code the federation sender server to talk HTTP for now. // If we support HTTPS we need to think of a practical way to do certificate validation. // People setting up servers shouldn't need to get a certificate valid for the public // internet for an internal API. diff --git a/common/config/config_test.go b/common/config/config_test.go index 110c8b84c..b72f5fad0 100644 --- a/common/config/config_test.go +++ b/common/config/config_test.go @@ -62,7 +62,7 @@ listen: sync_api: "localhost:7773" media_api: "localhost:7774" appservice_api: "localhost:7777" - typing_server: "localhost:7778" + edu_server: "localhost:7778" logging: - type: "file" level: "info" diff --git a/common/test/config.go b/common/test/config.go index 0fed252ae..f88e45125 100644 --- a/common/test/config.go +++ b/common/test/config.go @@ -106,7 +106,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.Listen.RoomServer = assignAddress() cfg.Listen.SyncAPI = assignAddress() cfg.Listen.PublicRoomsAPI = assignAddress() - cfg.Listen.TypingServer = assignAddress() + cfg.Listen.EDUServer = assignAddress() // Bind to the same address as the listen address // All microservices are run on the same host in testing @@ -117,7 +117,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.Bind.RoomServer = cfg.Listen.RoomServer cfg.Bind.SyncAPI = cfg.Listen.SyncAPI cfg.Bind.PublicRoomsAPI = cfg.Listen.PublicRoomsAPI - cfg.Bind.TypingServer = cfg.Listen.TypingServer + cfg.Bind.EDUServer = cfg.Listen.EDUServer return &cfg, port, nil } diff --git a/dendrite-config.yaml b/dendrite-config.yaml index a8d39aa1e..7436af7a3 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -85,7 +85,7 @@ kafka: topics: output_room_event: roomserverOutput output_client_data: clientapiOutput - output_typing_event: typingServerOutput + output_typing_event: eduServerOutput user_updates: userUpdates # The postgres connection configs for connecting to the databases e.g a postgres:// URI @@ -114,7 +114,7 @@ listen: public_rooms_api: "localhost:7775" federation_sender: "localhost:7776" appservice_api: "localhost:7777" - typing_server: "localhost:7778" + edu_server: "localhost:7778" # The configuration for tracing the dendrite components. tracing: diff --git a/docker/README.md b/docker/README.md index ee4f0f96f..83d0b6a87 100644 --- a/docker/README.md +++ b/docker/README.md @@ -58,7 +58,7 @@ docker-compose up kafka zookeeper postgres and the following dendrite components ``` -docker-compose up client_api media_api sync_api room_server public_rooms_api typing_server +docker-compose up client_api media_api sync_api room_server public_rooms_api edu_server docker-compose up client_api_proxy ``` diff --git a/docker/dendrite-docker.yml b/docker/dendrite-docker.yml index abb8c3307..a72ff3ddc 100644 --- a/docker/dendrite-docker.yml +++ b/docker/dendrite-docker.yml @@ -85,7 +85,7 @@ kafka: topics: output_room_event: roomserverOutput output_client_data: clientapiOutput - output_typing_event: typingServerOutput + output_typing_event: eduServerOutput user_updates: userUpdates @@ -114,7 +114,7 @@ listen: media_api: "media_api:7774" public_rooms_api: "public_rooms_api:7775" federation_sender: "federation_sender:7776" - typing_server: "typing_server:7777" + edu_server: "typing_server:7777" # The configuration for tracing the dendrite components. tracing: diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index d738ed3f0..957c3bf3f 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -103,10 +103,10 @@ services: networks: - internal - typing_server: - container_name: dendrite_typing_server - hostname: typing_server - entrypoint: ["bash", "./docker/services/typing-server.sh"] + edu_server: + container_name: dendrite_edu_server + hostname: edu_server + entrypoint: ["bash", "./docker/services/edu-server.sh"] build: ./ volumes: - ..:/build diff --git a/docker/services/edu-server.sh b/docker/services/edu-server.sh new file mode 100644 index 000000000..d40b9fa7e --- /dev/null +++ b/docker/services/edu-server.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +bash ./docker/build.sh + +./bin/dendrite-edu-server --config=dendrite.yaml diff --git a/docker/services/typing-server.sh b/docker/services/typing-server.sh deleted file mode 100644 index 16ee0fa62..000000000 --- a/docker/services/typing-server.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -bash ./docker/build.sh - -./bin/dendrite-typing-server --config=dendrite.yaml diff --git a/typingserver/api/input.go b/eduserver/api/input.go similarity index 67% rename from typingserver/api/input.go rename to eduserver/api/input.go index 25e2ea228..ad3f1ed58 100644 --- a/typingserver/api/input.go +++ b/eduserver/api/input.go @@ -30,13 +30,13 @@ type InputTypingEvent struct { RoomID string `json:"room_id"` // Typing is true if the user is typing, false if they have stopped. Typing bool `json:"typing"` - // Timeout is the interval for which the user should be marked as typing. - Timeout int64 `json:"timeout"` + // Timeout is the interval in milliseconds for which the user should be marked as typing. + TimeoutMS int64 `json:"timeout"` // OriginServerTS when the server received the update. OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` } -// InputTypingEventRequest is a request to TypingServerInputAPI +// InputTypingEventRequest is a request to EDUServerInputAPI type InputTypingEventRequest struct { InputTypingEvent InputTypingEvent `json:"input_typing_event"` } @@ -44,8 +44,8 @@ type InputTypingEventRequest struct { // InputTypingEventResponse is a response to InputTypingEvents type InputTypingEventResponse struct{} -// TypingServerInputAPI is used to write events to the typing server. -type TypingServerInputAPI interface { +// EDUServerInputAPI is used to write events to the typing server. +type EDUServerInputAPI interface { InputTypingEvent( ctx context.Context, request *InputTypingEventRequest, @@ -53,24 +53,24 @@ type TypingServerInputAPI interface { ) error } -// TypingServerInputTypingEventPath is the HTTP path for the InputTypingEvent API. -const TypingServerInputTypingEventPath = "/api/typingserver/input" +// EDUServerInputTypingEventPath is the HTTP path for the InputTypingEvent API. +const EDUServerInputTypingEventPath = "/api/eduserver/input" -// NewTypingServerInputAPIHTTP creates a TypingServerInputAPI implemented by talking to a HTTP POST API. -func NewTypingServerInputAPIHTTP(typingServerURL string, httpClient *http.Client) TypingServerInputAPI { +// NewEDUServerInputAPIHTTP creates a EDUServerInputAPI implemented by talking to a HTTP POST API. +func NewEDUServerInputAPIHTTP(eduServerURL string, httpClient *http.Client) EDUServerInputAPI { if httpClient == nil { httpClient = http.DefaultClient } - return &httpTypingServerInputAPI{typingServerURL, httpClient} + return &httpEDUServerInputAPI{eduServerURL, httpClient} } -type httpTypingServerInputAPI struct { - typingServerURL string - httpClient *http.Client +type httpEDUServerInputAPI struct { + eduServerURL string + httpClient *http.Client } -// InputRoomEvents implements TypingServerInputAPI -func (h *httpTypingServerInputAPI) InputTypingEvent( +// InputRoomEvents implements EDUServerInputAPI +func (h *httpEDUServerInputAPI) InputTypingEvent( ctx context.Context, request *InputTypingEventRequest, response *InputTypingEventResponse, @@ -78,6 +78,6 @@ func (h *httpTypingServerInputAPI) InputTypingEvent( span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent") defer span.Finish() - apiURL := h.typingServerURL + TypingServerInputTypingEventPath + apiURL := h.eduServerURL + EDUServerInputTypingEventPath return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } diff --git a/typingserver/api/output.go b/eduserver/api/output.go similarity index 100% rename from typingserver/api/output.go rename to eduserver/api/output.go diff --git a/typingserver/cache/cache.go b/eduserver/cache/cache.go similarity index 87% rename from typingserver/cache/cache.go rename to eduserver/cache/cache.go index 3f05c938e..46f7a2b13 100644 --- a/typingserver/cache/cache.go +++ b/eduserver/cache/cache.go @@ -32,8 +32,8 @@ type roomData struct { userSet userSet } -// TypingCache maintains a list of users typing in each room. -type TypingCache struct { +// EDUCache maintains a list of users typing in each room. +type EDUCache struct { sync.RWMutex latestSyncPosition int64 data map[string]*roomData @@ -42,26 +42,26 @@ type TypingCache struct { // Create a roomData with its sync position set to the latest sync position. // Must only be called after locking the cache. -func (t *TypingCache) newRoomData() *roomData { +func (t *EDUCache) newRoomData() *roomData { return &roomData{ syncPosition: t.latestSyncPosition, userSet: make(userSet), } } -// NewTypingCache returns a new TypingCache initialised for use. -func NewTypingCache() *TypingCache { - return &TypingCache{data: make(map[string]*roomData)} +// New returns a new EDUCache initialised for use. +func New() *EDUCache { + return &EDUCache{data: make(map[string]*roomData)} } // SetTimeoutCallback sets a callback function that is called right after // a user is removed from the typing user list due to timeout. -func (t *TypingCache) SetTimeoutCallback(fn TimeoutCallbackFn) { +func (t *EDUCache) SetTimeoutCallback(fn TimeoutCallbackFn) { t.timeoutCallback = fn } // GetTypingUsers returns the list of users typing in a room. -func (t *TypingCache) GetTypingUsers(roomID string) []string { +func (t *EDUCache) GetTypingUsers(roomID string) []string { users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0) // 0 should work above because the first position used will be 1. return users @@ -70,7 +70,7 @@ func (t *TypingCache) GetTypingUsers(roomID string) []string { // GetTypingUsersIfUpdatedAfter returns all users typing in this room with // updated == true if the typing sync position of the room is after the given // position. Otherwise, returns an empty slice with updated == false. -func (t *TypingCache) GetTypingUsersIfUpdatedAfter( +func (t *EDUCache) GetTypingUsersIfUpdatedAfter( roomID string, position int64, ) (users []string, updated bool) { t.RLock() @@ -93,7 +93,7 @@ func (t *TypingCache) GetTypingUsersIfUpdatedAfter( // expire is the time when the user typing should time out. // if expire is nil, defaultTypingTimeout is assumed. // Returns the latest sync position for typing after update. -func (t *TypingCache) AddTypingUser( +func (t *EDUCache) AddTypingUser( userID, roomID string, expire *time.Time, ) int64 { expireTime := getExpireTime(expire) @@ -111,7 +111,7 @@ func (t *TypingCache) AddTypingUser( // addUser with mutex lock & replace the previous timer. // Returns the latest typing sync position after update. -func (t *TypingCache) addUser( +func (t *EDUCache) addUser( userID, roomID string, expiryTimer *time.Timer, ) int64 { t.Lock() @@ -143,7 +143,7 @@ func (t *TypingCache) addUser( // RemoveUser with mutex lock & stop the timer. // Returns the latest sync position for typing after update. -func (t *TypingCache) RemoveUser(userID, roomID string) int64 { +func (t *EDUCache) RemoveUser(userID, roomID string) int64 { t.Lock() defer t.Unlock() @@ -166,7 +166,7 @@ func (t *TypingCache) RemoveUser(userID, roomID string) int64 { return t.latestSyncPosition } -func (t *TypingCache) GetLatestSyncPosition() int64 { +func (t *EDUCache) GetLatestSyncPosition() int64 { t.Lock() defer t.Unlock() return t.latestSyncPosition diff --git a/typingserver/cache/cache_test.go b/eduserver/cache/cache_test.go similarity index 88% rename from typingserver/cache/cache_test.go rename to eduserver/cache/cache_test.go index 2a6ffa50e..8a1b6f797 100644 --- a/typingserver/cache/cache_test.go +++ b/eduserver/cache/cache_test.go @@ -19,10 +19,10 @@ import ( "github.com/matrix-org/dendrite/common/test" ) -func TestTypingCache(t *testing.T) { - tCache := NewTypingCache() +func TestEDUCache(t *testing.T) { + tCache := New() if tCache == nil { - t.Fatal("NewTypingCache failed") + t.Fatal("New failed") } t.Run("AddTypingUser", func(t *testing.T) { @@ -38,7 +38,7 @@ func TestTypingCache(t *testing.T) { }) } -func testAddTypingUser(t *testing.T, tCache *TypingCache) { // nolint: unparam +func testAddTypingUser(t *testing.T, tCache *EDUCache) { // nolint: unparam present := time.Now() tests := []struct { userID string @@ -58,7 +58,7 @@ func testAddTypingUser(t *testing.T, tCache *TypingCache) { // nolint: unparam } } -func testGetTypingUsers(t *testing.T, tCache *TypingCache) { +func testGetTypingUsers(t *testing.T, tCache *EDUCache) { tests := []struct { roomID string wantUsers []string @@ -75,7 +75,7 @@ func testGetTypingUsers(t *testing.T, tCache *TypingCache) { } } -func testRemoveUser(t *testing.T, tCache *TypingCache) { +func testRemoveUser(t *testing.T, tCache *EDUCache) { tests := []struct { roomID string userIDs []string diff --git a/typingserver/typingserver.go b/eduserver/eduserver.go similarity index 66% rename from typingserver/typingserver.go rename to eduserver/eduserver.go index b43f72f75..8ddd2c527 100644 --- a/typingserver/typingserver.go +++ b/eduserver/eduserver.go @@ -10,27 +10,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -package typingserver +package eduserver import ( "net/http" "github.com/matrix-org/dendrite/common/basecomponent" - "github.com/matrix-org/dendrite/typingserver/api" - "github.com/matrix-org/dendrite/typingserver/cache" - "github.com/matrix-org/dendrite/typingserver/input" + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/eduserver/input" ) -// SetupTypingServerComponent sets up and registers HTTP handlers for the -// TypingServer component. Returns instances of the various roomserver APIs, +// SetupEDUServerComponent sets up and registers HTTP handlers for the +// EDUServer component. Returns instances of the various roomserver APIs, // allowing other components running in the same process to hit the query the // APIs directly instead of having to use HTTP. -func SetupTypingServerComponent( +func SetupEDUServerComponent( base *basecomponent.BaseDendrite, - typingCache *cache.TypingCache, -) api.TypingServerInputAPI { - inputAPI := &input.TypingServerInputAPI{ - Cache: typingCache, + eduCache *cache.EDUCache, +) api.EDUServerInputAPI { + inputAPI := &input.EDUServerInputAPI{ + Cache: eduCache, Producer: base.KafkaProducer, OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), } diff --git a/typingserver/input/input.go b/eduserver/input/input.go similarity index 77% rename from typingserver/input/input.go rename to eduserver/input/input.go index 0e2fbe51f..845909452 100644 --- a/typingserver/input/input.go +++ b/eduserver/input/input.go @@ -19,25 +19,25 @@ import ( "time" "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/typingserver/api" - "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "gopkg.in/Shopify/sarama.v1" ) -// TypingServerInputAPI implements api.TypingServerInputAPI -type TypingServerInputAPI struct { +// EDUServerInputAPI implements api.EDUServerInputAPI +type EDUServerInputAPI struct { // Cache to store the current typing members in each room. - Cache *cache.TypingCache + Cache *cache.EDUCache // The kafka topic to output new typing events to. OutputTypingEventTopic string // kafka producer Producer sarama.SyncProducer } -// InputTypingEvent implements api.TypingServerInputAPI -func (t *TypingServerInputAPI) InputTypingEvent( +// InputTypingEvent implements api.EDUServerInputAPI +func (t *EDUServerInputAPI) InputTypingEvent( ctx context.Context, request *api.InputTypingEventRequest, response *api.InputTypingEventResponse, @@ -46,7 +46,7 @@ func (t *TypingServerInputAPI) InputTypingEvent( if ite.Typing { // user is typing, update our current state of users typing. expireTime := ite.OriginServerTS.Time().Add( - time.Duration(ite.Timeout) * time.Millisecond, + time.Duration(ite.TimeoutMS) * time.Millisecond, ) t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime) } else { @@ -56,7 +56,7 @@ func (t *TypingServerInputAPI) InputTypingEvent( return t.sendEvent(ite) } -func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { +func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { ev := &api.TypingEvent{ Type: gomatrixserverlib.MTyping, RoomID: ite.RoomID, @@ -69,7 +69,7 @@ func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { if ev.Typing { expireTime := ite.OriginServerTS.Time().Add( - time.Duration(ite.Timeout) * time.Millisecond, + time.Duration(ite.TimeoutMS) * time.Millisecond, ) ote.ExpireTime = &expireTime } @@ -89,9 +89,9 @@ func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { return err } -// SetupHTTP adds the TypingServerInputAPI handlers to the http.ServeMux. -func (t *TypingServerInputAPI) SetupHTTP(servMux *http.ServeMux) { - servMux.Handle(api.TypingServerInputTypingEventPath, +// SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux. +func (t *EDUServerInputAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle(api.EDUServerInputTypingEventPath, common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse { var request api.InputTypingEventRequest var response api.InputTypingEventResponse diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 90db95b3a..ed96322b8 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -41,12 +41,13 @@ func SetupFederationAPIComponent( queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI, federationSenderAPI federationSenderAPI.FederationSenderQueryAPI, + eduProducer *producers.EDUServerProducer, ) { roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI) routing.Setup( base.APIMux, base.Cfg, queryAPI, aliasAPI, asAPI, - roomserverProducer, federationSenderAPI, *keyRing, + roomserverProducer, eduProducer, federationSenderAPI, *keyRing, federation, accountsDB, deviceDB, ) } diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index b5c8e53de..9ac535767 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -48,6 +48,7 @@ func Setup( aliasAPI roomserverAPI.RoomserverAliasAPI, asAPI appserviceAPI.AppServiceQueryAPI, producer *producers.RoomserverProducer, + eduProducer *producers.EDUServerProducer, federationSenderAPI federationSenderAPI.FederationSenderQueryAPI, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, @@ -79,7 +80,7 @@ func Setup( } return Send( httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), - cfg, query, producer, keys, federation, + cfg, query, producer, eduProducer, keys, federation, ) }, )).Methods(http.MethodPut, http.MethodOptions) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 4c92c7e5e..1013a44cf 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -36,20 +36,22 @@ func Send( cfg *config.Dendrite, query api.RoomserverQueryAPI, producer *producers.RoomserverProducer, + eduProducer *producers.EDUServerProducer, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, ) util.JSONResponse { t := txnReq{ - context: httpReq.Context(), - query: query, - producer: producer, - keys: keys, - federation: federation, + context: httpReq.Context(), + query: query, + producer: producer, + eduProducer: eduProducer, + keys: keys, + federation: federation, } var txnEvents struct { - PDUs []json.RawMessage `json:"pdus"` - EDUs []json.RawMessage `json:"edus"` + PDUs []json.RawMessage `json:"pdus"` + EDUs []gomatrixserverlib.EDU `json:"edus"` } if err := json.Unmarshal(request.Content(), &txnEvents); err != nil { @@ -59,7 +61,9 @@ func Send( } } + // TODO: Really we should have a function to convert FederationRequest to txnReq t.PDUs = txnEvents.PDUs + t.EDUs = txnEvents.EDUs t.Origin = request.Origin() t.TransactionID = txnID t.Destination = cfg.Matrix.ServerName @@ -80,11 +84,12 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - context context.Context - query api.RoomserverQueryAPI - producer *producers.RoomserverProducer - keys gomatrixserverlib.KeyRing - federation *gomatrixserverlib.FederationClient + context context.Context + query api.RoomserverQueryAPI + producer *producers.RoomserverProducer + eduProducer *producers.EDUServerProducer + keys gomatrixserverlib.KeyRing + federation *gomatrixserverlib.FederationClient } func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { @@ -152,7 +157,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { } } - // TODO: Process the EDUs. + t.processEDUs(t.EDUs) util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID) return &gomatrixserverlib.RespSend{PDUs: results}, nil } @@ -163,6 +168,29 @@ type unknownRoomError struct { func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) } +func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { + for _, e := range edus { + switch e.Type { + case gomatrixserverlib.MTyping: + // https://matrix.org/docs/spec/server_server/latest#typing-notifications + var typingPayload struct { + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + Typing bool `json:"typing"` + } + if err := json.Unmarshal(e.Content, &typingPayload); err != nil { + util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event") + continue + } + if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { + util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server") + } + default: + util.GetLogger(t.context).WithField("type", e.Type).Warn("unhandled edu") + } + } +} + func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { prevEventIDs := e.PrevEventIDs() diff --git a/federationsender/consumers/typingserver.go b/federationsender/consumers/eduserver.go similarity index 76% rename from federationsender/consumers/typingserver.go rename to federationsender/consumers/eduserver.go index 590fcb257..4d2445f3c 100644 --- a/federationsender/consumers/typingserver.go +++ b/federationsender/consumers/eduserver.go @@ -18,15 +18,15 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/typingserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" "gopkg.in/Shopify/sarama.v1" ) -// OutputTypingEventConsumer consumes events that originate in typing server. +// OutputTypingEventConsumer consumes events that originate in EDU server. type OutputTypingEventConsumer struct { consumer *common.ContinualConsumer db storage.Database @@ -34,7 +34,7 @@ type OutputTypingEventConsumer struct { ServerName gomatrixserverlib.ServerName } -// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers. +// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from EDU servers. func NewOutputTypingEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, @@ -57,19 +57,30 @@ func NewOutputTypingEventConsumer( return c } -// Start consuming from typing servers +// Start consuming from EDU servers func (t *OutputTypingEventConsumer) Start() error { return t.consumer.Start() } -// onMessage is called for OutputTypingEvent received from the typing servers. +// onMessage is called for OutputTypingEvent received from the EDU servers. // Parses the msg, creates a matrix federation EDU and sends it to joined hosts. func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Extract the typing event from msg. var ote api.OutputTypingEvent if err := json.Unmarshal(msg.Value, &ote); err != nil { // Skip this msg but continue processing messages. - log.WithError(err).Errorf("typingserver output log: message parse failed") + log.WithError(err).Errorf("eduserver output log: message parse failed") + return nil + } + + // only send typing events which originated from us + _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID) + if err != nil { + log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender") + return nil + } + if typingServerName != t.ServerName { + log.WithField("other_server", typingServerName).Info("Suppressing typing notif: originated elsewhere") return nil } diff --git a/go.mod b/go.mod index 97c0e44ef..8743437ec 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/libp2p/go-libp2p-core v0.5.0 github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f - github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 + github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 diff --git a/go.sum b/go.sum index aeedd6f62..1294f3b7b 100644 --- a/go.sum +++ b/go.sum @@ -122,6 +122,8 @@ github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f h1:5T github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f/go.mod h1:qK3LUW7RCLhFM7gC3pabj3EXT9A1DsCK33MHstUhhbk= github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 h1:UlhTKClOgWnSB25Rv+BS/Vc1mRinjNUErfyGEVOBP04= github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= +github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 h1:SnhC7/o87ueVwEWI3mUYtrs+s8VnYq3KZtpWsFQOLFE= +github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20190130130140-385f072fe9af h1:piaIBNQGIHnni27xRB7VKkEwoWCgAmeuYf8pxAyG0bI= github.com/matrix-org/gomatrix v0.0.0-20190130130140-385f072fe9af/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4= diff --git a/roomserver/state/state.go b/roomserver/state/state.go index b8e3e18a1..94873dbeb 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -672,7 +672,7 @@ func (v StateResolution) calculateStateAfterManyEvents( return } algorithm = "full_state_with_conflicts" - state = resolved + state = resolved[:util.SortAndUnique(stateEntrySorter(resolved))] } else { algorithm = "full_state_no_conflicts" // 6) There weren't any conflicts diff --git a/show-expected-fail-tests.sh b/show-expected-fail-tests.sh index 9cd51b007..0a4c7be87 100755 --- a/show-expected-fail-tests.sh +++ b/show-expected-fail-tests.sh @@ -60,7 +60,7 @@ while read -r test_name; do # Ignore empty lines [ "${test_name}" = "" ] && continue - grep "${test_name}" "${whitelist_file}" > /dev/null 2>&1 + grep "^${test_name}" "${whitelist_file}" > /dev/null 2>&1 if [ "$?" != "0" ]; then # Check if this test name is blacklisted if printf '%s\n' "${blacklisted_tests[@]}" | grep -q -P "^${test_name}$"; then @@ -80,8 +80,8 @@ done <<< "${passed_but_expected_fail}" # TODO: Check that the same test doesn't appear twice in the whitelist|blacklist # Trim test output strings -tests_to_add=$(echo -e $tests_to_add | xargs) -already_in_whitelist=$(echo -e $already_in_whitelist | xargs) +tests_to_add=$(echo -e $tests_to_add | xargs -d '\n') +already_in_whitelist=$(echo -e $already_in_whitelist | xargs -d '\n') # Format output with markdown for buildkite annotation rendering purposes if [ -n "${tests_to_add}" ] && [ -n "${already_in_whitelist}" ]; then diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/eduserver.go similarity index 90% rename from syncapi/consumers/typingserver.go rename to syncapi/consumers/eduserver.go index 369254411..5491c1e9f 100644 --- a/syncapi/consumers/typingserver.go +++ b/syncapi/consumers/eduserver.go @@ -19,15 +19,15 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/api" log "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) -// OutputTypingEventConsumer consumes events that originated in the typing server. +// OutputTypingEventConsumer consumes events that originated in the EDU server. type OutputTypingEventConsumer struct { typingConsumer *common.ContinualConsumer db storage.Database @@ -35,7 +35,7 @@ type OutputTypingEventConsumer struct { } // NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. -// Call Start() to begin consuming from the typing server. +// Call Start() to begin consuming from the EDU server. func NewOutputTypingEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, @@ -60,7 +60,7 @@ func NewOutputTypingEventConsumer( return s } -// Start consuming from typing api +// Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { s.notifier.OnNewEvent( @@ -78,7 +78,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error var output api.OutputTypingEvent if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("typing server output log: message parse failure") + log.WithError(err).Errorf("EDU server output log: message parse failure") return nil } @@ -86,7 +86,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": output.Event.RoomID, "user_id": output.Event.UserID, "typing": output.Event.Typing, - }).Debug("received data from typing server") + }).Debug("received data from EDU server") var typingPos types.StreamPosition typingEvent := output.Event diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index b6dc19696..a3efd8d58 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -20,9 +20,9 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/gomatrixserverlib" ) diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index f3f1aabc7..ead1bf335 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -30,8 +30,8 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/gomatrixserverlib" ) @@ -53,7 +53,7 @@ type SyncServerDatasource struct { events outputRoomEventsStatements roomstate currentRoomStateStatements invites inviteEventsStatements - typingCache *cache.TypingCache + eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements backwardExtremities backwardExtremitiesStatements } @@ -86,7 +86,7 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er if err := d.backwardExtremities.prepare(d.db); err != nil { return nil, err } - d.typingCache = cache.NewTypingCache() + d.eduCache = cache.New() return &d, nil } @@ -395,7 +395,7 @@ func (d *SyncServerDatasource) syncPositionTx( maxEventID = maxInviteID } sp.PDUPosition = types.StreamPosition(maxEventID) - sp.EDUTypingPosition = types.StreamPosition(d.typingCache.GetLatestSyncPosition()) + sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition()) return } @@ -468,7 +468,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( var ok bool var err error for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter( + if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter( roomID, int64(since.EDUTypingPosition), ); updated { ev := gomatrixserverlib.ClientEvent{ @@ -719,7 +719,7 @@ func (d *SyncServerDatasource) RetireInviteEvent( } func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { - d.typingCache.SetTimeoutCallback(fn) + d.eduCache.SetTimeoutCallback(fn) } // AddTypingUser adds a typing user to the typing cache. @@ -727,7 +727,7 @@ func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallback func (d *SyncServerDatasource) AddTypingUser( userID, roomID string, expireTime *time.Time, ) types.StreamPosition { - return types.StreamPosition(d.typingCache.AddTypingUser(userID, roomID, expireTime)) + return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime)) } // RemoveTypingUser removes a typing user from the typing cache. @@ -735,7 +735,7 @@ func (d *SyncServerDatasource) AddTypingUser( func (d *SyncServerDatasource) RemoveTypingUser( userID, roomID string, ) types.StreamPosition { - return types.StreamPosition(d.typingCache.RemoveUser(userID, roomID)) + return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID)) } func (d *SyncServerDatasource) addInvitesToResponse( diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 8ff189007..30f77e54d 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -33,8 +33,8 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/gomatrixserverlib" ) @@ -57,7 +57,7 @@ type SyncServerDatasource struct { events outputRoomEventsStatements roomstate currentRoomStateStatements invites inviteEventsStatements - typingCache *cache.TypingCache + eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements backwardExtremities backwardExtremitiesStatements } @@ -84,7 +84,7 @@ func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, erro if err = d.prepare(); err != nil { return nil, err } - d.typingCache = cache.NewTypingCache() + d.eduCache = cache.New() return &d, nil } @@ -429,7 +429,7 @@ func (d *SyncServerDatasource) syncPositionTx( maxEventID = maxInviteID } sp.PDUPosition = types.StreamPosition(maxEventID) - sp.EDUTypingPosition = types.StreamPosition(d.typingCache.GetLatestSyncPosition()) + sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition()) return } @@ -502,7 +502,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( var ok bool var err error for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter( + if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter( roomID, int64(since.EDUTypingPosition), ); updated { ev := gomatrixserverlib.ClientEvent{ @@ -766,7 +766,7 @@ func (d *SyncServerDatasource) RetireInviteEvent( } func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { - d.typingCache.SetTimeoutCallback(fn) + d.eduCache.SetTimeoutCallback(fn) } // AddTypingUser adds a typing user to the typing cache. @@ -774,7 +774,7 @@ func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallback func (d *SyncServerDatasource) AddTypingUser( userID, roomID string, expireTime *time.Time, ) types.StreamPosition { - return types.StreamPosition(d.typingCache.AddTypingUser(userID, roomID, expireTime)) + return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime)) } // RemoveTypingUser removes a typing user from the typing cache. @@ -782,7 +782,7 @@ func (d *SyncServerDatasource) AddTypingUser( func (d *SyncServerDatasource) RemoveTypingUser( userID, roomID string, ) types.StreamPosition { - return types.StreamPosition(d.typingCache.RemoveUser(userID, roomID)) + return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID)) } func (d *SyncServerDatasource) addInvitesToResponse(