Merge pull request #3 from matrix-org/master

sync with upstream
This commit is contained in:
Kilos Liu 2021-07-10 22:41:52 +08:00 committed by GitHub
commit da323d1ee3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 1187 additions and 131 deletions

View file

@ -334,6 +334,7 @@ func (m *DendriteMonolith) Start() {
base.PublicFederationAPIMux, base.PublicFederationAPIMux,
base.PublicKeyAPIMux, base.PublicKeyAPIMux,
base.PublicMediaAPIMux, base.PublicMediaAPIMux,
base.SynapseAdminMux,
) )
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()

View file

@ -173,6 +173,7 @@ func (m *DendriteMonolith) Start() {
base.PublicFederationAPIMux, base.PublicFederationAPIMux,
base.PublicKeyAPIMux, base.PublicKeyAPIMux,
base.PublicMediaAPIMux, base.PublicMediaAPIMux,
base.SynapseAdminMux,
) )
httpRouter := mux.NewRouter() httpRouter := mux.NewRouter()

View file

@ -35,6 +35,7 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component. // AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component.
func AddPublicRoutes( func AddPublicRoutes(
router *mux.Router, router *mux.Router,
synapseAdminRouter *mux.Router,
cfg *config.ClientAPI, cfg *config.ClientAPI,
accountsDB accounts.Database, accountsDB accounts.Database,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
@ -56,7 +57,7 @@ func AddPublicRoutes(
} }
routing.Setup( routing.Setup(
router, cfg, eduInputAPI, rsAPI, asAPI, router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, userAPI, federation, accountsDB, userAPI, federation,
syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider, mscCfg, syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider, mscCfg,
) )

View file

@ -17,10 +17,7 @@ package routing
import ( import (
"context" "context"
"crypto/hmac"
"crypto/sha1"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -594,7 +591,6 @@ func handleRegistrationFlow(
accessToken string, accessToken string,
accessTokenErr error, accessTokenErr error,
) util.JSONResponse { ) util.JSONResponse {
// TODO: Shared secret registration (create new user scripts)
// TODO: Enable registration config flag // TODO: Enable registration config flag
// TODO: Guest account upgrading // TODO: Guest account upgrading
@ -643,20 +639,6 @@ func handleRegistrationFlow(
// Add Recaptcha to the list of completed registration stages // Add Recaptcha to the list of completed registration stages
AddCompletedSessionStage(sessionID, authtypes.LoginTypeRecaptcha) AddCompletedSessionStage(sessionID, authtypes.LoginTypeRecaptcha)
case authtypes.LoginTypeSharedSecret:
// Check shared secret against config
valid, err := isValidMacLogin(cfg, r.Username, r.Password, r.Admin, r.Auth.Mac)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("isValidMacLogin failed")
return jsonerror.InternalServerError()
} else if !valid {
return util.MessageResponse(http.StatusForbidden, "HMAC incorrect")
}
// Add SharedSecret to the list of completed registration stages
AddCompletedSessionStage(sessionID, authtypes.LoginTypeSharedSecret)
case authtypes.LoginTypeDummy: case authtypes.LoginTypeDummy:
// there is nothing to do // there is nothing to do
// Add Dummy to the list of completed registration stages // Add Dummy to the list of completed registration stages
@ -849,49 +831,6 @@ func completeRegistration(
} }
} }
// Used for shared secret registration.
// Checks if the username, password and isAdmin flag matches the given mac.
func isValidMacLogin(
cfg *config.ClientAPI,
username, password string,
isAdmin bool,
givenMac []byte,
) (bool, error) {
sharedSecret := cfg.RegistrationSharedSecret
// Check that shared secret registration isn't disabled.
if cfg.RegistrationSharedSecret == "" {
return false, errors.New("Shared secret registration is disabled")
}
// Double check that username/password don't contain the HMAC delimiters. We should have
// already checked this.
if strings.Contains(username, "\x00") {
return false, errors.New("Username contains invalid character")
}
if strings.Contains(password, "\x00") {
return false, errors.New("Password contains invalid character")
}
if sharedSecret == "" {
return false, errors.New("Shared secret registration is disabled")
}
adminString := "notadmin"
if isAdmin {
adminString = "admin"
}
joined := strings.Join([]string{username, password, adminString}, "\x00")
mac := hmac.New(sha1.New, []byte(sharedSecret))
_, err := mac.Write([]byte(joined))
if err != nil {
return false, err
}
expectedMAC := mac.Sum(nil)
return hmac.Equal(givenMac, expectedMAC), nil
}
// checkFlows checks a single completed flow against another required one. If // checkFlows checks a single completed flow against another required one. If
// one contains at least all of the stages that the other does, checkFlows // one contains at least all of the stages that the other does, checkFlows
// returns true. // returns true.
@ -995,3 +934,34 @@ func RegisterAvailable(
}, },
} }
} }
func handleSharedSecretRegistration(userAPI userapi.UserInternalAPI, sr *SharedSecretRegistration, req *http.Request) util.JSONResponse {
ssrr, err := NewSharedSecretRegistrationRequest(req.Body)
if err != nil {
return util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON(fmt.Sprintf("malformed json: %s", err)),
}
}
valid, err := sr.IsValidMacLogin(ssrr.Nonce, ssrr.User, ssrr.Password, ssrr.Admin, ssrr.MacBytes)
if err != nil {
return util.ErrorResponse(err)
}
if !valid {
return util.JSONResponse{
Code: 403,
JSON: jsonerror.Forbidden("bad mac"),
}
}
// downcase capitals
ssrr.User = strings.ToLower(ssrr.User)
if resErr := validateUsername(ssrr.User); resErr != nil {
return *resErr
}
if resErr := validatePassword(ssrr.Password); resErr != nil {
return *resErr
}
deviceID := "shared_secret_registration"
return completeRegistration(req.Context(), userAPI, ssrr.User, ssrr.Password, "", req.RemoteAddr, req.UserAgent(), false, &ssrr.User, &deviceID)
}

View file

@ -0,0 +1,99 @@
package routing
import (
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/util"
cache "github.com/patrickmn/go-cache"
)
type SharedSecretRegistrationRequest struct {
User string `json:"username"`
Password string `json:"password"`
Nonce string `json:"nonce"`
MacBytes []byte
MacStr string `json:"mac"`
Admin bool `json:"admin"`
}
func NewSharedSecretRegistrationRequest(reader io.ReadCloser) (*SharedSecretRegistrationRequest, error) {
defer internal.CloseAndLogIfError(context.Background(), reader, "NewSharedSecretRegistrationRequest: failed to close request body")
var ssrr SharedSecretRegistrationRequest
err := json.NewDecoder(reader).Decode(&ssrr)
if err != nil {
return nil, err
}
ssrr.MacBytes, err = hex.DecodeString(ssrr.MacStr)
return &ssrr, err
}
type SharedSecretRegistration struct {
sharedSecret string
nonces *cache.Cache
}
func NewSharedSecretRegistration(sharedSecret string) *SharedSecretRegistration {
return &SharedSecretRegistration{
sharedSecret: sharedSecret,
// nonces live for 5mins, purge every 10mins
nonces: cache.New(5*time.Minute, 10*time.Minute),
}
}
func (r *SharedSecretRegistration) GenerateNonce() string {
nonce := util.RandomString(16)
r.nonces.Set(nonce, true, cache.DefaultExpiration)
return nonce
}
func (r *SharedSecretRegistration) validNonce(nonce string) bool {
_, exists := r.nonces.Get(nonce)
return exists
}
func (r *SharedSecretRegistration) IsValidMacLogin(
nonce, username, password string,
isAdmin bool,
givenMac []byte,
) (bool, error) {
// Check that shared secret registration isn't disabled.
if r.sharedSecret == "" {
return false, errors.New("Shared secret registration is disabled")
}
if !r.validNonce(nonce) {
return false, fmt.Errorf("Incorrect or expired nonce: %s", nonce)
}
// Check that username/password don't contain the HMAC delimiters.
if strings.Contains(username, "\x00") {
return false, errors.New("Username contains invalid character")
}
if strings.Contains(password, "\x00") {
return false, errors.New("Password contains invalid character")
}
adminString := "notadmin"
if isAdmin {
adminString = "admin"
}
joined := strings.Join([]string{nonce, username, password, adminString}, "\x00")
mac := hmac.New(sha1.New, []byte(r.sharedSecret))
_, err := mac.Write([]byte(joined))
if err != nil {
return false, err
}
expectedMAC := mac.Sum(nil)
return hmac.Equal(givenMac, expectedMAC), nil
}

View file

