Wire Pushserver component

Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
Piotr Kozimor 2021-11-17 17:00:59 +01:00
parent 151d52ce23
commit 2db65e1b43
18 changed files with 287 additions and 6 deletions

3
.gitignore vendored
View file

@ -60,3 +60,6 @@ cmd/dendrite-demo-yggdrasil/embed/fs*.go
# Test dependencies
test/wasm/node_modules
# Ignore complement folder when running locally
complement

View file

@ -323,6 +323,17 @@ user_api:
max_idle_conns: 2
conn_max_lifetime: -1
# Configuration for the Push Server API.
push_server:
internal_api:
listen: http://localhost:7782
connect: http://localhost:7782
database:
connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_pushserver?sslmode=disable
max_open_conns: 10
max_idle_conns: 2
conn_max_lifetime: -1
# Configuration for Opentracing.
# See https://github.com/matrix-org/dendrite/tree/master/docs/tracing for information on
# how this works and how to set it up.

View file

@ -1,5 +1,5 @@
#!/bin/sh
for db in userapi_accounts userapi_devices mediaapi syncapi roomserver signingkeyserver keyserver federationsender appservice naffka; do
for db in userapi_accounts userapi_devices pushserver mediaapi syncapi roomserver signingkeyserver keyserver federationsender appservice naffka; do
createdb -U dendrite -O dendrite dendrite_$db
done

View file

