diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 09af80f6c..e30057ed4 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -334,6 +334,7 @@ func (m *DendriteMonolith) Start() { base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicMediaAPIMux, + base.SynapseAdminMux, ) httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index 5074f6da4..338628049 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -173,6 +173,7 @@ func (m *DendriteMonolith) Start() { base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicMediaAPIMux, + base.SynapseAdminMux, ) httpRouter := mux.NewRouter() diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 2c4fa5d64..562d89d28 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -35,6 +35,7 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component. func AddPublicRoutes( router *mux.Router, + synapseAdminRouter *mux.Router, cfg *config.ClientAPI, accountsDB accounts.Database, federation *gomatrixserverlib.FederationClient, @@ -56,7 +57,7 @@ func AddPublicRoutes( } routing.Setup( - router, cfg, eduInputAPI, rsAPI, asAPI, + router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI, accountsDB, userAPI, federation, syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider, mscCfg, ) diff --git a/clientapi/routing/register.go b/clientapi/routing/register.go index 526418669..8823a41e3 100644 --- a/clientapi/routing/register.go +++ b/clientapi/routing/register.go @@ -17,10 +17,7 @@ package routing import ( "context" - "crypto/hmac" - "crypto/sha1" "encoding/json" - "errors" "fmt" "io/ioutil" "net/http" @@ -594,7 +591,6 @@ func handleRegistrationFlow( accessToken string, accessTokenErr error, ) util.JSONResponse { - // TODO: Shared secret registration (create new user scripts) // TODO: Enable registration config flag // TODO: Guest account upgrading @@ -643,20 +639,6 @@ func handleRegistrationFlow( // Add Recaptcha to the list of completed registration stages AddCompletedSessionStage(sessionID, authtypes.LoginTypeRecaptcha) - case authtypes.LoginTypeSharedSecret: - // Check shared secret against config - valid, err := isValidMacLogin(cfg, r.Username, r.Password, r.Admin, r.Auth.Mac) - - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("isValidMacLogin failed") - return jsonerror.InternalServerError() - } else if !valid { - return util.MessageResponse(http.StatusForbidden, "HMAC incorrect") - } - - // Add SharedSecret to the list of completed registration stages - AddCompletedSessionStage(sessionID, authtypes.LoginTypeSharedSecret) - case authtypes.LoginTypeDummy: // there is nothing to do // Add Dummy to the list of completed registration stages @@ -849,49 +831,6 @@ func completeRegistration( } } -// Used for shared secret registration. -// Checks if the username, password and isAdmin flag matches the given mac. -func isValidMacLogin( - cfg *config.ClientAPI, - username, password string, - isAdmin bool, - givenMac []byte, -) (bool, error) { - sharedSecret := cfg.RegistrationSharedSecret - - // Check that shared secret registration isn't disabled. - if cfg.RegistrationSharedSecret == "" { - return false, errors.New("Shared secret registration is disabled") - } - - // Double check that username/password don't contain the HMAC delimiters. We should have - // already checked this. - if strings.Contains(username, "\x00") { - return false, errors.New("Username contains invalid character") - } - if strings.Contains(password, "\x00") { - return false, errors.New("Password contains invalid character") - } - if sharedSecret == "" { - return false, errors.New("Shared secret registration is disabled") - } - - adminString := "notadmin" - if isAdmin { - adminString = "admin" - } - joined := strings.Join([]string{username, password, adminString}, "\x00") - - mac := hmac.New(sha1.New, []byte(sharedSecret)) - _, err := mac.Write([]byte(joined)) - if err != nil { - return false, err - } - expectedMAC := mac.Sum(nil) - - return hmac.Equal(givenMac, expectedMAC), nil -} - // checkFlows checks a single completed flow against another required one. If // one contains at least all of the stages that the other does, checkFlows // returns true. @@ -995,3 +934,34 @@ func RegisterAvailable( }, } } + +func handleSharedSecretRegistration(userAPI userapi.UserInternalAPI, sr *SharedSecretRegistration, req *http.Request) util.JSONResponse { + ssrr, err := NewSharedSecretRegistrationRequest(req.Body) + if err != nil { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.BadJSON(fmt.Sprintf("malformed json: %s", err)), + } + } + valid, err := sr.IsValidMacLogin(ssrr.Nonce, ssrr.User, ssrr.Password, ssrr.Admin, ssrr.MacBytes) + if err != nil { + return util.ErrorResponse(err) + } + if !valid { + return util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden("bad mac"), + } + } + // downcase capitals + ssrr.User = strings.ToLower(ssrr.User) + + if resErr := validateUsername(ssrr.User); resErr != nil { + return *resErr + } + if resErr := validatePassword(ssrr.Password); resErr != nil { + return *resErr + } + deviceID := "shared_secret_registration" + return completeRegistration(req.Context(), userAPI, ssrr.User, ssrr.Password, "", req.RemoteAddr, req.UserAgent(), false, &ssrr.User, &deviceID) +} diff --git a/clientapi/routing/register_secret.go b/clientapi/routing/register_secret.go new file mode 100644 index 000000000..f0436e322 --- /dev/null +++ b/clientapi/routing/register_secret.go @@ -0,0 +1,99 @@ +package routing + +import ( + "context" + "crypto/hmac" + "crypto/sha1" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "strings" + "time" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/util" + cache "github.com/patrickmn/go-cache" +) + +type SharedSecretRegistrationRequest struct { + User string `json:"username"` + Password string `json:"password"` + Nonce string `json:"nonce"` + MacBytes []byte + MacStr string `json:"mac"` + Admin bool `json:"admin"` +} + +func NewSharedSecretRegistrationRequest(reader io.ReadCloser) (*SharedSecretRegistrationRequest, error) { + defer internal.CloseAndLogIfError(context.Background(), reader, "NewSharedSecretRegistrationRequest: failed to close request body") + var ssrr SharedSecretRegistrationRequest + err := json.NewDecoder(reader).Decode(&ssrr) + if err != nil { + return nil, err + } + ssrr.MacBytes, err = hex.DecodeString(ssrr.MacStr) + return &ssrr, err +} + +type SharedSecretRegistration struct { + sharedSecret string + nonces *cache.Cache +} + +func NewSharedSecretRegistration(sharedSecret string) *SharedSecretRegistration { + return &SharedSecretRegistration{ + sharedSecret: sharedSecret, + // nonces live for 5mins, purge every 10mins + nonces: cache.New(5*time.Minute, 10*time.Minute), + } +} + +func (r *SharedSecretRegistration) GenerateNonce() string { + nonce := util.RandomString(16) + r.nonces.Set(nonce, true, cache.DefaultExpiration) + return nonce +} + +func (r *SharedSecretRegistration) validNonce(nonce string) bool { + _, exists := r.nonces.Get(nonce) + return exists +} + +func (r *SharedSecretRegistration) IsValidMacLogin( + nonce, username, password string, + isAdmin bool, + givenMac []byte, +) (bool, error) { + // Check that shared secret registration isn't disabled. + if r.sharedSecret == "" { + return false, errors.New("Shared secret registration is disabled") + } + if !r.validNonce(nonce) { + return false, fmt.Errorf("Incorrect or expired nonce: %s", nonce) + } + + // Check that username/password don't contain the HMAC delimiters. + if strings.Contains(username, "\x00") { + return false, errors.New("Username contains invalid character") + } + if strings.Contains(password, "\x00") { + return false, errors.New("Password contains invalid character") + } + + adminString := "notadmin" + if isAdmin { + adminString = "admin" + } + joined := strings.Join([]string{nonce, username, password, adminString}, "\x00") + + mac := hmac.New(sha1.New, []byte(r.sharedSecret)) + _, err := mac.Write([]byte(joined)) + if err != nil { + return false, err + } + expectedMAC := mac.Sum(nil) + + return hmac.Equal(givenMac, expectedMAC), nil +} diff --git a/clientapi/routing/register_secret_test.go b/clientapi/routing/register_secret_test.go new file mode 100644 index 000000000..e702b2152 --- /dev/null +++ b/clientapi/routing/register_secret_test.go @@ -0,0 +1,43 @@ +package routing + +import ( + "bytes" + "io/ioutil" + "testing" + + "github.com/patrickmn/go-cache" +) + +func TestSharedSecretRegister(t *testing.T) { + // these values have come from a local synapse instance to ensure compatibility + jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice"}`) + sharedSecret := "dendritetest" + + req, err := NewSharedSecretRegistrationRequest(ioutil.NopCloser(bytes.NewBuffer(jsonStr))) + if err != nil { + t.Fatalf("failed to read request: %s", err) + } + + r := NewSharedSecretRegistration(sharedSecret) + + // force the nonce to be known + r.nonces.Set(req.Nonce, true, cache.DefaultExpiration) + + valid, err := r.IsValidMacLogin(req.Nonce, req.User, req.Password, req.Admin, req.MacBytes) + if err != nil { + t.Fatalf("failed to check for valid mac: %s", err) + } + if !valid { + t.Errorf("mac login failed, wanted success") + } + + // modify the mac so it fails + req.MacBytes[0] = 0xff + valid, err = r.IsValidMacLogin(req.Nonce, req.User, req.Password, req.Admin, req.MacBytes) + if err != nil { + t.Fatalf("failed to check for valid mac: %s", err) + } + if valid { + t.Errorf("mac login succeeded, wanted failure") + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 9f980e0a9..37279e8ed 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -37,6 +37,7 @@ import ( "github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client @@ -46,7 +47,7 @@ import ( // applied: // nolint: gocyclo func Setup( - publicAPIMux *mux.Router, cfg *config.ClientAPI, + publicAPIMux, synapseAdminRouter *mux.Router, cfg *config.ClientAPI, eduAPI eduServerAPI.EDUServerInputAPI, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, @@ -88,6 +89,32 @@ func Setup( }), ).Methods(http.MethodGet, http.MethodOptions) + if cfg.RegistrationSharedSecret != "" { + logrus.Info("Enabling shared secret registration at /_synapse/admin/v1/register") + sr := NewSharedSecretRegistration(cfg.RegistrationSharedSecret) + synapseAdminRouter.Handle("/admin/v1/register", + httputil.MakeExternalAPI("shared_secret_registration", func(req *http.Request) util.JSONResponse { + if req.Method == http.MethodGet { + return util.JSONResponse{ + Code: 200, + JSON: struct { + Nonce string `json:"nonce"` + }{ + Nonce: sr.GenerateNonce(), + }, + } + } + if req.Method == http.MethodPost { + return handleSharedSecretRegistration(userAPI, sr, req) + } + return util.JSONResponse{ + Code: http.StatusMethodNotAllowed, + JSON: jsonerror.NotFound("unknown method"), + } + }), + ).Methods(http.MethodGet, http.MethodPost, http.MethodOptions) + } + r0mux := publicAPIMux.PathPrefix("/r0").Subrouter() unstableMux := publicAPIMux.PathPrefix("/unstable").Subrouter() diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index cc7dcf021..6b0e57d8b 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -197,6 +197,7 @@ func main() { base.Base.PublicFederationAPIMux, base.Base.PublicKeyAPIMux, base.Base.PublicMediaAPIMux, + base.Base.SynapseAdminMux, ) if err := mscs.Enable(&base.Base, &monolith); err != nil { logrus.WithError(err).Fatalf("Failed to enable MSCs") diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 72936e42e..2712ed4a1 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -210,6 +210,7 @@ func main() { base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicMediaAPIMux, + base.SynapseAdminMux, ) wsUpgrader := websocket.Upgrader{ diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 2d710ae79..abeefbe5a 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -154,6 +154,7 @@ func main() { base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicMediaAPIMux, + base.SynapseAdminMux, ) if err := mscs.Enable(base, &monolith); err != nil { logrus.WithError(err).Fatalf("Failed to enable MSCs") diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index ef349505c..5efbe8567 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -149,6 +149,7 @@ func main() { base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicMediaAPIMux, + base.SynapseAdminMux, ) if len(base.Cfg.MSCs.MSCs) > 0 { diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go index ec445ceb7..5e0c43548 100644 --- a/cmd/dendrite-polylith-multi/personalities/clientapi.go +++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go @@ -33,7 +33,7 @@ func ClientAPI(base *setup.BaseDendrite, cfg *config.Dendrite) { keyAPI := base.KeyServerHTTPClient() clientapi.AddPublicRoutes( - base.PublicClientAPIMux, &base.Cfg.ClientAPI, accountDB, federation, + base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, &cfg.MSCs, ) diff --git a/cmd/dendrite-upgrade-tests/main.go b/cmd/dendrite-upgrade-tests/main.go new file mode 100644 index 000000000..a6cc2d3f9 --- /dev/null +++ b/cmd/dendrite-upgrade-tests/main.go @@ -0,0 +1,503 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "log" + "net/http" + "os" + "path" + "regexp" + "runtime" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/Masterminds/semver/v3" + "github.com/codeclysm/extract" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/mount" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/client" + "github.com/docker/go-connections/nat" +) + +var ( + flagTempDir = flag.String("tmp", "tmp", "Path to temporary directory to dump tarballs to") + flagFrom = flag.String("from", "HEAD-1", "The version to start from e.g '0.3.1'. If 'HEAD-N' then starts N versions behind HEAD.") + flagTo = flag.String("to", "HEAD", "The version to end on e.g '0.3.3'.") + flagBuildConcurrency = flag.Int("build-concurrency", runtime.NumCPU(), "The amount of build concurrency when building images") + flagHead = flag.String("head", "", "Location to a dendrite repository to treat as HEAD instead of Github") + flagDockerHost = flag.String("docker-host", "localhost", "The hostname of the docker client. 'localhost' if running locally, 'host.docker.internal' if running in Docker.") + alphaNumerics = regexp.MustCompile("[^a-zA-Z0-9]+") +) + +const HEAD = "HEAD" + +// Embed the Dockerfile to use when building dendrite versions. +// We cannot use the dockerfile associated with the repo with each version sadly due to changes in +// Docker versions. Specifically, earlier Dendrite versions are incompatible with newer Docker clients +// due to the error: +// When using COPY with more than one source file, the destination must be a directory and end with a / +// We need to run a postgres anyway, so use the dockerfile associated with Complement instead. +const Dockerfile = `FROM golang:1.13-stretch as build +RUN apt-get update && apt-get install -y postgresql +WORKDIR /build + +# Copy the build context to the repo as this is the right dendrite code. This is different to the +# Complement Dockerfile which wgets a branch. +COPY . . + +RUN go build ./cmd/dendrite-monolith-server +RUN go build ./cmd/generate-keys +RUN go build ./cmd/generate-config +RUN ./generate-config --ci > dendrite.yaml +RUN ./generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key + +# Replace the connection string with a single postgres DB, using user/db = 'postgres' and no password +RUN sed -i "s%connection_string:.*$%connection_string: postgresql://postgres@localhost/postgres?sslmode=disable%g" dendrite.yaml +# No password when connecting over localhost +RUN sed -i "s%127.0.0.1/32 md5%127.0.0.1/32 trust%g" /etc/postgresql/9.6/main/pg_hba.conf +# Bump up max conns for moar concurrency +RUN sed -i 's/max_connections = 100/max_connections = 2000/g' /etc/postgresql/9.6/main/postgresql.conf +RUN sed -i 's/max_open_conns:.*$/max_open_conns: 100/g' dendrite.yaml + +# This entry script starts postgres, waits for it to be up then starts dendrite +RUN echo '\ +#!/bin/bash -eu \n\ +pg_lsclusters \n\ +pg_ctlcluster 9.6 main start \n\ + \n\ +until pg_isready \n\ +do \n\ + echo "Waiting for postgres"; \n\ + sleep 1; \n\ +done \n\ + \n\ +sed -i "s/server_name: localhost/server_name: ${SERVER_NAME}/g" dendrite.yaml \n\ +./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml \n\ +' > run_dendrite.sh && chmod +x run_dendrite.sh + +ENV SERVER_NAME=localhost +EXPOSE 8008 8448 +CMD /build/run_dendrite.sh ` + +const dendriteUpgradeTestLabel = "dendrite_upgrade_test" + +// downloadArchive downloads an arbitrary github archive of the form: +// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz +// and re-tarballs it without the top-level directory which contains branch information. It inserts +// the contents of `dockerfile` as a root file `Dockerfile` in the re-tarballed directory such that +// you can directly feed the retarballed archive to `ImageBuild` to have it run said dockerfile. +// Returns the tarball buffer on success. +func downloadArchive(cli *http.Client, tmpDir, archiveURL string, dockerfile []byte) (*bytes.Buffer, error) { + resp, err := cli.Get(archiveURL) + if err != nil { + return nil, err + } + // nolint:errcheck + defer resp.Body.Close() + if resp.StatusCode != 200 { + return nil, fmt.Errorf("got HTTP %d", resp.StatusCode) + } + _ = os.RemoveAll(tmpDir) + if err = os.Mkdir(tmpDir, os.ModePerm); err != nil { + return nil, fmt.Errorf("failed to make temporary directory: %s", err) + } + // nolint:errcheck + defer os.RemoveAll(tmpDir) + // dump the tarball temporarily, stripping the top-level directory + err = extract.Archive(context.Background(), resp.Body, tmpDir, func(inPath string) string { + // remove top level + segments := strings.Split(inPath, "/") + return strings.Join(segments[1:], "/") + }) + if err != nil { + return nil, err + } + // add top level Dockerfile + err = ioutil.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm) + if err != nil { + return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err) + } + // now re-tarball it :/ + var tarball bytes.Buffer + err = compress(tmpDir, &tarball) + if err != nil { + return nil, err + } + return &tarball, nil +} + +// buildDendrite builds Dendrite on the branchOrTagName given. Returns the image ID or an error +func buildDendrite(httpClient *http.Client, dockerClient *client.Client, tmpDir, branchOrTagName string) (string, error) { + var tarball *bytes.Buffer + var err error + // If a custom HEAD location is given, use that, else pull from github. Mostly useful for CI + // where we want to use the working directory. + if branchOrTagName == HEAD && *flagHead != "" { + log.Printf("%s: Using %s as HEAD", branchOrTagName, *flagHead) + // add top level Dockerfile + err = ioutil.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm) + if err != nil { + return "", fmt.Errorf("Custom HEAD: failed to inject /Dockerfile: %w", err) + } + // now tarball it + var buffer bytes.Buffer + err = compress(*flagHead, &buffer) + if err != nil { + return "", fmt.Errorf("failed to tarball custom HEAD %s : %s", *flagHead, err) + } + tarball = &buffer + } else { + log.Printf("%s: Downloading version %s to %s\n", branchOrTagName, branchOrTagName, tmpDir) + // pull an archive, this contains a top-level directory which screws with the build context + // which we need to fix up post download + u := fmt.Sprintf("https://github.com/matrix-org/dendrite/archive/%s.tar.gz", branchOrTagName) + tarball, err = downloadArchive(httpClient, tmpDir, u, []byte(Dockerfile)) + if err != nil { + return "", fmt.Errorf("failed to download archive %s: %w", u, err) + } + log.Printf("%s: %s => %d bytes\n", branchOrTagName, u, tarball.Len()) + } + + log.Printf("%s: Building version %s\n", branchOrTagName, branchOrTagName) + res, err := dockerClient.ImageBuild(context.Background(), tarball, types.ImageBuildOptions{ + Tags: []string{"dendrite-upgrade"}, + }) + if err != nil { + return "", fmt.Errorf("failed to start building image: %s", err) + } + // nolint:errcheck + defer res.Body.Close() + decoder := json.NewDecoder(res.Body) + // {"aux":{"ID":"sha256:247082c717963bc2639fc2daed08838d67811ea12356cd4fda43e1ffef94f2eb"}} + var imageID string + for decoder.More() { + var dl struct { + Stream string `json:"stream"` + Aux map[string]interface{} `json:"aux"` + } + if err := decoder.Decode(&dl); err != nil { + return "", fmt.Errorf("failed to decode build image output line: %w", err) + } + log.Printf("%s: %s", branchOrTagName, dl.Stream) + if dl.Aux != nil { + imgID, ok := dl.Aux["ID"] + if ok { + imageID = imgID.(string) + } + } + } + return imageID, nil +} + +func getAndSortVersionsFromGithub(httpClient *http.Client) (semVers []*semver.Version, err error) { + u := "https://api.github.com/repos/matrix-org/dendrite/tags" + res, err := httpClient.Get(u) + if err != nil { + return nil, err + } + if res.StatusCode != 200 { + return nil, fmt.Errorf("%s returned HTTP %d", u, res.StatusCode) + } + resp := []struct { + Name string `json:"name"` + }{} + if err = json.NewDecoder(res.Body).Decode(&resp); err != nil { + return nil, err + } + for _, r := range resp { + v, err := semver.NewVersion(r.Name) + if err != nil { + continue // not a semver, that's ok and isn't an error, we allow tags that aren't semvers + } + semVers = append(semVers, v) + } + sort.Sort(semver.Collection(semVers)) + return semVers, nil +} + +func calculateVersions(cli *http.Client, from, to string) []string { + semvers, err := getAndSortVersionsFromGithub(cli) + if err != nil { + log.Fatalf("failed to collect semvers from github: %s", err) + } + // snip the lower bound depending on --from + if from != "" { + if strings.HasPrefix(from, "HEAD-") { + var headN int + headN, err = strconv.Atoi(strings.TrimPrefix(from, "HEAD-")) + if err != nil { + log.Fatalf("invalid --from, try 'HEAD-1'") + } + if headN >= len(semvers) { + log.Fatalf("only have %d versions, but asked to go to HEAD-%d", len(semvers), headN) + } + if headN > 0 { + semvers = semvers[len(semvers)-headN:] + } + } else { + fromVer, err := semver.NewVersion(from) + if err != nil { + log.Fatalf("invalid --from: %s", err) + } + i := 0 + for i = 0; i < len(semvers); i++ { + if semvers[i].LessThan(fromVer) { + continue + } + break + } + semvers = semvers[i:] + } + } + if to != "" && to != HEAD { + toVer, err := semver.NewVersion(to) + if err != nil { + log.Fatalf("invalid --to: %s", err) + } + var i int + for i = len(semvers) - 1; i >= 0; i-- { + if semvers[i].GreaterThan(toVer) { + continue + } + break + } + semvers = semvers[:i+1] + } + var versions []string + for _, sv := range semvers { + versions = append(versions, sv.Original()) + } + if to == HEAD { + versions = append(versions, HEAD) + } + return versions +} + +func buildDendriteImages(httpClient *http.Client, dockerClient *client.Client, baseTempDir string, concurrency int, branchOrTagNames []string) map[string]string { + // concurrently build all versions, this can be done in any order. The mutex protects the map + branchToImageID := make(map[string]string) + var mu sync.Mutex + + var wg sync.WaitGroup + wg.Add(concurrency) + ch := make(chan string, len(branchOrTagNames)) + for _, branchName := range branchOrTagNames { + ch <- branchName + } + close(ch) + + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + for branchName := range ch { + tmpDir := baseTempDir + alphaNumerics.ReplaceAllString(branchName, "") + imgID, err := buildDendrite(httpClient, dockerClient, tmpDir, branchName) + if err != nil { + log.Fatalf("%s: failed to build dendrite image: %s", branchName, err) + } + mu.Lock() + branchToImageID[branchName] = imgID + mu.Unlock() + } + }() + } + wg.Wait() + return branchToImageID +} + +func runImage(dockerClient *client.Client, volumeName, version, imageID string) (csAPIURL, containerID string, err error) { + log.Printf("%s: running image %s\n", version, imageID) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + body, err := dockerClient.ContainerCreate(ctx, &container.Config{ + Image: imageID, + Env: []string{"SERVER_NAME=hs1"}, + Labels: map[string]string{ + dendriteUpgradeTestLabel: "yes", + }, + }, &container.HostConfig{ + PublishAllPorts: true, + Mounts: []mount.Mount{ + { + Type: mount.TypeVolume, + Source: volumeName, + Target: "/var/lib/postgresql/9.6/main", + }, + }, + }, nil, nil, "dendrite_upgrade_test_"+version) + if err != nil { + return "", "", fmt.Errorf("failed to ContainerCreate: %s", err) + } + containerID = body.ID + + err = dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}) + if err != nil { + return "", "", fmt.Errorf("failed to ContainerStart: %s", err) + } + inspect, err := dockerClient.ContainerInspect(ctx, containerID) + if err != nil { + return "", "", err + } + csapiPortInfo, ok := inspect.NetworkSettings.Ports[nat.Port("8008/tcp")] + if !ok { + return "", "", fmt.Errorf("port 8008 not exposed - exposed ports: %v", inspect.NetworkSettings.Ports) + } + baseURL := fmt.Sprintf("http://%s:%s", *flagDockerHost, csapiPortInfo[0].HostPort) + versionsURL := fmt.Sprintf("%s/_matrix/client/versions", baseURL) + // hit /versions to check it is up + var lastErr error + for i := 0; i < 500; i++ { + res, err := http.Get(versionsURL) + if err != nil { + lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, err) + time.Sleep(50 * time.Millisecond) + continue + } + if res.StatusCode != 200 { + lastErr = fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status) + time.Sleep(50 * time.Millisecond) + continue + } + lastErr = nil + break + } + if lastErr != nil { + logs, err := dockerClient.ContainerLogs(context.Background(), containerID, types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + }) + // ignore errors when cannot get logs, it's just for debugging anyways + if err == nil { + logbody, err := ioutil.ReadAll(logs) + if err == nil { + log.Printf("Container logs:\n\n%s\n\n", string(logbody)) + } + } + } + return baseURL, containerID, lastErr +} + +func destroyContainer(dockerClient *client.Client, containerID string) { + err := dockerClient.ContainerRemove(context.TODO(), containerID, types.ContainerRemoveOptions{ + Force: true, + }) + if err != nil { + log.Printf("failed to remove container %s : %s", containerID, err) + } +} + +func loadAndRunTests(dockerClient *client.Client, volumeName, v string, branchToImageID map[string]string) error { + csAPIURL, containerID, err := runImage(dockerClient, volumeName, v, branchToImageID[v]) + if err != nil { + return fmt.Errorf("failed to run container for branch %v: %v", v, err) + } + defer destroyContainer(dockerClient, containerID) + log.Printf("URL %s -> %s \n", csAPIURL, containerID) + if err = runTests(csAPIURL, v); err != nil { + return fmt.Errorf("failed to run tests on version %s: %s", v, err) + } + return nil +} + +func verifyTests(dockerClient *client.Client, volumeName string, versions []string, branchToImageID map[string]string) error { + lastVer := versions[len(versions)-1] + csAPIURL, containerID, err := runImage(dockerClient, volumeName, lastVer, branchToImageID[lastVer]) + if err != nil { + return fmt.Errorf("failed to run container for branch %v: %v", lastVer, err) + } + defer destroyContainer(dockerClient, containerID) + return verifyTestsRan(csAPIURL, versions) +} + +// cleanup old containers/volumes from a previous run +func cleanup(dockerClient *client.Client) { + // ignore all errors, we are just cleaning up and don't want to fail just because we fail to cleanup + containers, _ := dockerClient.ContainerList(context.Background(), types.ContainerListOptions{ + Filters: label(dendriteUpgradeTestLabel), + }) + for _, c := range containers { + s := time.Second + _ = dockerClient.ContainerStop(context.Background(), c.ID, &s) + _ = dockerClient.ContainerRemove(context.Background(), c.ID, types.ContainerRemoveOptions{ + Force: true, + }) + } + _ = dockerClient.VolumeRemove(context.Background(), "dendrite_upgrade_test", true) +} + +func label(in string) filters.Args { + f := filters.NewArgs() + f.Add("label", in) + return f +} + +func main() { + flag.Parse() + httpClient := &http.Client{ + Timeout: 60 * time.Second, + } + dockerClient, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + log.Fatalf("failed to make docker client: %s", err) + } + if *flagFrom == "" { + flag.Usage() + os.Exit(1) + } + cleanup(dockerClient) + versions := calculateVersions(httpClient, *flagFrom, *flagTo) + log.Printf("Testing dendrite versions: %v\n", versions) + + branchToImageID := buildDendriteImages(httpClient, dockerClient, *flagTempDir, *flagBuildConcurrency, versions) + + // make a shared postgres volume + volume, err := dockerClient.VolumeCreate(context.Background(), volume.VolumeCreateBody{ + Name: "dendrite_upgrade_test", + Labels: map[string]string{ + dendriteUpgradeTestLabel: "yes", + }, + }) + if err != nil { + log.Fatalf("failed to make docker volume: %s", err) + } + + failed := false + defer func() { + perr := recover() + log.Println("removing postgres volume") + verr := dockerClient.VolumeRemove(context.Background(), volume.Name, true) + if perr == nil { + perr = verr + } + if perr != nil { + panic(perr) + } + if failed { + os.Exit(1) + } + }() + + // run through images sequentially + for _, v := range versions { + if err = loadAndRunTests(dockerClient, volume.Name, v, branchToImageID); err != nil { + log.Printf("failed to run tests for %v: %s\n", v, err) + failed = true + break + } + } + if err := verifyTests(dockerClient, volume.Name, versions, branchToImageID); err != nil { + log.Printf("failed to verify test results: %s", err) + failed = true + } +} diff --git a/cmd/dendrite-upgrade-tests/tar.go b/cmd/dendrite-upgrade-tests/tar.go new file mode 100644 index 000000000..fd45424db --- /dev/null +++ b/cmd/dendrite-upgrade-tests/tar.go @@ -0,0 +1,63 @@ +package main + +import ( + "archive/tar" + "compress/gzip" + "io" + "os" + "path/filepath" + "strings" +) + +// From https://gist.github.com/mimoo/25fc9716e0f1353791f5908f94d6e726 +// Modified to strip off top-level when compressing +func compress(src string, buf io.Writer) error { + // tar > gzip > buf + zr := gzip.NewWriter(buf) + tw := tar.NewWriter(zr) + + // walk through every file in the folder + err := filepath.Walk(src, func(file string, fi os.FileInfo, e error) error { + // generate tar header + header, err := tar.FileInfoHeader(fi, file) + if err != nil { + return err + } + + // must provide real name + // (see https://golang.org/src/archive/tar/common.go?#L626) + header.Name = strings.TrimPrefix(filepath.ToSlash(file), src+"/") + // write header + if err := tw.WriteHeader(header); err != nil { + return err + } + // if not a dir, write file content + if !fi.IsDir() { + data, err := os.Open(file) + if err != nil { + return err + } + if _, err = io.Copy(tw, data); err != nil { + return err + } + if err = data.Close(); err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + + // produce tar + if err := tw.Close(); err != nil { + return err + } + // produce gzip + if err := zr.Close(); err != nil { + return err + } + // + return nil +} diff --git a/cmd/dendrite-upgrade-tests/tests.go b/cmd/dendrite-upgrade-tests/tests.go new file mode 100644 index 000000000..e02af92a9 --- /dev/null +++ b/cmd/dendrite-upgrade-tests/tests.go @@ -0,0 +1,192 @@ +package main + +import ( + "fmt" + "log" + "strings" + + "github.com/matrix-org/gomatrix" + "github.com/matrix-org/gomatrixserverlib" +) + +const userPassword = "this_is_a_long_password" + +type user struct { + userID string + localpart string + client *gomatrix.Client +} + +// runTests performs the following operations: +// - register alice and bob with branch name muxed into the localpart +// - create a DM room for the 2 users and exchange messages +// - create/join a public #global room and exchange messages +func runTests(baseURL, branchName string) error { + // register 2 users + users := []user{ + { + localpart: "alice" + branchName, + }, + { + localpart: "bob" + branchName, + }, + } + for i, u := range users { + client, err := gomatrix.NewClient(baseURL, "", "") + if err != nil { + return err + } + resp, err := client.RegisterDummy(&gomatrix.ReqRegister{ + Username: strings.ToLower(u.localpart), + Password: userPassword, + }) + if err != nil { + return fmt.Errorf("failed to register %s: %s", u.localpart, err) + } + client, err = gomatrix.NewClient(baseURL, resp.UserID, resp.AccessToken) + if err != nil { + return err + } + users[i].client = client + users[i].userID = resp.UserID + } + + // create DM room, join it and exchange messages + createRoomResp, err := users[0].client.CreateRoom(&gomatrix.ReqCreateRoom{ + Preset: "trusted_private_chat", + Invite: []string{users[1].userID}, + IsDirect: true, + }) + if err != nil { + return fmt.Errorf("failed to create DM room: %s", err) + } + dmRoomID := createRoomResp.RoomID + if _, err = users[1].client.JoinRoom(dmRoomID, "", nil); err != nil { + return fmt.Errorf("failed to join DM room: %s", err) + } + msgs := []struct { + client *gomatrix.Client + text string + }{ + { + client: users[0].client, text: "1: " + branchName, + }, + { + client: users[1].client, text: "2: " + branchName, + }, + { + client: users[0].client, text: "3: " + branchName, + }, + { + client: users[1].client, text: "4: " + branchName, + }, + } + for _, msg := range msgs { + _, err = msg.client.SendText(dmRoomID, msg.text) + if err != nil { + return fmt.Errorf("failed to send text in dm room: %s", err) + } + } + + // attempt to create/join the shared public room + publicRoomID := "" + createRoomResp, err = users[0].client.CreateRoom(&gomatrix.ReqCreateRoom{ + RoomAliasName: "global", + Preset: "public_chat", + }) + if err != nil { // this is okay and expected if the room already exists and the aliases clash + // try to join it + _, domain, err2 := gomatrixserverlib.SplitID('@', users[0].userID) + if err2 != nil { + return fmt.Errorf("failed to split user ID: %s, %s", users[0].userID, err2) + } + joinRoomResp, err2 := users[0].client.JoinRoom(fmt.Sprintf("#global:%s", domain), "", nil) + if err2 != nil { + return fmt.Errorf("alice failed to join public room: %s", err2) + } + publicRoomID = joinRoomResp.RoomID + } else { + publicRoomID = createRoomResp.RoomID + } + if _, err = users[1].client.JoinRoom(publicRoomID, "", nil); err != nil { + return fmt.Errorf("bob failed to join public room: %s", err) + } + // send messages + for _, msg := range msgs { + _, err = msg.client.SendText(publicRoomID, "public "+msg.text) + if err != nil { + return fmt.Errorf("failed to send text in public room: %s", err) + } + } + log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID) + return nil +} + +// verifyTestsRan checks that the HS has the right rooms/messages +func verifyTestsRan(baseURL string, branchNames []string) error { + log.Println("Verifying tests....") + // check we can login as all users + var resp *gomatrix.RespLogin + for _, branchName := range branchNames { + client, err := gomatrix.NewClient(baseURL, "", "") + if err != nil { + return err + } + userLocalparts := []string{ + "alice" + branchName, + "bob" + branchName, + } + for _, userLocalpart := range userLocalparts { + resp, err = client.Login(&gomatrix.ReqLogin{ + Type: "m.login.password", + User: strings.ToLower(userLocalpart), + Password: userPassword, + }) + if err != nil { + return fmt.Errorf("failed to login as %s: %s", userLocalpart, err) + } + if resp.AccessToken == "" { + return fmt.Errorf("failed to login, bad response: %+v", resp) + } + } + } + log.Println(" accounts exist: OK") + client, err := gomatrix.NewClient(baseURL, resp.UserID, resp.AccessToken) + if err != nil { + return err + } + _, domain, err := gomatrixserverlib.SplitID('@', client.UserID) + if err != nil { + return err + } + u := client.BuildURL("directory", "room", fmt.Sprintf("#global:%s", domain)) + r := struct { + RoomID string `json:"room_id"` + }{} + err = client.MakeRequest("GET", u, nil, &r) + if err != nil { + return fmt.Errorf("failed to /directory: %s", err) + } + if r.RoomID == "" { + return fmt.Errorf("/directory lookup returned no room ID") + } + log.Println(" public room exists: OK") + + history, err := client.Messages(r.RoomID, client.Store.LoadNextBatch(client.UserID), "", 'b', 100) + if err != nil { + return fmt.Errorf("failed to get /messages: %s", err) + } + // we expect 4 messages per version + msgCount := 0 + for _, ev := range history.Chunk { + if ev.Type == "m.room.message" { + msgCount += 1 + } + } + wantMsgCount := len(branchNames) * 4 + if msgCount != wantMsgCount { + return fmt.Errorf("got %d messages in global room, want %d", msgCount, wantMsgCount) + } + log.Println(" messages exist: OK") + return nil +} diff --git a/cmd/dendritejs-pinecone/main.go b/cmd/dendritejs-pinecone/main.go index 433e9bf82..25e496909 100644 --- a/cmd/dendritejs-pinecone/main.go +++ b/cmd/dendritejs-pinecone/main.go @@ -215,6 +215,7 @@ func main() { base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicMediaAPIMux, + base.SynapseAdminMux, ) httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 7ece94ff0..d5a845ae0 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -236,6 +236,7 @@ func main() { base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicMediaAPIMux, + base.SynapseAdminMux, ) httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index ae9a63fc2..5f214e0fc 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -205,7 +205,7 @@ func Send( util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) - resp, jsonErr := t.processTransaction(context.Background()) + resp, jsonErr := t.processTransaction(httpReq.Context()) if jsonErr != nil { util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") return *jsonErr @@ -232,7 +232,8 @@ type txnReq struct { // something that can tell us about which servers are in a room right now servers federationAPI.ServersInRoomProvider // a list of events from the auth and prev events which we already had - hadEvents map[string]bool + hadEvents map[string]bool + hadEventsMutex sync.Mutex // local cache of events for auth checks, etc - this may include events // which the roomserver is unaware of. haveEvents map[string]*gomatrixserverlib.HeaderedEvent @@ -240,6 +241,12 @@ type txnReq struct { work string // metrics } +func (t *txnReq) hadEvent(eventID string, had bool) { + t.hadEventsMutex.Lock() + defer t.hadEventsMutex.Unlock() + t.hadEvents[eventID] = had +} + // A subset of FederationClient functionality that txn requires. Useful for testing. type txnFederationClient interface { LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( @@ -253,11 +260,8 @@ type txnFederationClient interface { func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { results := make(map[string]gomatrixserverlib.PDUResult) - //var resultsMutex sync.Mutex - var wg sync.WaitGroup var tasks []*inputTask - wg.Add(1) // for processEDUs for _, pdu := range t.PDUs { pduCountTotal.WithLabelValues("total").Inc() @@ -313,9 +317,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res input: newSendFIFOQueue(), }) worker := v.(*inputWorker) - if !worker.running.Load() { - go worker.run() - } wg.Add(1) task := &inputTask{ ctx: ctx, @@ -325,13 +326,12 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res } tasks = append(tasks, task) worker.input.push(task) + if worker.running.CAS(false, true) { + go worker.run() + } } - go func() { - defer wg.Done() - t.processEDUs(ctx) - }() - + t.processEDUs(ctx) wg.Wait() for _, task := range tasks { @@ -351,9 +351,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res } func (t *inputWorker) run() { - if !t.running.CAS(false, true) { - return - } defer t.running.Store(false) for { task, ok := t.input.pop() @@ -368,10 +365,14 @@ func (t *inputWorker) run() { select { case <-task.ctx.Done(): task.err = context.DeadlineExceeded + pduCountTotal.WithLabelValues("expired").Inc() return default: evStart := time.Now() - task.err = task.t.processEvent(task.ctx, task.event) + // TODO: Is 5 minutes too long? + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + task.err = task.t.processEvent(ctx, task.event) + cancel() task.duration = time.Since(evStart) if err := task.err; err != nil { switch err.(type) { @@ -572,6 +573,23 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) t.work = "" // reset from previous event + // Ask the roomserver if we know about the room and/or if we're joined + // to it. If we aren't then we won't bother processing the event. + joinedReq := api.QueryServerJoinedToRoomRequest{ + RoomID: e.RoomID(), + } + var joinedRes api.QueryServerJoinedToRoomResponse + if err := t.rsAPI.QueryServerJoinedToRoom(ctx, &joinedReq, &joinedRes); err != nil { + return fmt.Errorf("t.rsAPI.QueryServerJoinedToRoom: %w", err) + } + + if !joinedRes.RoomExists || !joinedRes.IsInRoom { + // We don't believe we're a member of this room, therefore there's + // no point in wasting work trying to figure out what to do with + // missing auth or prev events. Drop the event. + return roomNotFoundError{e.RoomID()} + } + // Work out if the roomserver knows everything it needs to know to auth // the event. This includes the prev_events and auth_events. // NOTE! This is going to include prev_events that have an empty state @@ -588,23 +606,13 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err) } - if !stateResp.RoomExists { - // TODO: When synapse receives a message for a room it is not in it - // asks the remote server for the state of the room so that it can - // check if the remote server knows of a join "m.room.member" event - // that this server is unaware of. - // However generally speaking we should reject events for rooms we - // aren't a member of. - return roomNotFoundError{e.RoomID()} - } - // Prepare a map of all the events we already had before this point, so // that we don't send them to the roomserver again. for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) { - t.hadEvents[eventID] = true + t.hadEvent(eventID, true) } for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) { - t.hadEvents[eventID] = false + t.hadEvent(eventID, false) } if len(stateResp.MissingAuthEventIDs) > 0 { @@ -679,7 +687,7 @@ withNextEvent: ); err != nil { return fmt.Errorf("api.SendEvents: %w", err) } - t.hadEvents[ev.EventID()] = true // if the roomserver didn't know about the event before, it does now + t.hadEvent(ev.EventID(), true) // if the roomserver didn't know about the event before, it does now t.cacheAndReturn(ev.Headered(stateResp.RoomVersion)) delete(missingAuthEvents, missingAuthEventID) continue withNextEvent @@ -703,14 +711,9 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv return gomatrixserverlib.Allowed(e, &authUsingState) } -var processEventWithMissingStateMutexes = internal.NewMutexByRoom() - func (t *txnReq) processEventWithMissingState( ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ) error { - processEventWithMissingStateMutexes.Lock(e.RoomID()) - defer processEventWithMissingStateMutexes.Unlock(e.RoomID()) - // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the // room. There two ways that we can handle such a gap: @@ -812,14 +815,23 @@ func (t *txnReq) processEventWithMissingState( // First of all, send the backward extremity into the roomserver with the // newly resolved state. This marks the "oldest" point in the backfill and - // sets the baseline state for any new events after this. + // sets the baseline state for any new events after this. We'll make a + // copy of the hadEvents map so that it can be taken downstream without + // worrying about concurrent map reads/writes, since t.hadEvents is meant + // to be protected by a mutex. + hadEvents := map[string]bool{} + t.hadEventsMutex.Lock() + for k, v := range t.hadEvents { + hadEvents[k] = v + } + t.hadEventsMutex.Unlock() err = api.SendEventWithState( context.Background(), t.rsAPI, api.KindOld, resolvedState, backwardsExtremity.Headered(roomVersion), - t.hadEvents, + hadEvents, ) if err != nil { return fmt.Errorf("api.SendEventWithState: %w", err) @@ -915,7 +927,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event // set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this // processEvent request, which is better for memory. stateEvents[i] = t.cacheAndReturn(ev) - t.hadEvents[ev.EventID()] = true + t.hadEvent(ev.EventID(), true) } // we should never access res.StateEvents again so we delete it here to make GC faster res.StateEvents = nil @@ -950,7 +962,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event } for i, ev := range queryRes.Events { authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap()) - t.hadEvents[ev.EventID()] = true + t.hadEvent(ev.EventID(), true) } queryRes.Events = nil } @@ -1027,7 +1039,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even latestEvents := make([]string, len(res.LatestEvents)) for i, ev := range res.LatestEvents { latestEvents[i] = res.LatestEvents[i].EventID - t.hadEvents[ev.EventID] = true + t.hadEvent(ev.EventID, true) } var missingResp *gomatrixserverlib.RespMissingEvents @@ -1163,7 +1175,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even } for i, ev := range queryRes.Events { queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i]) - t.hadEvents[ev.EventID()] = true + t.hadEvent(ev.EventID(), true) evID := queryRes.Events[i].EventID() if missing[evID] { delete(missing, evID) diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 98ff1a0a3..0da06aa95 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -190,7 +190,9 @@ func (t *testRoomserverAPI) QueryServerJoinedToRoom( request *api.QueryServerJoinedToRoomRequest, response *api.QueryServerJoinedToRoomResponse, ) error { - return fmt.Errorf("not implemented") + response.RoomExists = true + response.IsInRoom = true + return nil } // Query whether a server is allowed to see an event diff --git a/go.mod b/go.mod index b02454c21..eeb4a7842 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,19 @@ module github.com/matrix-org/dendrite require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect + github.com/Masterminds/semver/v3 v3.1.1 github.com/Shopify/sarama v1.28.0 + github.com/codeclysm/extract v2.2.0+incompatible + github.com/containerd/containerd v1.5.2 // indirect + github.com/docker/docker v20.10.7+incompatible + github.com/docker/go-connections v0.4.0 github.com/getsentry/sentry-go v0.10.0 github.com/gologme/log v1.2.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 + github.com/h2non/filetype v1.1.1 // indirect github.com/hashicorp/golang-lru v0.5.4 + github.com/juju/testing v0.0.0-20210324180055-18c50b0c2098 // indirect github.com/lib/pq v1.9.0 github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p-circuit v0.4.0 @@ -28,9 +35,11 @@ require ( github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb + github.com/morikuni/aec v1.0.0 // indirect github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6 github.com/opentracing/opentracing-go v1.2.0 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 github.com/pressly/goose v2.7.0+incompatible github.com/prometheus/client_golang v1.9.0 @@ -41,7 +50,7 @@ require ( github.com/uber/jaeger-lib v2.4.0+incompatible github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20210218094457-e77ca8019daa go.uber.org/atomic v1.7.0 - golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 + golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/mobile v0.0.0-20210220033013-bdb1ca9a1e08 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 diff --git a/internal/log.go b/internal/log.go index 0f374bd4a..d2b233c5b 100644 --- a/internal/log.go +++ b/internal/log.go @@ -73,9 +73,9 @@ func callerPrettyfier(f *runtime.Frame) (string, string) { // Append a newline + tab to it to move the actual log content to its own line funcname += "\n\t" - // Surround the filepath in brackets and append line number so IDEs can quickly - // navigate - filename := fmt.Sprintf(" [%s:%d]", f.File, f.Line) + // Use a shortened file path which just has the filename to avoid having lots of redundant + // directories which contribute significantly to overall log sizes! + filename := fmt.Sprintf(" [%s:%d]", path.Base(f.File), f.Line) return funcname, filename } diff --git a/roomserver/api/query.go b/roomserver/api/query.go index af35f7e72..c70db65c1 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -170,7 +170,8 @@ type QueryMembershipsForRoomResponse struct { // QueryServerJoinedToRoomRequest is a request to QueryServerJoinedToRoom type QueryServerJoinedToRoomRequest struct { - // Server name of the server to find + // Server name of the server to find. If not specified, we will + // default to checking if the local server is joined. ServerName gomatrixserverlib.ServerName `json:"server_name"` // ID of the room to see if we are still joined to RoomID string `json:"room_id"` @@ -182,7 +183,8 @@ type QueryServerJoinedToRoomResponse struct { RoomExists bool `json:"room_exists"` // True if we still believe that we are participating in the room IsInRoom bool `json:"is_in_room"` - // List of servers that are also in the room + // List of servers that are also in the room. This will not be populated + // if the queried ServerName is the local server name. ServerNames []gomatrixserverlib.ServerName `json:"server_names"` } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index c9f92f9ff..f39b26eaf 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -59,6 +59,7 @@ func NewRoomserverAPI( Queryer: &query.Queryer{ DB: roomserverDB, Cache: caches, + ServerName: cfg.Matrix.ServerName, ServerACLs: serverACLs, }, Inputer: &input.Inputer{ @@ -92,6 +93,7 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen FSAPI: r.fsAPI, RSAPI: r, Inputer: r.Inputer, + Queryer: r.Queryer, } r.Peeker = &perform.Peeker{ ServerName: r.Cfg.Matrix.ServerName, diff --git a/roomserver/internal/helpers/helpers.go b/roomserver/internal/helpers/helpers.go index a829bffca..a389cc898 100644 --- a/roomserver/internal/helpers/helpers.go +++ b/roomserver/internal/helpers/helpers.go @@ -50,6 +50,10 @@ func UpdateToInviteMembership( return updates, nil } +// IsServerCurrentlyInRoom checks if a server is in a given room, based on the room +// memberships. If the servername is not supplied then the local server will be +// checked instead using a faster code path. +// TODO: This should probably be replaced by an API call. func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) { info, err := db.RoomInfo(ctx, roomID) if err != nil { @@ -59,6 +63,10 @@ func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverNam return false, fmt.Errorf("unknown room %s", roomID) } + if serverName == "" { + return db.GetLocalServerInRoom(ctx, info.RoomNID) + } + eventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false) if err != nil { return false, err diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go index 048496d45..876888e29 100644 --- a/roomserver/internal/perform/perform_join.go +++ b/roomserver/internal/perform/perform_join.go @@ -28,6 +28,7 @@ import ( rsAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/input" + "github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" @@ -42,6 +43,7 @@ type Joiner struct { DB storage.Database Inputer *input.Inputer + Queryer *query.Queryer } // PerformJoin handles joining matrix rooms, including over federation by talking to the federationsender. @@ -205,7 +207,14 @@ func (r *Joiner) performJoinRoomByID( // Force a federated join if we aren't in the room and we've been // given some server names to try joining by. - serverInRoom, _ := helpers.IsServerCurrentlyInRoom(ctx, r.DB, r.ServerName, req.RoomIDOrAlias) + inRoomReq := &api.QueryServerJoinedToRoomRequest{ + RoomID: req.RoomIDOrAlias, + } + inRoomRes := &api.QueryServerJoinedToRoomResponse{} + if err = r.Queryer.QueryServerJoinedToRoom(ctx, inRoomReq, inRoomRes); err != nil { + return "", "", fmt.Errorf("r.Queryer.QueryServerJoinedToRoom: %w", err) + } + serverInRoom := inRoomRes.IsInRoom forceFederatedJoin := len(req.ServerNames) > 0 && !serverInRoom // Force a federated join if we're dealing with a pending invite diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go index 9d7c0816d..4d10dea67 100644 --- a/roomserver/internal/perform/perform_leave.go +++ b/roomserver/internal/perform/perform_leave.go @@ -64,7 +64,14 @@ func (r *Leaver) performLeaveRoomByID( // that. isInvitePending, senderUser, eventID, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID) if err == nil && isInvitePending { - return r.performRejectInvite(ctx, req, res, senderUser, eventID) + var host gomatrixserverlib.ServerName + _, host, err = gomatrixserverlib.SplitID('@', senderUser) + if err != nil { + return nil, fmt.Errorf("Sender %q is invalid", senderUser) + } + if host != r.Cfg.Matrix.ServerName { + return r.performFederatedRejectInvite(ctx, req, res, senderUser, eventID) + } } // There's no invite pending, so first of all we want to find out @@ -94,9 +101,7 @@ func (r *Leaver) performLeaveRoomByID( if err != nil { return nil, fmt.Errorf("Error getting membership: %w", err) } - if membership != gomatrixserverlib.Join { - // TODO: should be able to handle "invite" in this case too, if - // it's a case of kicking or banning or such + if membership != gomatrixserverlib.Join && membership != gomatrixserverlib.Invite { return nil, fmt.Errorf("User %q is not joined to the room (membership is %q)", req.UserID, membership) } @@ -147,7 +152,7 @@ func (r *Leaver) performLeaveRoomByID( return nil, nil } -func (r *Leaver) performRejectInvite( +func (r *Leaver) performFederatedRejectInvite( ctx context.Context, req *api.PerformLeaveRequest, res *api.PerformLeaveResponse, // nolint:unparam diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 408f9766e..4af0e6397 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -36,6 +36,7 @@ import ( type Queryer struct { DB storage.Database Cache caching.RoomServerCaches + ServerName gomatrixserverlib.ServerName ServerACLs *acls.ServerACLs } @@ -328,6 +329,16 @@ func (r *Queryer) QueryServerJoinedToRoom( } response.RoomExists = true + if request.ServerName == r.ServerName || request.ServerName == "" { + var joined bool + joined, err = r.DB.GetLocalServerInRoom(ctx, info.RoomNID) + if err != nil { + return fmt.Errorf("r.DB.GetLocalServerInRoom: %w", err) + } + response.IsInRoom = joined + return nil + } + eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false) if err != nil { return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err) @@ -377,10 +388,16 @@ func (r *Queryer) QueryServerAllowedToSeeEvent( return } roomID := events[0].RoomID() - isServerInRoom, err := helpers.IsServerCurrentlyInRoom(ctx, r.DB, request.ServerName, roomID) - if err != nil { - return + + inRoomReq := &api.QueryServerJoinedToRoomRequest{ + RoomID: roomID, + ServerName: request.ServerName, } + inRoomRes := &api.QueryServerJoinedToRoomResponse{} + if err = r.QueryServerJoinedToRoom(ctx, inRoomReq, inRoomRes); err != nil { + return fmt.Errorf("r.Queryer.QueryServerJoinedToRoom: %w", err) + } + info, err := r.DB.RoomInfo(ctx, roomID) if err != nil { return err @@ -389,7 +406,7 @@ func (r *Queryer) QueryServerAllowedToSeeEvent( return fmt.Errorf("QueryServerAllowedToSeeEvent: no room info for room %s", roomID) } response.AllowedToSeeEvent, err = helpers.CheckServerAllowedToSeeEvent( - ctx, r.DB, *info, request.EventID, request.ServerName, isServerInRoom, + ctx, r.DB, *info, request.EventID, request.ServerName, inRoomRes.IsInRoom, ) return } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index d2b0e75c9..c25820aac 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -154,6 +154,8 @@ type Database interface { GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) // JoinedUsersSetInRooms returns all joined users in the rooms given, along with the count of how many times they appear. JoinedUsersSetInRooms(ctx context.Context, roomIDs []string) (map[string]int, error) + // GetLocalServerInRoom returns true if we think we're in a given room or false otherwise. + GetLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error) // GetKnownUsers searches all users that userID knows about. GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) // GetKnownRooms returns a list of all rooms we know about. diff --git a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go index d87ae052b..26d88e00f 100644 --- a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go +++ b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go @@ -117,7 +117,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { _roomserver_state_block.event_nid FROM _roomserver_state_snapshots - JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids) + LEFT JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids) WHERE _roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT @@ -140,22 +140,49 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { logrus.Warnf("Rewriting snapshots %d-%d of %d...", batchoffset, batchoffset+batchsize, snapshotcount) var snapshots []stateBlockData + var badCreateSnapshots []stateBlockData for snapshotrows.Next() { var snapshot stateBlockData - var eventsarray pq.Int64Array - if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &snapshot.StateBlockNID, &eventsarray); err != nil { + var eventsarray []sql.NullInt64 + var nulStateBlockNID sql.NullInt64 + if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &nulStateBlockNID, pq.Array(&eventsarray)); err != nil { return fmt.Errorf("rows.Scan: %w", err) } + if nulStateBlockNID.Valid { + snapshot.StateBlockNID = types.StateBlockNID(nulStateBlockNID.Int64) + } + // Dendrite v0.1.0 would not make a state block for the create event, resulting in [NULL] from the query above. + // Remember the snapshot and we'll fill it in after we close this cursor as we can't have 2 queries running at the same time + if len(eventsarray) == 1 && !eventsarray[0].Valid { + badCreateSnapshots = append(badCreateSnapshots, snapshot) + continue + } for _, e := range eventsarray { - snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e)) + if e.Valid { + snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e.Int64)) + } } snapshot.EventNIDs = snapshot.EventNIDs[:util.SortAndUnique(snapshot.EventNIDs)] snapshots = append(snapshots, snapshot) } - if err = snapshotrows.Close(); err != nil { return fmt.Errorf("snapshots.Close: %w", err) } + // fill in bad create snapshots + for _, s := range badCreateSnapshots { + var createEventNID types.EventNID + err = tx.QueryRow( + `SELECT event_nid FROM roomserver_events WHERE state_snapshot_nid = $1 AND event_type_nid = 1`, s.StateSnapshotNID, + ).Scan(&createEventNID) + if err != nil { + return fmt.Errorf("cannot xref null state block with snapshot %d: %s", s.StateSnapshotNID, err) + } + if createEventNID == 0 { + return fmt.Errorf("cannot xref null state block with snapshot %d, no create event", s.StateSnapshotNID) + } + s.EventNIDs = append(s.EventNIDs, createEventNID) + snapshots = append(snapshots, s) + } newsnapshots := map[stateSnapshotData]types.StateBlockNIDs{} diff --git a/roomserver/storage/postgres/membership_table.go b/roomserver/storage/postgres/membership_table.go index 3466da6d2..9102f26a3 100644 --- a/roomserver/storage/postgres/membership_table.go +++ b/roomserver/storage/postgres/membership_table.go @@ -124,6 +124,14 @@ var selectKnownUsersSQL = "" + " SELECT DISTINCT room_nid FROM roomserver_membership WHERE target_nid=$1 AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + ") AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " AND event_state_key LIKE $2 LIMIT $3" +// selectLocalServerInRoomSQL is an optimised case for checking if we, the local server, +// are in the room by using the target_local column of the membership table. Normally when +// we want to know if a server is in a room, we have to unmarshal the entire room state which +// is expensive. The presence of a single row from this query suggests we're still in the +// room, no rows returned suggests we aren't. +const selectLocalServerInRoomSQL = "" + + "SELECT room_nid FROM roomserver_membership WHERE target_local = true AND membership_nid = $1 AND room_nid = $2 LIMIT 1" + type membershipStatements struct { insertMembershipStmt *sql.Stmt selectMembershipForUpdateStmt *sql.Stmt @@ -137,6 +145,7 @@ type membershipStatements struct { selectJoinedUsersSetForRoomsStmt *sql.Stmt selectKnownUsersStmt *sql.Stmt updateMembershipForgetRoomStmt *sql.Stmt + selectLocalServerInRoomStmt *sql.Stmt } func createMembershipTable(db *sql.DB) error { @@ -160,6 +169,7 @@ func prepareMembershipTable(db *sql.DB) (tables.Membership, error) { {&s.selectJoinedUsersSetForRoomsStmt, selectJoinedUsersSetForRoomsSQL}, {&s.selectKnownUsersStmt, selectKnownUsersSQL}, {&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom}, + {&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL}, }.Prepare(db) } @@ -324,3 +334,16 @@ func (s *membershipStatements) UpdateForgetMembership( ) return err } + +func (s *membershipStatements) SelectLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error) { + var nid types.RoomNID + err := s.selectLocalServerInRoomStmt.QueryRowContext(ctx, tables.MembershipStateJoin, roomNID).Scan(&nid) + if err != nil { + if err == sql.ErrNoRows { + return false, nil + } + return false, err + } + found := nid > 0 + return found, nil +} diff --git a/roomserver/storage/postgres/state_block_table.go b/roomserver/storage/postgres/state_block_table.go index 4523d18bb..7ae987d6d 100644 --- a/roomserver/storage/postgres/state_block_table.go +++ b/roomserver/storage/postgres/state_block_table.go @@ -64,7 +64,7 @@ const insertStateDataSQL = "" + const bulkSelectStateBlockEntriesSQL = "" + "SELECT state_block_nid, event_nids" + - " FROM roomserver_state_block WHERE state_block_nid = ANY($1)" + " FROM roomserver_state_block WHERE state_block_nid = ANY($1) ORDER BY state_block_nid ASC" type stateBlockStatements struct { insertStateDataStmt *sql.Stmt diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index e77d62e06..9d9434cbb 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -1059,6 +1059,11 @@ func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs []string) return result, nil } +// GetLocalServerInRoom returns true if we think we're in a given room or false otherwise. +func (d *Database) GetLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error) { + return d.MembershipTable.SelectLocalServerInRoom(ctx, roomNID) +} + // GetKnownUsers searches all users that userID knows about. func (d *Database) GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) { stateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, userID) diff --git a/roomserver/storage/sqlite3/membership_table.go b/roomserver/storage/sqlite3/membership_table.go index d9fe32cf8..82babe0d2 100644 --- a/roomserver/storage/sqlite3/membership_table.go +++ b/roomserver/storage/sqlite3/membership_table.go @@ -100,6 +100,14 @@ var selectKnownUsersSQL = "" + " SELECT DISTINCT room_nid FROM roomserver_membership WHERE target_nid=$1 AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + ") AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " AND event_state_key LIKE $2 LIMIT $3" +// selectLocalServerInRoomSQL is an optimised case for checking if we, the local server, +// are in the room by using the target_local column of the membership table. Normally when +// we want to know if a server is in a room, we have to unmarshal the entire room state which +// is expensive. The presence of a single row from this query suggests we're still in the +// room, no rows returned suggests we aren't. +const selectLocalServerInRoomSQL = "" + + "SELECT room_nid FROM roomserver_membership WHERE target_local = 1 AND membership_nid = $1 AND room_nid = $2 LIMIT 1" + type membershipStatements struct { db *sql.DB insertMembershipStmt *sql.Stmt @@ -113,6 +121,7 @@ type membershipStatements struct { updateMembershipStmt *sql.Stmt selectKnownUsersStmt *sql.Stmt updateMembershipForgetRoomStmt *sql.Stmt + selectLocalServerInRoomStmt *sql.Stmt } func createMembershipTable(db *sql.DB) error { @@ -137,6 +146,7 @@ func prepareMembershipTable(db *sql.DB) (tables.Membership, error) { {&s.selectRoomsWithMembershipStmt, selectRoomsWithMembershipSQL}, {&s.selectKnownUsersStmt, selectKnownUsersSQL}, {&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom}, + {&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL}, }.Prepare(db) } @@ -304,3 +314,16 @@ func (s *membershipStatements) UpdateForgetMembership( ) return err } + +func (s *membershipStatements) SelectLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error) { + var nid types.RoomNID + err := s.selectLocalServerInRoomStmt.QueryRowContext(ctx, tables.MembershipStateJoin, roomNID).Scan(&nid) + if err != nil { + if err == sql.ErrNoRows { + return false, nil + } + return false, err + } + found := nid > 0 + return found, nil +} diff --git a/roomserver/storage/sqlite3/state_block_table.go b/roomserver/storage/sqlite3/state_block_table.go index cfb2a49e5..5cb21e91c 100644 --- a/roomserver/storage/sqlite3/state_block_table.go +++ b/roomserver/storage/sqlite3/state_block_table.go @@ -57,7 +57,7 @@ const insertStateDataSQL = ` const bulkSelectStateBlockEntriesSQL = "" + "SELECT state_block_nid, event_nids" + - " FROM roomserver_state_block WHERE state_block_nid IN ($1)" + " FROM roomserver_state_block WHERE state_block_nid IN ($1) ORDER BY state_block_nid ASC" type stateBlockStatements struct { db *sql.DB diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index dd486873a..4a893663f 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -135,6 +135,7 @@ type Membership interface { SelectJoinedUsersSetForRooms(ctx context.Context, roomNIDs []types.RoomNID) (map[types.EventStateKeyNID]int, error) SelectKnownUsers(ctx context.Context, userID types.EventStateKeyNID, searchString string, limit int) ([]string, error) UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error + SelectLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error) } type Published interface { diff --git a/setup/base.go b/setup/base.go index 6bdeb80f7..7b691608d 100644 --- a/setup/base.go +++ b/setup/base.go @@ -77,6 +77,7 @@ type BaseDendrite struct { PublicKeyAPIMux *mux.Router PublicMediaAPIMux *mux.Router InternalAPIMux *mux.Router + SynapseAdminMux *mux.Router UseHTTPAPIs bool apiHttpClient *http.Client httpClient *http.Client @@ -199,6 +200,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo PublicKeyAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicKeyPathPrefix).Subrouter().UseEncodedPath(), PublicMediaAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicMediaPathPrefix).Subrouter().UseEncodedPath(), InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), + SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix("/_synapse/").Subrouter().UseEncodedPath(), apiHttpClient: &apiClient, httpClient: &client, } @@ -391,6 +393,7 @@ func (b *BaseDendrite) SetupAndServeHTTP( externalRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(b.PublicKeyAPIMux) externalRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(federationHandler) } + externalRouter.PathPrefix("/_synapse/").Handler(b.SynapseAdminMux) externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux) if internalAddr != NoListener && internalAddr != externalAddr { diff --git a/setup/monolith.go b/setup/monolith.go index 235be4474..5ceb4ed30 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -57,9 +57,9 @@ type Monolith struct { } // AddAllPublicRoutes attaches all public paths to the given router -func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, mediaMux *mux.Router) { +func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, mediaMux, synapseMux *mux.Router) { clientapi.AddPublicRoutes( - csMux, &m.Config.ClientAPI, m.AccountDB, + csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,