@ -0,0 +1,43 @@
package routing
import (
"bytes"
"io/ioutil"
"testing"
"github.com/patrickmn/go-cache"
)
func TestSharedSecretRegister(t *testing.T) {
// these values have come from a local synapse instance to ensure compatibility
jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice"}`)
sharedSecret := "dendritetest"
req, err := NewSharedSecretRegistrationRequest(ioutil.NopCloser(bytes.NewBuffer(jsonStr)))
if err != nil {
t.Fatalf("failed to read request: %s", err)
}
r := NewSharedSecretRegistration(sharedSecret)
// force the nonce to be known
r.nonces.Set(req.Nonce, true, cache.DefaultExpiration)
valid, err := r.IsValidMacLogin(req.Nonce, req.User, req.Password, req.Admin, req.MacBytes)
if err != nil {
t.Fatalf("failed to check for valid mac: %s", err)
}
if !valid {
t.Errorf("mac login failed, wanted success")
}
// modify the mac so it fails
req.MacBytes[0] = 0xff
valid, err = r.IsValidMacLogin(req.Nonce, req.User, req.Password, req.Admin, req.MacBytes)
if err != nil {
t.Fatalf("failed to check for valid mac: %s", err)
}
if valid {
t.Errorf("mac login succeeded, wanted failure")
}
}

View file

@ -37,6 +37,7 @@ import (
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client // Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
@ -46,7 +47,7 @@ import (
// applied: // applied:
// nolint: gocyclo // nolint: gocyclo
func Setup( func Setup(
publicAPIMux *mux.Router, cfg *config.ClientAPI, publicAPIMux, synapseAdminRouter *mux.Router, cfg *config.ClientAPI,
eduAPI eduServerAPI.EDUServerInputAPI, eduAPI eduServerAPI.EDUServerInputAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
@ -88,6 +89,32 @@ func Setup(
}), }),
).Methods(http.MethodGet, http.MethodOptions) ).Methods(http.MethodGet, http.MethodOptions)
if cfg.RegistrationSharedSecret != "" {
logrus.Info("Enabling shared secret registration at /_synapse/admin/v1/register")
sr := NewSharedSecretRegistration(cfg.RegistrationSharedSecret)
synapseAdminRouter.Handle("/admin/v1/register",
httputil.MakeExternalAPI("shared_secret_registration", func(req *http.Request) util.JSONResponse {
if req.Method == http.MethodGet {
return util.JSONResponse{
Code: 200,
JSON: struct {
Nonce string `json:"nonce"`
}{
Nonce: sr.GenerateNonce(),
},
}
}
if req.Method == http.MethodPost {
return handleSharedSecretRegistration(userAPI, sr, req)
}
return util.JSONResponse{
Code: http.StatusMethodNotAllowed,
JSON: jsonerror.NotFound("unknown method"),
}
}),
).Methods(http.MethodGet, http.MethodPost, http.MethodOptions)
}
r0mux := publicAPIMux.PathPrefix("/r0").Subrouter() r0mux := publicAPIMux.PathPrefix("/r0").Subrouter()
unstableMux := publicAPIMux.PathPrefix("/unstable").Subrouter() unstableMux := publicAPIMux.PathPrefix("/unstable").Subrouter()

View file