@ -24,6 +24,7 @@ import (
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/transactions"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
pushserverAPI "github.com/matrix-org/dendrite/pushserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/kafka"
@ -46,6 +47,7 @@ func AddPublicRoutes(
fsAPI federationSenderAPI.FederationSenderInternalAPI,
userAPI userapi.UserInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
psAPI pushserverAPI.PushserverInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
@ -59,6 +61,7 @@ func AddPublicRoutes(
routing.Setup(
router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, userAPI, federation,
syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider, mscCfg,
syncProducer, transactionsCache, fsAPI, keyAPI,
psAPI, extRoomsProvider, mscCfg,
)
}

View file

@ -7,11 +7,13 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
pushserverapi "github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
type newPasswordRequest struct {
@ -28,6 +30,7 @@ type newPasswordAuth struct {
func Password(
req *http.Request,
psAPI pushserverapi.PushserverInternalAPI,
userAPI api.UserInternalAPI,
accountDB accounts.Database,
device *api.Device,
@ -37,6 +40,11 @@ func Password(
var r newPasswordRequest
r.LogoutDevices = true
logrus.WithFields(logrus.Fields{
"sessionId": device.SessionID,
"userId": device.UserID,
}).Debug("Changing password")
// Unmarshal the request.
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
@ -116,6 +124,15 @@ func Password(
util.GetLogger(req.Context()).WithError(err).Error("PerformDeviceDeletion failed")
return jsonerror.InternalServerError()
}
pushersReq := &pushserverapi.PerformPusherDeletionRequest{
Localpart: localpart,
SessionID: device.SessionID,
}
if err := psAPI.PerformPusherDeletion(req.Context(), pushersReq, &struct{}{}); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("PerformPusherDeletion failed")
return jsonerror.InternalServerError()
}
}
// Return a success code.

115
clientapi/routing/pusher.go Normal file
View file

@ -0,0 +1,115 @@
// Copyright 2021 Dan Peleg <dan@globekeeper.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
"net/url"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
pushserverapi "github.com/matrix-org/dendrite/pushserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// GetPushers handles /_matrix/client/r0/pushers
func GetPushers(
req *http.Request, device *userapi.Device,
psAPI pushserverapi.PushserverInternalAPI,
) util.JSONResponse {
var queryRes pushserverapi.QueryPushersResponse
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SplitID failed")
return jsonerror.InternalServerError()
}
err = psAPI.QueryPushers(req.Context(), &pushserverapi.QueryPushersRequest{
Localpart: localpart,
}, &queryRes)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("QueryPushers failed")
return jsonerror.InternalServerError()
}
for i := range queryRes.Pushers {
queryRes.Pushers[i].SessionID = 0
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: queryRes,
}
}
// SetPusher handles /_matrix/client/r0/pushers/set
// This endpoint allows the creation, modification and deletion of pushers for this user ID.
// The behaviour of this endpoint varies depending on the values in the JSON body.
func SetPusher(
req *http.Request, device *userapi.Device,
psAPI pushserverapi.PushserverInternalAPI,
) util.JSONResponse {
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SplitID failed")
return jsonerror.InternalServerError()
}
body := pushserverapi.PerformPusherSetRequest{}
if resErr := httputil.UnmarshalJSONRequest(req, &body); resErr != nil {
return *resErr
}
if len(body.AppID) > 64 {
return invalidParam("length of app_id must be no more than 64 characters")
}
if len(body.PushKey) > 512 {
return invalidParam("length of pushkey must be no more than 512 bytes")
}
uInt := body.Data["url"]
if uInt != nil {
u, ok := uInt.(string)
if !ok {
return invalidParam("url must be string")
}
if u != "" {
var pushUrl *url.URL
pushUrl, err = url.Parse(u)
if err != nil {
return invalidParam("malformed url passed")
}
if pushUrl.Scheme != "https" {
return invalidParam("only https scheme is allowed")
}
}
}
body.Localpart = localpart
body.SessionID = device.SessionID
err = psAPI.PerformPusherSet(req.Context(), &body, &struct{}{})
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("PerformPusherSet failed")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}
func invalidParam(msg string) util.JSONResponse {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidParam(msg),
}
}

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/transactions"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
pushserverAPI "github.com/matrix-org/dendrite/pushserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api"
@ -58,6 +59,7 @@ func Setup(
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
psAPI pushserverAPI.PushserverInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
@ -478,7 +480,7 @@ func Setup(
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
return Password(req, userAPI, accountDB, device, cfg)
return Password(req, psAPI, userAPI, accountDB, device, cfg)
}),
).Methods(http.MethodPost, http.MethodOptions)
@ -833,6 +835,21 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/pushers",
httputil.MakeAuthAPI("get_pushers", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return GetPushers(req, device, psAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/pushers/set",
httputil.MakeAuthAPI("set_pushers", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
return SetPusher(req, device, psAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
// Stub implementations for sytest
r0mux.Handle("/events",
httputil.MakeExternalAPI("events", func(req *http.Request) util.JSONResponse {

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/pushserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup"
@ -69,6 +70,7 @@ func main() {
cfg.RoomServer.InternalAPI.Connect = httpAPIAddr
cfg.SigningKeyServer.InternalAPI.Connect = httpAPIAddr
cfg.SyncAPI.InternalAPI.Connect = httpAPIAddr
cfg.PushServer.InternalAPI.Connect = httpAPIAddr
}
base := setup.NewBaseDendrite(cfg, "Monolith", *enableHTTPAPIs)
@ -141,6 +143,12 @@ func main() {
}
rsAPI.SetAppserviceAPI(asAPI)
psAPI := pushserver.NewInternalAPI(base, rsAPI)
if base.UseHTTPAPIs {
pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI)
psAPI = base.PushServerHTTPClient()
}
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
@ -155,6 +163,7 @@ func main() {
ServerKeyAPI: skAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,
PushserverAPI: psAPI,
}
monolith.AddAllPublicRoutes(
base.ProcessContext,

View file

@ -31,10 +31,11 @@ func ClientAPI(base *setup.BaseDendrite, cfg *config.Dendrite) {
eduInputAPI := base.EDUServerClient()
userAPI := base.UserAPIClient()
keyAPI := base.KeyServerHTTPClient()
psAPI := base.PushServerHTTPClient()
clientapi.AddPublicRoutes(
base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, accountDB, federation,
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, psAPI, nil,
&cfg.MSCs,
)

View file

@ -0,0 +1,34 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package personalities
import (
"github.com/matrix-org/dendrite/pushserver"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/config"
)
func PushServer(base *setup.BaseDendrite, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI) {
intAPI := pushserver.NewInternalAPI(base, rsAPI)
pushserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
base.SetupAndServeHTTP(
base.Cfg.PushServer.InternalAPI.Listen, // internal listener
setup.NoListener, // external listener
nil, nil,
)
}

View file

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/pushserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/config"
@ -174,6 +175,7 @@ func main() {
cfg.SigningKeyServer.Database.ConnectionString = "file:/idb/dendritejs_signingkeyserver.db"
cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db"
cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db"
cfg.PushServer.Database.ConnectionString = "file:/idb/dendritejs_pushserver.db"
cfg.Global.Kafka.UseNaffka = true
cfg.Global.Kafka.Database.ConnectionString = "file:/idb/dendritejs_naffka.db"
cfg.Global.TrustedIDServers = []string{
@ -215,6 +217,8 @@ func main() {
rsAPI.SetFederationSenderAPI(fedSenderAPI)
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
psAPI := pushserver.NewInternalAPI(base)
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
@ -228,6 +232,7 @@ func main() {
RoomserverAPI: rsAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,
PushserverAPI: psAPI,
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: p2pPublicRoomProvider,
}

View file

@ -371,6 +371,17 @@ user_api:
# The default lifetime is 3600000ms (60 minutes).
# openid_token_lifetime_ms: 3600000
# Configuration for the Push Server API.
push_server:
internal_api:
listen: http://localhost:7782
connect: http://localhost:7782
database:
connection_string: file:pushserver.db
max_open_conns: 10
max_idle_conns: 2
conn_max_lifetime: -1
# Configuration for Opentracing.
# See https://github.com/matrix-org/dendrite/tree/master/docs/tracing for information on
# how this works and how to set it up.

View file

@ -51,6 +51,8 @@ import (
fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
keyinthttp "github.com/matrix-org/dendrite/keyserver/inthttp"
pushserverAPI "github.com/matrix-org/dendrite/pushserver/api"
psinthttp "github.com/matrix-org/dendrite/pushserver/inthttp"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp"
"github.com/matrix-org/dendrite/setup/config"
@ -280,6 +282,15 @@ func (b *BaseDendrite) KeyServerHTTPClient() keyserverAPI.KeyInternalAPI {
return f
}
// PushServerHTTPClient returns PushserverInternalAPI for hitting the push server over HTTP
func (b *BaseDendrite) PushServerHTTPClient() pushserverAPI.PushserverInternalAPI {
f, err := psinthttp.NewPushserverClient(b.Cfg.PushServerURL(), b.apiHttpClient)
if err != nil {
logrus.WithError(err).Panic("PushServerHTTPClient failed", b.apiHttpClient)
}
return f
}
// CreateAccountsDB creates a new instance of the accounts database. Should only
// be called once per component.
func (b *BaseDendrite) CreateAccountsDB() accounts.Database {

View file

@ -65,6 +65,7 @@ type Dendrite struct {
SigningKeyServer SigningKeyServer `yaml:"signing_key_server"`
SyncAPI SyncAPI `yaml:"sync_api"`
UserAPI UserAPI `yaml:"user_api"`
PushServer PushServer `yaml:"push_server"`
MSCs MSCs `yaml:"mscs"`
@ -308,6 +309,7 @@ func (c *Dendrite) Defaults() {
c.SyncAPI.Defaults()
c.UserAPI.Defaults()
c.AppServiceAPI.Defaults()
c.PushServer.Defaults()
c.MSCs.Defaults()
c.Wiring()
@ -340,6 +342,7 @@ func (c *Dendrite) Wiring() {
c.SyncAPI.Matrix = &c.Global
c.UserAPI.Matrix = &c.Global
c.AppServiceAPI.Matrix = &c.Global
c.PushServer.Matrix = &c.Global
c.MSCs.Matrix = &c.Global
c.ClientAPI.Derived = &c.Derived
@ -547,6 +550,15 @@ func (config *Dendrite) KeyServerURL() string {
return string(config.KeyServer.InternalAPI.Connect)
}
// PushServerURL returns an HTTP URL for where the push server is listening.
func (config *Dendrite) PushServerURL() string {
// Hard code the push server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.
return string(config.PushServer.InternalAPI.Connect)
}
// SetupTracing configures the opentracing using the supplied configuration.
func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
if !config.Tracing.Enabled {

View file

@ -0,0 +1,22 @@
package config
type PushServer struct {
Matrix *Global `yaml:"-"`
InternalAPI InternalAPIOptions `yaml:"internal_api"`
Database DatabaseOptions `yaml:"database"`
}
func (c *PushServer) Defaults() {
c.InternalAPI.Listen = "http://localhost:7783"
c.InternalAPI.Connect = "http://localhost:7783"
c.Database.Defaults(10)
c.Database.ConnectionString = "file:pushserver.db"
}
func (c *PushServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkURL(configErrs, "room_server.internal_api.listen", string(c.InternalAPI.Listen))
checkURL(configErrs, "room_server.internal_ap.bind", string(c.InternalAPI.Connect))
checkNotEmpty(configErrs, "room_server.database.connection_string", string(c.Database.ConnectionString))
}

View file

@ -205,6 +205,11 @@ user_api:
max_open_conns: 100
max_idle_conns: 2
conn_max_lifetime: -1
pusher_database:
connection_string: file:pushserver.db
max_open_conns: 100
max_idle_conns: 2
conn_max_lifetime: -1
tracing:
enabled: false
jaeger:

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/internal/transactions"
keyAPI "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/mediaapi"
pushserverAPI "github.com/matrix-org/dendrite/pushserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
@ -51,6 +52,7 @@ type Monolith struct {
ServerKeyAPI serverKeyAPI.SigningKeyServerAPI
UserAPI userapi.UserInternalAPI
KeyAPI keyAPI.KeyInternalAPI
PushserverAPI pushserverAPI.PushserverInternalAPI
// Optional
ExtPublicRoomsProvider api.ExtraPublicRoomsProvider
@ -62,8 +64,8 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss
csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB,
m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
&m.Config.MSCs,
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.PushserverAPI,
m.ExtPublicRoomsProvider, &m.Config.MSCs,
)
federationapi.AddPublicRoutes(
ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,

View file

@ -587,3 +587,6 @@ User can invite remote user to room with version 9
Remote user can backfill in a room with version 9
Can reject invites over federation for rooms with version 9
Can receive redactions from regular users over federation in room version 9
Pushers created with a different access token are deleted on password change
Pushers created with a the same access token are not deleted on password change
Can fetch a user's pushers