@ -197,6 +197,7 @@ func main() {
base.Base.PublicFederationAPIMux, base.Base.PublicFederationAPIMux,
base.Base.PublicKeyAPIMux, base.Base.PublicKeyAPIMux,
base.Base.PublicMediaAPIMux, base.Base.PublicMediaAPIMux,
base.Base.SynapseAdminMux,
) )
if err := mscs.Enable(&base.Base, &monolith); err != nil { if err := mscs.Enable(&base.Base, &monolith); err != nil {
logrus.WithError(err).Fatalf("Failed to enable MSCs") logrus.WithError(err).Fatalf("Failed to enable MSCs")

View file

@ -210,6 +210,7 @@ func main() {
base.PublicFederationAPIMux, base.PublicFederationAPIMux,
base.PublicKeyAPIMux, base.PublicKeyAPIMux,
base.PublicMediaAPIMux, base.PublicMediaAPIMux,
base.SynapseAdminMux,
) )
wsUpgrader := websocket.Upgrader{ wsUpgrader := websocket.Upgrader{

View file

@ -154,6 +154,7 @@ func main() {
base.PublicFederationAPIMux, base.PublicFederationAPIMux,
base.PublicKeyAPIMux, base.PublicKeyAPIMux,
base.PublicMediaAPIMux, base.PublicMediaAPIMux,
base.SynapseAdminMux,
) )
if err := mscs.Enable(base, &monolith); err != nil { if err := mscs.Enable(base, &monolith); err != nil {
logrus.WithError(err).Fatalf("Failed to enable MSCs") logrus.WithError(err).Fatalf("Failed to enable MSCs")

View file

@ -149,6 +149,7 @@ func main() {
base.PublicFederationAPIMux, base.PublicFederationAPIMux,
base.PublicKeyAPIMux, base.PublicKeyAPIMux,
base.PublicMediaAPIMux, base.PublicMediaAPIMux,
base.SynapseAdminMux,
) )
if len(base.Cfg.MSCs.MSCs) > 0 { if len(base.Cfg.MSCs.MSCs) > 0 {

View file

@ -33,7 +33,7 @@ func ClientAPI(base *setup.BaseDendrite, cfg *config.Dendrite) {
keyAPI := base.KeyServerHTTPClient() keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes( clientapi.AddPublicRoutes(
base.PublicClientAPIMux, &base.Cfg.ClientAPI, accountDB, federation, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, accountDB, federation,
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
&cfg.MSCs, &cfg.MSCs,
) )

View file

@ -0,0 +1,503 @@
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/Masterminds/semver/v3"
"github.com/codeclysm/extract"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
)
var (
flagTempDir = flag.String("tmp", "tmp", "Path to temporary directory to dump tarballs to")
flagFrom = flag.String("from", "HEAD-1", "The version to start from e.g '0.3.1'. If 'HEAD-N' then starts N versions behind HEAD.")
flagTo = flag.String("to", "HEAD", "The version to end on e.g '0.3.3'.")
flagBuildConcurrency = flag.Int("build-concurrency", runtime.NumCPU(), "The amount of build concurrency when building images")
flagHead = flag.String("head", "", "Location to a dendrite repository to treat as HEAD instead of Github")
flagDockerHost = flag.String("docker-host", "localhost", "The hostname of the docker client. 'localhost' if running locally, 'host.docker.internal' if running in Docker.")
alphaNumerics = regexp.MustCompile("[^a-zA-Z0-9]+")
)
const HEAD = "HEAD"
// Embed the Dockerfile to use when building dendrite versions.
// We cannot use the dockerfile associated with the repo with each version sadly due to changes in
// Docker versions. Specifically, earlier Dendrite versions are incompatible with newer Docker clients
// due to the error:
// When using COPY with more than one source file, the destination must be a directory and end with a /
// We need to run a postgres anyway, so use the dockerfile associated with Complement instead.
const Dockerfile = `FROM golang:1.13-stretch as build
RUN apt-get update && apt-get install -y postgresql
WORKDIR /build
# Copy the build context to the repo as this is the right dendrite code. This is different to the
# Complement Dockerfile which wgets a branch.
COPY . .
RUN go build ./cmd/dendrite-monolith-server
RUN go build ./cmd/generate-keys
RUN go build ./cmd/generate-config
RUN ./generate-config --ci > dendrite.yaml
RUN ./generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key
# Replace the connection string with a single postgres DB, using user/db = 'postgres' and no password
RUN sed -i "s%connection_string:.*$%connection_string: postgresql://postgres@localhost/postgres?sslmode=disable%g" dendrite.yaml
# No password when connecting over localhost
RUN sed -i "s%127.0.0.1/32 md5%127.0.0.1/32 trust%g" /etc/postgresql/9.6/main/pg_hba.conf
# Bump up max conns for moar concurrency
RUN sed -i 's/max_connections = 100/max_connections = 2000/g' /etc/postgresql/9.6/main/postgresql.conf
RUN sed -i 's/max_open_conns:.*$/max_open_conns: 100/g' dendrite.yaml
# This entry script starts postgres, waits for it to be up then starts dendrite
RUN echo '\
#!/bin/bash -eu \n\
pg_lsclusters \n\
pg_ctlcluster 9.6 main start \n\
\n\
until pg_isready \n\
do \n\
echo "Waiting for postgres"; \n\
sleep 1; \n\
done \n\
\n\
sed -i "s/server_name: localhost/server_name: ${SERVER_NAME}/g" dendrite.yaml \n\
./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml \n\
' > run_dendrite.sh && chmod +x run_dendrite.sh
ENV SERVER_NAME=localhost
EXPOSE 8008 8448
CMD /build/run_dendrite.sh `
const dendriteUpgradeTestLabel = "dendrite_upgrade_test"
// downloadArchive downloads an arbitrary github archive of the form:
// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz
// and re-tarballs it without the top-level directory which contains branch information. It inserts
// the contents of `dockerfile` as a root file `Dockerfile` in the re-tarballed directory such that
// you can directly feed the retarballed archive to `ImageBuild` to have it run said dockerfile.
// Returns the tarball buffer on success.
func downloadArchive(cli *http.Client, tmpDir, archiveURL string, dockerfile []byte) (*bytes.Buffer, error) {
resp, err := cli.Get(archiveURL)
if err != nil {
return nil, err
}
// nolint:errcheck
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil, fmt.Errorf("got HTTP %d", resp.StatusCode)
}
_ = os.RemoveAll(tmpDir)
if err = os.Mkdir(tmpDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("failed to make temporary directory: %s", err)
}
// nolint:errcheck
defer os.RemoveAll(tmpDir)
// dump the tarball temporarily, stripping the top-level directory
err = extract.Archive(context.Background(), resp.Body, tmpDir, func(inPath string) string {
// remove top level
segments := strings.Split(inPath, "/")
return strings.Join(segments[1:], "/")
})
if err != nil {
return nil, err
}
// add top level Dockerfile
err = ioutil.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm)
if err != nil {
return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err)
}
// now re-tarball it :/
var tarball bytes.Buffer
err = compress(tmpDir, &tarball)
if err != nil {
return nil, err
}
return &tarball, nil
}
// buildDendrite builds Dendrite on the branchOrTagName given. Returns the image ID or an error
func buildDendrite(httpClient *http.Client, dockerClient *client.Client, tmpDir, branchOrTagName string) (string, error) {
var tarball *bytes.Buffer
var err error
// If a custom HEAD location is given, use that, else pull from github. Mostly useful for CI
// where we want to use the working directory.
if branchOrTagName == HEAD && *flagHead != "" {
log.Printf("%s: Using %s as HEAD", branchOrTagName, *flagHead)
// add top level Dockerfile
err = ioutil.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm)
if err != nil {
return "", fmt.Errorf("Custom HEAD: failed to inject /Dockerfile: %w", err)
}
// now tarball it
var buffer bytes.Buffer
err = compress(*flagHead, &buffer)
if err != nil {
return "", fmt.Errorf("failed to tarball custom HEAD %s : %s", *flagHead, err)
}
tarball = &buffer
} else {
log.Printf("%s: Downloading version %s to %s\n", branchOrTagName, branchOrTagName, tmpDir)
// pull an archive, this contains a top-level directory which screws with the build context
// which we need to fix up post download
u := fmt.Sprintf("https://github.com/matrix-org/dendrite/archive/%s.tar.gz", branchOrTagName)
tarball, err = downloadArchive(httpClient, tmpDir, u, []byte(Dockerfile))
if err != nil {
return "", fmt.Errorf("failed to download archive %s: %w", u, err)
}
log.Printf("%s: %s => %d bytes\n", branchOrTagName, u, tarball.Len())
}
log.Printf("%s: Building version %s\n", branchOrTagName, branchOrTagName)
res, err := dockerClient.ImageBuild(context.Background(), tarball, types.ImageBuildOptions{
Tags: []string{"dendrite-upgrade"},
})
if err != nil {
return "", fmt.Errorf("failed to start building image: %s", err)
}
// nolint:errcheck
defer res.Body.Close()
decoder := json.NewDecoder(res.Body)
// {"aux":{"ID":"sha256:247082c717963bc2639fc2daed08838d67811ea12356cd4fda43e1ffef94f2eb"}}
var imageID string
for decoder.More() {
var dl struct {
Stream string `json:"stream"`
Aux map[string]interface{} `json:"aux"`
}
if err := decoder.Decode(&dl); err != nil {
return "", fmt.Errorf("failed to decode build image output line: %w", err)
}
log.Printf("%s: %s", branchOrTagName, dl.Stream)
if dl.Aux != nil {
imgID, ok := dl.Aux["ID"]
if ok {
imageID = imgID.(string)
}
}
}
return imageID, nil
}
func getAndSortVersionsFromGithub(httpClient *http.Client) (semVers []*semver.Version, err error) {
u := "https://api.github.com/repos/matrix-org/dendrite/tags"
res, err := httpClient.Get(u)
if err != nil {
return nil, err
}
if res.StatusCode != 200 {
return nil, fmt.Errorf("%s returned HTTP %d", u, res.StatusCode)
}
resp := []struct {
Name string `json:"name"`
}{}
if err = json.NewDecoder(res.Body).Decode(&resp); err != nil {
return nil, err
}
for _, r := range resp {
v, err := semver.NewVersion(r.Name)
if err != nil {
continue // not a semver, that's ok and isn't an error, we allow tags that aren't semvers
}
semVers = append(semVers, v)
}
sort.Sort(semver.Collection(semVers))
return semVers, nil
}
func calculateVersions(cli *http.Client, from, to string) []string {
semvers, err := getAndSortVersionsFromGithub(cli)
if err != nil {
log.Fatalf("failed to collect semvers from github: %s", err)
}
// snip the lower bound depending on --from
if from != "" {
if strings.HasPrefix(from, "HEAD-") {
var headN int
headN, err = strconv.Atoi(strings.TrimPrefix(from, "HEAD-"))
if err != nil {
log.Fatalf("invalid --from, try 'HEAD-1'")
}
if headN >= len(semvers) {
log.Fatalf("only have %d versions, but asked to go to HEAD-%d", len(semvers), headN)
}
if headN > 0 {
semvers = semvers[len(semvers)-headN:]
}
} else {
fromVer, err := semver.NewVersion(from)
if err != nil {
log.Fatalf("invalid --from: %s", err)
}
i := 0
for i = 0; i < len(semvers); i++ {
if semvers[i].LessThan(fromVer) {
continue
}
break
}
semvers = semvers[i:]
}
}
if to != "" && to != HEAD {
toVer, err := semver.NewVersion(to)
if err != nil {
log.Fatalf("invalid --to: %s", err)
}
var i int
for i = len(semvers) - 1; i >= 0; i-- {
if semvers[i].GreaterThan(toVer) {
continue
}
break
}
semvers = semvers[:i+1]
}
var versions []string
for _, sv := range semvers {
versions = append(versions, sv.Original())
}
if to == HEAD {
versions = append(versions, HEAD)
}
return versions
}
func buildDendriteImages(httpClient *http.Client, dockerClient *client.Client, baseTempDir string, concurrency int, branchOrTagNames []string) map[string]string {
// concurrently build all versions, this can be done in any order. The mutex protects the map
branchToImageID := make(map[string]string)
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(concurrency)
ch := make(chan string, len(branchOrTagNames))
for _, branchName := range branchOrTagNames {
ch <- branchName
}
close(ch)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for branchName := range ch {
tmpDir := baseTempDir + alphaNumerics.ReplaceAllString(branchName, "")
imgID, err := buildDendrite(httpClient, dockerClient, tmpDir, branchName)
if err != nil {
log.Fatalf("%s: failed to build dendrite image: %s", branchName, err)
}
mu.Lock()
branchToImageID[branchName] = imgID
mu.Unlock()
}
}()
}
wg.Wait()
return branchToImageID
}
func runImage(dockerClient *client.Client, volumeName, version, imageID string) (csAPIURL, containerID string, err error) {
log.Printf("%s: running image %s\n", version, imageID)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
body, err := dockerClient.ContainerCreate(ctx, &container.Config{
Image: imageID,
Env: []string{"SERVER_NAME=hs1"},
Labels: map[string]string{
dendriteUpgradeTestLabel: "yes",
},
}, &container.HostConfig{
PublishAllPorts: true,
Mounts: []mount.Mount{
{
Type: mount.TypeVolume,
Source: volumeName,
Target: "/var/lib/postgresql/9.6/main",
},
},
}, nil, nil, "dendrite_upgrade_test_"+version)
if err != nil {
return "", "", fmt.Errorf("failed to ContainerCreate: %s", err)
}
containerID = body.ID
err = dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{})
if err != nil {
return "", "", fmt.Errorf("failed to ContainerStart: %s", err)
}
inspect, err := dockerClient.ContainerInspect(ctx, containerID)
if err != nil {
return "", "", err
}
csapiPortInfo, ok := inspect.NetworkSettings.Ports[nat.Port("8008/tcp")]
if !ok {
return "", "", fmt.Errorf("port 8008 not exposed - exposed ports: %v", inspect.NetworkSettings.Ports)
}
baseURL := fmt.Sprintf("http://%s:%s", *flagDockerHost, csapiPortInfo[0].HostPort)
versionsURL := fmt.Sprintf("%s/_matrix/client/versions", baseURL)
// hit /versions to check it is up
var lastErr error
for i := 0; i < 500; i++ {
res, err := http.Get(versionsURL)
if err != nil {
lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, err)
time.Sleep(50 * time.Millisecond)
continue
}
if res.StatusCode != 200 {
lastErr = fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status)
time.Sleep(50 * time.Millisecond)
continue
}
lastErr = nil
break
}
if lastErr != nil {
logs, err := dockerClient.ContainerLogs(context.Background(), containerID, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
})
// ignore errors when cannot get logs, it's just for debugging anyways
if err == nil {
logbody, err := ioutil.ReadAll(logs)
if err == nil {
log.Printf("Container logs:\n\n%s\n\n", string(logbody))
}
}
}
return baseURL, containerID, lastErr
}
func destroyContainer(dockerClient *client.Client, containerID string) {
err := dockerClient.ContainerRemove(context.TODO(), containerID, types.ContainerRemoveOptions{
Force: true,
})
if err != nil {
log.Printf("failed to remove container %s : %s", containerID, err)
}
}
func loadAndRunTests(dockerClient *client.Client, volumeName, v string, branchToImageID map[string]string) error {
csAPIURL, containerID, err := runImage(dockerClient, volumeName, v, branchToImageID[v])
if err != nil {
return fmt.Errorf("failed to run container for branch %v: %v", v, err)
}
defer destroyContainer(dockerClient, containerID)
log.Printf("URL %s -> %s \n", csAPIURL, containerID)
if err = runTests(csAPIURL, v); err != nil {
return fmt.Errorf("failed to run tests on version %s: %s", v, err)
}
return nil
}
func verifyTests(dockerClient *client.Client, volumeName string, versions []string, branchToImageID map[string]string) error {
lastVer := versions[len(versions)-1]
csAPIURL, containerID, err := runImage(dockerClient, volumeName, lastVer, branchToImageID[lastVer])
if err != nil {
return fmt.Errorf("failed to run container for branch %v: %v", lastVer, err)
}
defer destroyContainer(dockerClient, containerID)
return verifyTestsRan(csAPIURL, versions)
}
// cleanup old containers/volumes from a previous run
func cleanup(dockerClient *client.Client) {
// ignore all errors, we are just cleaning up and don't want to fail just because we fail to cleanup
containers, _ := dockerClient.ContainerList(context.Background(), types.ContainerListOptions{
Filters: label(dendriteUpgradeTestLabel),
})
for _, c := range containers {
s := time.Second
_ = dockerClient.ContainerStop(context.Background(), c.ID, &s)
_ = dockerClient.ContainerRemove(context.Background(), c.ID, types.ContainerRemoveOptions{
Force: true,
})
}
_ = dockerClient.VolumeRemove(context.Background(), "dendrite_upgrade_test", true)
}
func label(in string) filters.Args {
f := filters.NewArgs()
f.Add("label", in)
return f
}
func main() {
flag.Parse()
httpClient := &http.Client{
Timeout: 60 * time.Second,
}
dockerClient, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
log.Fatalf("failed to make docker client: %s", err)
}
if *flagFrom == "" {
flag.Usage()
os.Exit(1)
}
cleanup(dockerClient)
versions := calculateVersions(httpClient, *flagFrom, *flagTo)
log.Printf("Testing dendrite versions: %v\n", versions)
branchToImageID := buildDendriteImages(httpClient, dockerClient, *flagTempDir, *flagBuildConcurrency, versions)
// make a shared postgres volume
volume, err := dockerClient.VolumeCreate(context.Background(), volume.VolumeCreateBody{
Name: "dendrite_upgrade_test",
Labels: map[string]string{
dendriteUpgradeTestLabel: "yes",
},
})
if err != nil {
log.Fatalf("failed to make docker volume: %s", err)
}
failed := false
defer func() {
perr := recover()
log.Println("removing postgres volume")
verr := dockerClient.VolumeRemove(context.Background(), volume.Name, true)
if perr == nil {
perr = verr
}
if perr != nil {
panic(perr)
}
if failed {
os.Exit(1)
}
}()
// run through images sequentially
for _, v := range versions {
if err = loadAndRunTests(dockerClient, volume.Name, v, branchToImageID); err != nil {
log.Printf("failed to run tests for %v: %s\n", v, err)
failed = true
break
}
}
if err := verifyTests(dockerClient, volume.Name, versions, branchToImageID); err != nil {
log.Printf("failed to verify test results: %s", err)
failed = true
}
}

View file

@ -0,0 +1,63 @@
package main
import (
"archive/tar"
"compress/gzip"
"io"
"os"
"path/filepath"
"strings"
)
// From https://gist.github.com/mimoo/25fc9716e0f1353791f5908f94d6e726
// Modified to strip off top-level when compressing
func compress(src string, buf io.Writer) error {
// tar > gzip > buf
zr := gzip.NewWriter(buf)
tw := tar.NewWriter(zr)
// walk through every file in the folder
err := filepath.Walk(src, func(file string, fi os.FileInfo, e error) error {
// generate tar header
header, err := tar.FileInfoHeader(fi, file)
if err != nil {
return err
}
// must provide real name
// (see https://golang.org/src/archive/tar/common.go?#L626)
header.Name = strings.TrimPrefix(filepath.ToSlash(file), src+"/")
// write header
if err := tw.WriteHeader(header); err != nil {
return err
}
// if not a dir, write file content
if !fi.IsDir() {
data, err := os.Open(file)
if err != nil {
return err
}
if _, err = io.Copy(tw, data); err != nil {
return err
}
if err = data.Close(); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
// produce tar
if err := tw.Close(); err != nil {
return err
}
// produce gzip
if err := zr.Close(); err != nil {
return err
}
//
return nil
}

View file

@ -0,0 +1,192 @@
package main
import (
"fmt"
"log"
"strings"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
)
const userPassword = "this_is_a_long_password"
type user struct {
userID string
localpart string
client *gomatrix.Client
}
// runTests performs the following operations:
// - register alice and bob with branch name muxed into the localpart
// - create a DM room for the 2 users and exchange messages
// - create/join a public #global room and exchange messages
func runTests(baseURL, branchName string) error {
// register 2 users
users := []user{
{
localpart: "alice" + branchName,
},
{
localpart: "bob" + branchName,
},
}
for i, u := range users {
client, err := gomatrix.NewClient(baseURL, "", "")
if err != nil {
return err
}
resp, err := client.RegisterDummy(&gomatrix.ReqRegister{
Username: strings.ToLower(u.localpart),
Password: userPassword,
})
if err != nil {
return fmt.Errorf("failed to register %s: %s", u.localpart, err)
}
client, err = gomatrix.NewClient(baseURL, resp.UserID, resp.AccessToken)
if err != nil {
return err
}
users[i].client = client
users[i].userID = resp.UserID
}
// create DM room, join it and exchange messages
createRoomResp, err := users[0].client.CreateRoom(&gomatrix.ReqCreateRoom{
Preset: "trusted_private_chat",
Invite: []string{users[1].userID},
IsDirect: true,
})
if err != nil {
return fmt.Errorf("failed to create DM room: %s", err)
}
dmRoomID := createRoomResp.RoomID
if _, err = users[1].client.JoinRoom(dmRoomID, "", nil); err != nil {
return fmt.Errorf("failed to join DM room: %s", err)
}
msgs := []struct {
client *gomatrix.Client
text string
}{
{
client: users[0].client, text: "1: " + branchName,
},
{
client: users[1].client, text: "2: " + branchName,
},
{
client: users[0].client, text: "3: " + branchName,
},
{
client: users[1].client, text: "4: " + branchName,
},
}
for _, msg := range msgs {
_, err = msg.client.SendText(dmRoomID, msg.text)
if err != nil {
return fmt.Errorf("failed to send text in dm room: %s", err)
}
}
// attempt to create/join the shared public room
publicRoomID := ""
createRoomResp, err = users[0].client.CreateRoom(&gomatrix.ReqCreateRoom{
RoomAliasName: "global",
Preset: "public_chat",
})
if err != nil { // this is okay and expected if the room already exists and the aliases clash
// try to join it
_, domain, err2 := gomatrixserverlib.SplitID('@', users[0].userID)
if err2 != nil {
return fmt.Errorf("failed to split user ID: %s, %s", users[0].userID, err2)
}
joinRoomResp, err2 := users[0].client.JoinRoom(fmt.Sprintf("#global:%s", domain), "", nil)
if err2 != nil {
return fmt.Errorf("alice failed to join public room: %s", err2)
}
publicRoomID = joinRoomResp.RoomID
} else {
publicRoomID = createRoomResp.RoomID
}
if _, err = users[1].client.JoinRoom(publicRoomID, "", nil); err != nil {
return fmt.Errorf("bob failed to join public room: %s", err)
}
// send messages
for _, msg := range msgs {
_, err = msg.client.SendText(publicRoomID, "public "+msg.text)
if err != nil {
return fmt.Errorf("failed to send text in public room: %s", err)
}
}
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
return nil
}
// verifyTestsRan checks that the HS has the right rooms/messages
func verifyTestsRan(baseURL string, branchNames []string) error {
log.Println("Verifying tests....")
// check we can login as all users
var resp *gomatrix.RespLogin
for _, branchName := range branchNames {
client, err := gomatrix.NewClient(baseURL, "", "")
if err != nil {
return err
}
userLocalparts := []string{
"alice" + branchName,
"bob" + branchName,
}
for _, userLocalpart := range userLocalparts {
resp, err = client.Login(&gomatrix.ReqLogin{
Type: "m.login.password",
User: strings.ToLower(userLocalpart),
Password: userPassword,
})
if err != nil {
return fmt.Errorf("failed to login as %s: %s", userLocalpart, err)
}
if resp.AccessToken == "" {
return fmt.Errorf("failed to login, bad response: %+v", resp)
}
}
}
log.Println(" accounts exist: OK")
client, err := gomatrix.NewClient(baseURL, resp.UserID, resp.AccessToken)
if err != nil {
return err
}
_, domain, err := gomatrixserverlib.SplitID('@', client.UserID)
if err != nil {
return err
}
u := client.BuildURL("directory", "room", fmt.Sprintf("#global:%s", domain))
r := struct {
RoomID string `json:"room_id"`
}{}
err = client.MakeRequest("GET", u, nil, &r)
if err != nil {
return fmt.Errorf("failed to /directory: %s", err)
}
if r.RoomID == "" {
return fmt.Errorf("/directory lookup returned no room ID")
}
log.Println(" public room exists: OK")
history, err := client.Messages(r.RoomID, client.Store.LoadNextBatch(client.UserID), "", 'b', 100)
if err != nil {
return fmt.Errorf("failed to get /messages: %s", err)
}
// we expect 4 messages per version
msgCount := 0
for _, ev := range history.Chunk {
if ev.Type == "m.room.message" {
msgCount += 1
}
}
wantMsgCount := len(branchNames) * 4
if msgCount != wantMsgCount {
return fmt.Errorf("got %d messages in global room, want %d", msgCount, wantMsgCount)
}
log.Println(" messages exist: OK")
return nil
}

View file

@ -215,6 +215,7 @@ func main() {
base.PublicFederationAPIMux, base.PublicFederationAPIMux,
base.PublicKeyAPIMux, base.PublicKeyAPIMux,
base.PublicMediaAPIMux, base.PublicMediaAPIMux,
base.SynapseAdminMux,
) )
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()

View file

@ -236,6 +236,7 @@ func main() {
base.PublicFederationAPIMux, base.PublicFederationAPIMux,
base.PublicKeyAPIMux, base.PublicKeyAPIMux,
base.PublicMediaAPIMux, base.PublicMediaAPIMux,
base.SynapseAdminMux,
) )
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()

View file

@ -205,7 +205,7 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
resp, jsonErr := t.processTransaction(context.Background()) resp, jsonErr := t.processTransaction(httpReq.Context())
if jsonErr != nil { if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr return *jsonErr
@ -232,7 +232,8 @@ type txnReq struct {
// something that can tell us about which servers are in a room right now // something that can tell us about which servers are in a room right now
servers federationAPI.ServersInRoomProvider servers federationAPI.ServersInRoomProvider
// a list of events from the auth and prev events which we already had // a list of events from the auth and prev events which we already had
hadEvents map[string]bool hadEvents map[string]bool
hadEventsMutex sync.Mutex
// local cache of events for auth checks, etc - this may include events // local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of. // which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent haveEvents map[string]*gomatrixserverlib.HeaderedEvent
@ -240,6 +241,12 @@ type txnReq struct {
work string // metrics work string // metrics
} }
func (t *txnReq) hadEvent(eventID string, had bool) {
t.hadEventsMutex.Lock()
defer t.hadEventsMutex.Unlock()
t.hadEvents[eventID] = had
}
// A subset of FederationClient functionality that txn requires. Useful for testing. // A subset of FederationClient functionality that txn requires. Useful for testing.
type txnFederationClient interface { type txnFederationClient interface {
LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
@ -253,11 +260,8 @@ type txnFederationClient interface {
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
results := make(map[string]gomatrixserverlib.PDUResult) results := make(map[string]gomatrixserverlib.PDUResult)
//var resultsMutex sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup
var tasks []*inputTask var tasks []*inputTask
wg.Add(1) // for processEDUs
for _, pdu := range t.PDUs { for _, pdu := range t.PDUs {
pduCountTotal.WithLabelValues("total").Inc() pduCountTotal.WithLabelValues("total").Inc()
@ -313,9 +317,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
input: newSendFIFOQueue(), input: newSendFIFOQueue(),
}) })
worker := v.(*inputWorker) worker := v.(*inputWorker)
if !worker.running.Load() {
go worker.run()
}
wg.Add(1) wg.Add(1)
task := &inputTask{ task := &inputTask{
ctx: ctx, ctx: ctx,
@ -325,13 +326,12 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
tasks = append(tasks, task) tasks = append(tasks, task)
worker.input.push(task) worker.input.push(task)
if worker.running.CAS(false, true) {
go worker.run()
}
} }
go func() { t.processEDUs(ctx)
defer wg.Done()
t.processEDUs(ctx)
}()
wg.Wait() wg.Wait()
for _, task := range tasks { for _, task := range tasks {
@ -351,9 +351,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
func (t *inputWorker) run() { func (t *inputWorker) run() {
if !t.running.CAS(false, true) {
return
}
defer t.running.Store(false) defer t.running.Store(false)
for { for {
task, ok := t.input.pop() task, ok := t.input.pop()
@ -368,10 +365,14 @@ func (t *inputWorker) run() {
select { select {
case <-task.ctx.Done(): case <-task.ctx.Done():
task.err = context.DeadlineExceeded task.err = context.DeadlineExceeded
pduCountTotal.WithLabelValues("expired").Inc()
return return
default: default:
evStart := time.Now() evStart := time.Now()
task.err = task.t.processEvent(task.ctx, task.event) // TODO: Is 5 minutes too long?
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
task.err = task.t.processEvent(ctx, task.event)
cancel()
task.duration = time.Since(evStart) task.duration = time.Since(evStart)
if err := task.err; err != nil { if err := task.err; err != nil {
switch err.(type) { switch err.(type) {
@ -572,6 +573,23 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
t.work = "" // reset from previous event t.work = "" // reset from previous event
// Ask the roomserver if we know about the room and/or if we're joined
// to it. If we aren't then we won't bother processing the event.
joinedReq := api.QueryServerJoinedToRoomRequest{
RoomID: e.RoomID(),
}
var joinedRes api.QueryServerJoinedToRoomResponse
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, &joinedReq, &joinedRes); err != nil {
return fmt.Errorf("t.rsAPI.QueryServerJoinedToRoom: %w", err)
}
if !joinedRes.RoomExists || !joinedRes.IsInRoom {
// We don't believe we're a member of this room, therefore there's
// no point in wasting work trying to figure out what to do with
// missing auth or prev events. Drop the event.
return roomNotFoundError{e.RoomID()}
}
// Work out if the roomserver knows everything it needs to know to auth // Work out if the roomserver knows everything it needs to know to auth
// the event. This includes the prev_events and auth_events. // the event. This includes the prev_events and auth_events.
// NOTE! This is going to include prev_events that have an empty state // NOTE! This is going to include prev_events that have an empty state
@ -588,23 +606,13 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err) return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
} }
if !stateResp.RoomExists {
// TODO: When synapse receives a message for a room it is not in it
// asks the remote server for the state of the room so that it can
// check if the remote server knows of a join "m.room.member" event
// that this server is unaware of.
// However generally speaking we should reject events for rooms we
// aren't a member of.
return roomNotFoundError{e.RoomID()}
}
// Prepare a map of all the events we already had before this point, so // Prepare a map of all the events we already had before this point, so
// that we don't send them to the roomserver again. // that we don't send them to the roomserver again.
for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) { for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) {
t.hadEvents[eventID] = true t.hadEvent(eventID, true)
} }
for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) { for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) {
t.hadEvents[eventID] = false t.hadEvent(eventID, false)
} }
if len(stateResp.MissingAuthEventIDs) > 0 { if len(stateResp.MissingAuthEventIDs) > 0 {
@ -679,7 +687,7 @@ withNextEvent:
); err != nil { ); err != nil {
return fmt.Errorf("api.SendEvents: %w", err) return fmt.Errorf("api.SendEvents: %w", err)
} }
t.hadEvents[ev.EventID()] = true // if the roomserver didn't know about the event before, it does now t.hadEvent(ev.EventID(), true) // if the roomserver didn't know about the event before, it does now
t.cacheAndReturn(ev.Headered(stateResp.RoomVersion)) t.cacheAndReturn(ev.Headered(stateResp.RoomVersion))
delete(missingAuthEvents, missingAuthEventID) delete(missingAuthEvents, missingAuthEventID)
continue withNextEvent continue withNextEvent
@ -703,14 +711,9 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
return gomatrixserverlib.Allowed(e, &authUsingState) return gomatrixserverlib.Allowed(e, &authUsingState)
} }
var processEventWithMissingStateMutexes = internal.NewMutexByRoom()
func (t *txnReq) processEventWithMissingState( func (t *txnReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) error { ) error {
processEventWithMissingStateMutexes.Lock(e.RoomID())
defer processEventWithMissingStateMutexes.Unlock(e.RoomID())
// We are missing the previous events for this events. // We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the // This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap: // room. There two ways that we can handle such a gap:
@ -812,14 +815,23 @@ func (t *txnReq) processEventWithMissingState(
// First of all, send the backward extremity into the roomserver with the // First of all, send the backward extremity into the roomserver with the
// newly resolved state. This marks the "oldest" point in the backfill and // newly resolved state. This marks the "oldest" point in the backfill and
// sets the baseline state for any new events after this. // sets the baseline state for any new events after this. We'll make a
// copy of the hadEvents map so that it can be taken downstream without
// worrying about concurrent map reads/writes, since t.hadEvents is meant
// to be protected by a mutex.
hadEvents := map[string]bool{}
t.hadEventsMutex.Lock()
for k, v := range t.hadEvents {
hadEvents[k] = v
}
t.hadEventsMutex.Unlock()
err = api.SendEventWithState( err = api.SendEventWithState(
context.Background(), context.Background(),
t.rsAPI, t.rsAPI,
api.KindOld, api.KindOld,
resolvedState, resolvedState,
backwardsExtremity.Headered(roomVersion), backwardsExtremity.Headered(roomVersion),
t.hadEvents, hadEvents,
) )
if err != nil { if err != nil {
return fmt.Errorf("api.SendEventWithState: %w", err) return fmt.Errorf("api.SendEventWithState: %w", err)
@ -915,7 +927,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this // set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
// processEvent request, which is better for memory. // processEvent request, which is better for memory.
stateEvents[i] = t.cacheAndReturn(ev) stateEvents[i] = t.cacheAndReturn(ev)
t.hadEvents[ev.EventID()] = true t.hadEvent(ev.EventID(), true)
} }
// we should never access res.StateEvents again so we delete it here to make GC faster // we should never access res.StateEvents again so we delete it here to make GC faster
res.StateEvents = nil res.StateEvents = nil
@ -950,7 +962,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
} }
for i, ev := range queryRes.Events { for i, ev := range queryRes.Events {
authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap()) authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
t.hadEvents[ev.EventID()] = true t.hadEvent(ev.EventID(), true)
} }
queryRes.Events = nil queryRes.Events = nil
} }
@ -1027,7 +1039,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
latestEvents := make([]string, len(res.LatestEvents)) latestEvents := make([]string, len(res.LatestEvents))
for i, ev := range res.LatestEvents { for i, ev := range res.LatestEvents {
latestEvents[i] = res.LatestEvents[i].EventID latestEvents[i] = res.LatestEvents[i].EventID
t.hadEvents[ev.EventID] = true t.hadEvent(ev.EventID, true)
} }
var missingResp *gomatrixserverlib.RespMissingEvents var missingResp *gomatrixserverlib.RespMissingEvents
@ -1163,7 +1175,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
} }
for i, ev := range queryRes.Events { for i, ev := range queryRes.Events {
queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i]) queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i])
t.hadEvents[ev.EventID()] = true t.hadEvent(ev.EventID(), true)
evID := queryRes.Events[i].EventID() evID := queryRes.Events[i].EventID()
if missing[evID] { if missing[evID] {
delete(missing, evID) delete(missing, evID)

View file

@ -190,7 +190,9 @@ func (t *testRoomserverAPI) QueryServerJoinedToRoom(
request *api.QueryServerJoinedToRoomRequest, request *api.QueryServerJoinedToRoomRequest,
response *api.QueryServerJoinedToRoomResponse, response *api.QueryServerJoinedToRoomResponse,
) error { ) error {
return fmt.Errorf("not implemented") response.RoomExists = true
response.IsInRoom = true
return nil
} }
// Query whether a server is allowed to see an event // Query whether a server is allowed to see an event

11
go.mod
View file

@ -3,12 +3,19 @@ module github.com/matrix-org/dendrite
require ( require (
github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/Masterminds/semver/v3 v3.1.1
github.com/Shopify/sarama v1.28.0 github.com/Shopify/sarama v1.28.0
github.com/codeclysm/extract v2.2.0+incompatible
github.com/containerd/containerd v1.5.2 // indirect
github.com/docker/docker v20.10.7+incompatible
github.com/docker/go-connections v0.4.0
github.com/getsentry/sentry-go v0.10.0 github.com/getsentry/sentry-go v0.10.0
github.com/gologme/log v1.2.0 github.com/gologme/log v1.2.0
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/h2non/filetype v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4
github.com/juju/testing v0.0.0-20210324180055-18c50b0c2098 // indirect
github.com/lib/pq v1.9.0 github.com/lib/pq v1.9.0
github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-circuit v0.4.0 github.com/libp2p/go-libp2p-circuit v0.4.0
@ -28,9 +35,11 @@ require (
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb
github.com/morikuni/aec v1.0.0 // indirect
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6 github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6
github.com/opentracing/opentracing-go v1.2.0 github.com/opentracing/opentracing-go v1.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/pressly/goose v2.7.0+incompatible github.com/pressly/goose v2.7.0+incompatible
github.com/prometheus/client_golang v1.9.0 github.com/prometheus/client_golang v1.9.0
@ -41,7 +50,7 @@ require (
github.com/uber/jaeger-lib v2.4.0+incompatible github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20210218094457-e77ca8019daa github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20210218094457-e77ca8019daa
go.uber.org/atomic v1.7.0 go.uber.org/atomic v1.7.0
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/mobile v0.0.0-20210220033013-bdb1ca9a1e08 golang.org/x/mobile v0.0.0-20210220033013-bdb1ca9a1e08
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1

View file

@ -73,9 +73,9 @@ func callerPrettyfier(f *runtime.Frame) (string, string) {
// Append a newline + tab to it to move the actual log content to its own line // Append a newline + tab to it to move the actual log content to its own line
funcname += "\n\t" funcname += "\n\t"
// Surround the filepath in brackets and append line number so IDEs can quickly // Use a shortened file path which just has the filename to avoid having lots of redundant
// navigate // directories which contribute significantly to overall log sizes!
filename := fmt.Sprintf(" [%s:%d]", f.File, f.Line) filename := fmt.Sprintf(" [%s:%d]", path.Base(f.File), f.Line)
return funcname, filename return funcname, filename
} }

View file

@ -170,7 +170,8 @@ type QueryMembershipsForRoomResponse struct {
// QueryServerJoinedToRoomRequest is a request to QueryServerJoinedToRoom // QueryServerJoinedToRoomRequest is a request to QueryServerJoinedToRoom
type QueryServerJoinedToRoomRequest struct { type QueryServerJoinedToRoomRequest struct {
// Server name of the server to find // Server name of the server to find. If not specified, we will
// default to checking if the local server is joined.
ServerName gomatrixserverlib.ServerName `json:"server_name"` ServerName gomatrixserverlib.ServerName `json:"server_name"`
// ID of the room to see if we are still joined to // ID of the room to see if we are still joined to
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
@ -182,7 +183,8 @@ type QueryServerJoinedToRoomResponse struct {
RoomExists bool `json:"room_exists"` RoomExists bool `json:"room_exists"`
// True if we still believe that we are participating in the room // True if we still believe that we are participating in the room
IsInRoom bool `json:"is_in_room"` IsInRoom bool `json:"is_in_room"`
// List of servers that are also in the room // List of servers that are also in the room. This will not be populated
// if the queried ServerName is the local server name.
ServerNames []gomatrixserverlib.ServerName `json:"server_names"` ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
} }

View file

@ -59,6 +59,7 @@ func NewRoomserverAPI(
Queryer: &query.Queryer{ Queryer: &query.Queryer{
DB: roomserverDB, DB: roomserverDB,
Cache: caches, Cache: caches,
ServerName: cfg.Matrix.ServerName,
ServerACLs: serverACLs, ServerACLs: serverACLs,
}, },
Inputer: &input.Inputer{ Inputer: &input.Inputer{
@ -92,6 +93,7 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
RSAPI: r, RSAPI: r,
Inputer: r.Inputer, Inputer: r.Inputer,
Queryer: r.Queryer,
} }
r.Peeker = &perform.Peeker{ r.Peeker = &perform.Peeker{
ServerName: r.Cfg.Matrix.ServerName, ServerName: r.Cfg.Matrix.ServerName,

View file

@ -50,6 +50,10 @@ func UpdateToInviteMembership(
return updates, nil return updates, nil
} }
// IsServerCurrentlyInRoom checks if a server is in a given room, based on the room
// memberships. If the servername is not supplied then the local server will be
// checked instead using a faster code path.
// TODO: This should probably be replaced by an API call.
func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) { func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) {
info, err := db.RoomInfo(ctx, roomID) info, err := db.RoomInfo(ctx, roomID)
if err != nil { if err != nil {
@ -59,6 +63,10 @@ func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverNam
return false, fmt.Errorf("unknown room %s", roomID) return false, fmt.Errorf("unknown room %s", roomID)
} }
if serverName == "" {
return db.GetLocalServerInRoom(ctx, info.RoomNID)
}
eventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false) eventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false)
if err != nil { if err != nil {
return false, err return false, err

View file

@ -28,6 +28,7 @@ import (
rsAPI "github.com/matrix-org/dendrite/roomserver/api" rsAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -42,6 +43,7 @@ type Joiner struct {
DB storage.Database DB storage.Database
Inputer *input.Inputer Inputer *input.Inputer
Queryer *query.Queryer
} }
// PerformJoin handles joining matrix rooms, including over federation by talking to the federationsender. // PerformJoin handles joining matrix rooms, including over federation by talking to the federationsender.
@ -205,7 +207,14 @@ func (r *Joiner) performJoinRoomByID(
// Force a federated join if we aren't in the room and we've been // Force a federated join if we aren't in the room and we've been
// given some server names to try joining by. // given some server names to try joining by.
serverInRoom, _ := helpers.IsServerCurrentlyInRoom(ctx, r.DB, r.ServerName, req.RoomIDOrAlias) inRoomReq := &api.QueryServerJoinedToRoomRequest{
RoomID: req.RoomIDOrAlias,
}
inRoomRes := &api.QueryServerJoinedToRoomResponse{}
if err = r.Queryer.QueryServerJoinedToRoom(ctx, inRoomReq, inRoomRes); err != nil {
return "", "", fmt.Errorf("r.Queryer.QueryServerJoinedToRoom: %w", err)
}
serverInRoom := inRoomRes.IsInRoom
forceFederatedJoin := len(req.ServerNames) > 0 && !serverInRoom forceFederatedJoin := len(req.ServerNames) > 0 && !serverInRoom
// Force a federated join if we're dealing with a pending invite // Force a federated join if we're dealing with a pending invite

View file

@ -64,7 +64,14 @@ func (r *Leaver) performLeaveRoomByID(
// that. // that.
isInvitePending, senderUser, eventID, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID) isInvitePending, senderUser, eventID, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID)
if err == nil && isInvitePending { if err == nil && isInvitePending {
return r.performRejectInvite(ctx, req, res, senderUser, eventID) var host gomatrixserverlib.ServerName
_, host, err = gomatrixserverlib.SplitID('@', senderUser)
if err != nil {
return nil, fmt.Errorf("Sender %q is invalid", senderUser)
}
if host != r.Cfg.Matrix.ServerName {
return r.performFederatedRejectInvite(ctx, req, res, senderUser, eventID)
}
} }
// There's no invite pending, so first of all we want to find out // There's no invite pending, so first of all we want to find out
@ -94,9 +101,7 @@ func (r *Leaver) performLeaveRoomByID(
if err != nil { if err != nil {
return nil, fmt.Errorf("Error getting membership: %w", err) return nil, fmt.Errorf("Error getting membership: %w", err)
} }
if membership != gomatrixserverlib.Join { if membership != gomatrixserverlib.Join && membership != gomatrixserverlib.Invite {
// TODO: should be able to handle "invite" in this case too, if
// it's a case of kicking or banning or such
return nil, fmt.Errorf("User %q is not joined to the room (membership is %q)", req.UserID, membership) return nil, fmt.Errorf("User %q is not joined to the room (membership is %q)", req.UserID, membership)
} }
@ -147,7 +152,7 @@ func (r *Leaver) performLeaveRoomByID(
return nil, nil return nil, nil
} }
func (r *Leaver) performRejectInvite( func (r *Leaver) performFederatedRejectInvite(
ctx context.Context, ctx context.Context,
req *api.PerformLeaveRequest, req *api.PerformLeaveRequest,
res *api.PerformLeaveResponse, // nolint:unparam res *api.PerformLeaveResponse, // nolint:unparam

View file

@ -36,6 +36,7 @@ import (
type Queryer struct { type Queryer struct {
DB storage.Database DB storage.Database
Cache caching.RoomServerCaches Cache caching.RoomServerCaches
ServerName gomatrixserverlib.ServerName
ServerACLs *acls.ServerACLs ServerACLs *acls.ServerACLs
} }
@ -328,6 +329,16 @@ func (r *Queryer) QueryServerJoinedToRoom(
} }
response.RoomExists = true response.RoomExists = true
if request.ServerName == r.ServerName || request.ServerName == "" {
var joined bool
joined, err = r.DB.GetLocalServerInRoom(ctx, info.RoomNID)
if err != nil {
return fmt.Errorf("r.DB.GetLocalServerInRoom: %w", err)
}
response.IsInRoom = joined
return nil
}
eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false) eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false)
if err != nil { if err != nil {
return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err) return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err)
@ -377,10 +388,16 @@ func (r *Queryer) QueryServerAllowedToSeeEvent(
return return
} }
roomID := events[0].RoomID() roomID := events[0].RoomID()
isServerInRoom, err := helpers.IsServerCurrentlyInRoom(ctx, r.DB, request.ServerName, roomID)
if err != nil { inRoomReq := &api.QueryServerJoinedToRoomRequest{
return RoomID: roomID,
ServerName: request.ServerName,
} }
inRoomRes := &api.QueryServerJoinedToRoomResponse{}
if err = r.QueryServerJoinedToRoom(ctx, inRoomReq, inRoomRes); err != nil {
return fmt.Errorf("r.Queryer.QueryServerJoinedToRoom: %w", err)
}
info, err := r.DB.RoomInfo(ctx, roomID) info, err := r.DB.RoomInfo(ctx, roomID)
if err != nil { if err != nil {
return err return err
@ -389,7 +406,7 @@ func (r *Queryer) QueryServerAllowedToSeeEvent(
return fmt.Errorf("QueryServerAllowedToSeeEvent: no room info for room %s", roomID) return fmt.Errorf("QueryServerAllowedToSeeEvent: no room info for room %s", roomID)
} }
response.AllowedToSeeEvent, err = helpers.CheckServerAllowedToSeeEvent( response.AllowedToSeeEvent, err = helpers.CheckServerAllowedToSeeEvent(
ctx, r.DB, *info, request.EventID, request.ServerName, isServerInRoom, ctx, r.DB, *info, request.EventID, request.ServerName, inRoomRes.IsInRoom,
) )
return return
} }

View file

@ -154,6 +154,8 @@ type Database interface {
GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error)
// JoinedUsersSetInRooms returns all joined users in the rooms given, along with the count of how many times they appear. // JoinedUsersSetInRooms returns all joined users in the rooms given, along with the count of how many times they appear.
JoinedUsersSetInRooms(ctx context.Context, roomIDs []string) (map[string]int, error) JoinedUsersSetInRooms(ctx context.Context, roomIDs []string) (map[string]int, error)
// GetLocalServerInRoom returns true if we think we're in a given room or false otherwise.
GetLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error)
// GetKnownUsers searches all users that userID knows about. // GetKnownUsers searches all users that userID knows about.
GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error)
// GetKnownRooms returns a list of all rooms we know about. // GetKnownRooms returns a list of all rooms we know about.

View file

@ -117,7 +117,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
_roomserver_state_block.event_nid _roomserver_state_block.event_nid
FROM FROM
_roomserver_state_snapshots _roomserver_state_snapshots
JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids) LEFT JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
WHERE WHERE
_roomserver_state_snapshots.state_snapshot_nid = ANY ( _roomserver_state_snapshots.state_snapshot_nid = ANY (
SELECT SELECT
@ -140,22 +140,49 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
logrus.Warnf("Rewriting snapshots %d-%d of %d...", batchoffset, batchoffset+batchsize, snapshotcount) logrus.Warnf("Rewriting snapshots %d-%d of %d...", batchoffset, batchoffset+batchsize, snapshotcount)
var snapshots []stateBlockData var snapshots []stateBlockData
var badCreateSnapshots []stateBlockData
for snapshotrows.Next() { for snapshotrows.Next() {
var snapshot stateBlockData var snapshot stateBlockData
var eventsarray pq.Int64Array var eventsarray []sql.NullInt64
if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &snapshot.StateBlockNID, &eventsarray); err != nil { var nulStateBlockNID sql.NullInt64
if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &nulStateBlockNID, pq.Array(&eventsarray)); err != nil {
return fmt.Errorf("rows.Scan: %w", err) return fmt.Errorf("rows.Scan: %w", err)
} }
if nulStateBlockNID.Valid {
snapshot.StateBlockNID = types.StateBlockNID(nulStateBlockNID.Int64)
}
// Dendrite v0.1.0 would not make a state block for the create event, resulting in [NULL] from the query above.
// Remember the snapshot and we'll fill it in after we close this cursor as we can't have 2 queries running at the same time
if len(eventsarray) == 1 && !eventsarray[0].Valid {
badCreateSnapshots = append(badCreateSnapshots, snapshot)
continue
}
for _, e := range eventsarray { for _, e := range eventsarray {
snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e)) if e.Valid {
snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e.Int64))
}
} }
snapshot.EventNIDs = snapshot.EventNIDs[:util.SortAndUnique(snapshot.EventNIDs)] snapshot.EventNIDs = snapshot.EventNIDs[:util.SortAndUnique(snapshot.EventNIDs)]
snapshots = append(snapshots, snapshot) snapshots = append(snapshots, snapshot)
} }
if err = snapshotrows.Close(); err != nil { if err = snapshotrows.Close(); err != nil {
return fmt.Errorf("snapshots.Close: %w", err) return fmt.Errorf("snapshots.Close: %w", err)
} }
// fill in bad create snapshots
for _, s := range badCreateSnapshots {
var createEventNID types.EventNID
err = tx.QueryRow(
`SELECT event_nid FROM roomserver_events WHERE state_snapshot_nid = $1 AND event_type_nid = 1`, s.StateSnapshotNID,
).Scan(&createEventNID)
if err != nil {
return fmt.Errorf("cannot xref null state block with snapshot %d: %s", s.StateSnapshotNID, err)
}
if createEventNID == 0 {
return fmt.Errorf("cannot xref null state block with snapshot %d, no create event", s.StateSnapshotNID)
}
s.EventNIDs = append(s.EventNIDs, createEventNID)
snapshots = append(snapshots, s)
}
newsnapshots := map[stateSnapshotData]types.StateBlockNIDs{} newsnapshots := map[stateSnapshotData]types.StateBlockNIDs{}

View file

@ -124,6 +124,14 @@ var selectKnownUsersSQL = "" +
" SELECT DISTINCT room_nid FROM roomserver_membership WHERE target_nid=$1 AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " SELECT DISTINCT room_nid FROM roomserver_membership WHERE target_nid=$1 AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) +
") AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " AND event_state_key LIKE $2 LIMIT $3" ") AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " AND event_state_key LIKE $2 LIMIT $3"
// selectLocalServerInRoomSQL is an optimised case for checking if we, the local server,
// are in the room by using the target_local column of the membership table. Normally when
// we want to know if a server is in a room, we have to unmarshal the entire room state which
// is expensive. The presence of a single row from this query suggests we're still in the
// room, no rows returned suggests we aren't.
const selectLocalServerInRoomSQL = "" +
"SELECT room_nid FROM roomserver_membership WHERE target_local = true AND membership_nid = $1 AND room_nid = $2 LIMIT 1"
type membershipStatements struct { type membershipStatements struct {
insertMembershipStmt *sql.Stmt insertMembershipStmt *sql.Stmt
selectMembershipForUpdateStmt *sql.Stmt selectMembershipForUpdateStmt *sql.Stmt
@ -137,6 +145,7 @@ type membershipStatements struct {
selectJoinedUsersSetForRoomsStmt *sql.Stmt selectJoinedUsersSetForRoomsStmt *sql.Stmt
selectKnownUsersStmt *sql.Stmt selectKnownUsersStmt *sql.Stmt
updateMembershipForgetRoomStmt *sql.Stmt updateMembershipForgetRoomStmt *sql.Stmt
selectLocalServerInRoomStmt *sql.Stmt
} }
func createMembershipTable(db *sql.DB) error { func createMembershipTable(db *sql.DB) error {
@ -160,6 +169,7 @@ func prepareMembershipTable(db *sql.DB) (tables.Membership, error) {
{&s.selectJoinedUsersSetForRoomsStmt, selectJoinedUsersSetForRoomsSQL}, {&s.selectJoinedUsersSetForRoomsStmt, selectJoinedUsersSetForRoomsSQL},
{&s.selectKnownUsersStmt, selectKnownUsersSQL}, {&s.selectKnownUsersStmt, selectKnownUsersSQL},
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom}, {&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
{&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -324,3 +334,16 @@ func (s *membershipStatements) UpdateForgetMembership(
) )
return err return err
} }
func (s *membershipStatements) SelectLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error) {
var nid types.RoomNID
err := s.selectLocalServerInRoomStmt.QueryRowContext(ctx, tables.MembershipStateJoin, roomNID).Scan(&nid)
if err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
found := nid > 0
return found, nil
}

View file

@ -64,7 +64,7 @@ const insertStateDataSQL = "" +
const bulkSelectStateBlockEntriesSQL = "" + const bulkSelectStateBlockEntriesSQL = "" +
"SELECT state_block_nid, event_nids" + "SELECT state_block_nid, event_nids" +
" FROM roomserver_state_block WHERE state_block_nid = ANY($1)" " FROM roomserver_state_block WHERE state_block_nid = ANY($1) ORDER BY state_block_nid ASC"
type stateBlockStatements struct { type stateBlockStatements struct {
insertStateDataStmt *sql.Stmt insertStateDataStmt *sql.Stmt

View file

@ -1059,6 +1059,11 @@ func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs []string)
return result, nil return result, nil
} }
// GetLocalServerInRoom returns true if we think we're in a given room or false otherwise.
func (d *Database) GetLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error) {
return d.MembershipTable.SelectLocalServerInRoom(ctx, roomNID)
}
// GetKnownUsers searches all users that userID knows about. // GetKnownUsers searches all users that userID knows about.
func (d *Database) GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) { func (d *Database) GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) {
stateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, userID) stateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, userID)

View file

@ -100,6 +100,14 @@ var selectKnownUsersSQL = "" +
" SELECT DISTINCT room_nid FROM roomserver_membership WHERE target_nid=$1 AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " SELECT DISTINCT room_nid FROM roomserver_membership WHERE target_nid=$1 AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) +
") AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " AND event_state_key LIKE $2 LIMIT $3" ") AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " AND event_state_key LIKE $2 LIMIT $3"
// selectLocalServerInRoomSQL is an optimised case for checking if we, the local server,
// are in the room by using the target_local column of the membership table. Normally when
// we want to know if a server is in a room, we have to unmarshal the entire room state which
// is expensive. The presence of a single row from this query suggests we're still in the
// room, no rows returned suggests we aren't.
const selectLocalServerInRoomSQL = "" +
"SELECT room_nid FROM roomserver_membership WHERE target_local = 1 AND membership_nid = $1 AND room_nid = $2 LIMIT 1"
type membershipStatements struct { type membershipStatements struct {
db *sql.DB db *sql.DB
insertMembershipStmt *sql.Stmt insertMembershipStmt *sql.Stmt
@ -113,6 +121,7 @@ type membershipStatements struct {
updateMembershipStmt *sql.Stmt updateMembershipStmt *sql.Stmt
selectKnownUsersStmt *sql.Stmt selectKnownUsersStmt *sql.Stmt
updateMembershipForgetRoomStmt *sql.Stmt updateMembershipForgetRoomStmt *sql.Stmt
selectLocalServerInRoomStmt *sql.Stmt
} }
func createMembershipTable(db *sql.DB) error { func createMembershipTable(db *sql.DB) error {
@ -137,6 +146,7 @@ func prepareMembershipTable(db *sql.DB) (tables.Membership, error) {
{&s.selectRoomsWithMembershipStmt, selectRoomsWithMembershipSQL}, {&s.selectRoomsWithMembershipStmt, selectRoomsWithMembershipSQL},
{&s.selectKnownUsersStmt, selectKnownUsersSQL}, {&s.selectKnownUsersStmt, selectKnownUsersSQL},
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom}, {&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
{&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -304,3 +314,16 @@ func (s *membershipStatements) UpdateForgetMembership(
) )
return err return err
} }
func (s *membershipStatements) SelectLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error) {
var nid types.RoomNID
err := s.selectLocalServerInRoomStmt.QueryRowContext(ctx, tables.MembershipStateJoin, roomNID).Scan(&nid)
if err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
found := nid > 0
return found, nil
}

View file

@ -57,7 +57,7 @@ const insertStateDataSQL = `
const bulkSelectStateBlockEntriesSQL = "" + const bulkSelectStateBlockEntriesSQL = "" +
"SELECT state_block_nid, event_nids" + "SELECT state_block_nid, event_nids" +
" FROM roomserver_state_block WHERE state_block_nid IN ($1)" " FROM roomserver_state_block WHERE state_block_nid IN ($1) ORDER BY state_block_nid ASC"
type stateBlockStatements struct { type stateBlockStatements struct {
db *sql.DB db *sql.DB

View file

@ -135,6 +135,7 @@ type Membership interface {
SelectJoinedUsersSetForRooms(ctx context.Context, roomNIDs []types.RoomNID) (map[types.EventStateKeyNID]int, error) SelectJoinedUsersSetForRooms(ctx context.Context, roomNIDs []types.RoomNID) (map[types.EventStateKeyNID]int, error)
SelectKnownUsers(ctx context.Context, userID types.EventStateKeyNID, searchString string, limit int) ([]string, error) SelectKnownUsers(ctx context.Context, userID types.EventStateKeyNID, searchString string, limit int) ([]string, error)
UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error
SelectLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error)
} }
type Published interface { type Published interface {

View file

@ -77,6 +77,7 @@ type BaseDendrite struct {
PublicKeyAPIMux *mux.Router PublicKeyAPIMux *mux.Router
PublicMediaAPIMux *mux.Router PublicMediaAPIMux *mux.Router
InternalAPIMux *mux.Router InternalAPIMux *mux.Router
SynapseAdminMux *mux.Router
UseHTTPAPIs bool UseHTTPAPIs bool
apiHttpClient *http.Client apiHttpClient *http.Client
httpClient *http.Client httpClient *http.Client
@ -199,6 +200,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
PublicKeyAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicKeyPathPrefix).Subrouter().UseEncodedPath(), PublicKeyAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicKeyPathPrefix).Subrouter().UseEncodedPath(),
PublicMediaAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicMediaPathPrefix).Subrouter().UseEncodedPath(), PublicMediaAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicMediaPathPrefix).Subrouter().UseEncodedPath(),
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix("/_synapse/").Subrouter().UseEncodedPath(),
apiHttpClient: &apiClient, apiHttpClient: &apiClient,
httpClient: &client, httpClient: &client,
} }
@ -391,6 +393,7 @@ func (b *BaseDendrite) SetupAndServeHTTP(
externalRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(b.PublicKeyAPIMux) externalRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(b.PublicKeyAPIMux)
externalRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(federationHandler) externalRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(federationHandler)
} }
externalRouter.PathPrefix("/_synapse/").Handler(b.SynapseAdminMux)
externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux) externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux)
if internalAddr != NoListener && internalAddr != externalAddr { if internalAddr != NoListener && internalAddr != externalAddr {

View file

@ -57,9 +57,9 @@ type Monolith struct {
} }
// AddAllPublicRoutes attaches all public paths to the given router // AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, mediaMux *mux.Router) { func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, mediaMux, synapseMux *mux.Router) {
clientapi.AddPublicRoutes( clientapi.AddPublicRoutes(
csMux, &m.Config.ClientAPI, m.AccountDB, csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB,
m.FedClient, m.RoomserverAPI, m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,