mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-23 06:41:56 -06:00
Merge branch 'master' of https://github.com/matrix-org/dendrite into add-nats-support
This commit is contained in:
commit
4f1f4c320a
503
cmd/dendrite-upgrade-tests/main.go
Normal file
503
cmd/dendrite-upgrade-tests/main.go
Normal file
|
@ -0,0 +1,503 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Masterminds/semver/v3"
|
||||
"github.com/codeclysm/extract"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/api/types/mount"
|
||||
"github.com/docker/docker/api/types/volume"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/docker/go-connections/nat"
|
||||
)
|
||||
|
||||
var (
|
||||
flagTempDir = flag.String("tmp", "tmp", "Path to temporary directory to dump tarballs to")
|
||||
flagFrom = flag.String("from", "HEAD-1", "The version to start from e.g '0.3.1'. If 'HEAD-N' then starts N versions behind HEAD.")
|
||||
flagTo = flag.String("to", "HEAD", "The version to end on e.g '0.3.3'.")
|
||||
flagBuildConcurrency = flag.Int("build-concurrency", runtime.NumCPU(), "The amount of build concurrency when building images")
|
||||
flagHead = flag.String("head", "", "Location to a dendrite repository to treat as HEAD instead of Github")
|
||||
flagDockerHost = flag.String("docker-host", "localhost", "The hostname of the docker client. 'localhost' if running locally, 'host.docker.internal' if running in Docker.")
|
||||
alphaNumerics = regexp.MustCompile("[^a-zA-Z0-9]+")
|
||||
)
|
||||
|
||||
const HEAD = "HEAD"
|
||||
|
||||
// Embed the Dockerfile to use when building dendrite versions.
|
||||
// We cannot use the dockerfile associated with the repo with each version sadly due to changes in
|
||||
// Docker versions. Specifically, earlier Dendrite versions are incompatible with newer Docker clients
|
||||
// due to the error:
|
||||
// When using COPY with more than one source file, the destination must be a directory and end with a /
|
||||
// We need to run a postgres anyway, so use the dockerfile associated with Complement instead.
|
||||
const Dockerfile = `FROM golang:1.13-stretch as build
|
||||
RUN apt-get update && apt-get install -y postgresql
|
||||
WORKDIR /build
|
||||
|
||||
# Copy the build context to the repo as this is the right dendrite code. This is different to the
|
||||
# Complement Dockerfile which wgets a branch.
|
||||
COPY . .
|
||||
|
||||
RUN go build ./cmd/dendrite-monolith-server
|
||||
RUN go build ./cmd/generate-keys
|
||||
RUN go build ./cmd/generate-config
|
||||
RUN ./generate-config --ci > dendrite.yaml
|
||||
RUN ./generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key
|
||||
|
||||
# Replace the connection string with a single postgres DB, using user/db = 'postgres' and no password
|
||||
RUN sed -i "s%connection_string:.*$%connection_string: postgresql://postgres@localhost/postgres?sslmode=disable%g" dendrite.yaml
|
||||
# No password when connecting over localhost
|
||||
RUN sed -i "s%127.0.0.1/32 md5%127.0.0.1/32 trust%g" /etc/postgresql/9.6/main/pg_hba.conf
|
||||
# Bump up max conns for moar concurrency
|
||||
RUN sed -i 's/max_connections = 100/max_connections = 2000/g' /etc/postgresql/9.6/main/postgresql.conf
|
||||
RUN sed -i 's/max_open_conns:.*$/max_open_conns: 100/g' dendrite.yaml
|
||||
|
||||
# This entry script starts postgres, waits for it to be up then starts dendrite
|
||||
RUN echo '\
|
||||
#!/bin/bash -eu \n\
|
||||
pg_lsclusters \n\
|
||||
pg_ctlcluster 9.6 main start \n\
|
||||
\n\
|
||||
until pg_isready \n\
|
||||
do \n\
|
||||
echo "Waiting for postgres"; \n\
|
||||
sleep 1; \n\
|
||||
done \n\
|
||||
\n\
|
||||
sed -i "s/server_name: localhost/server_name: ${SERVER_NAME}/g" dendrite.yaml \n\
|
||||
./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml \n\
|
||||
' > run_dendrite.sh && chmod +x run_dendrite.sh
|
||||
|
||||
ENV SERVER_NAME=localhost
|
||||
EXPOSE 8008 8448
|
||||
CMD /build/run_dendrite.sh `
|
||||
|
||||
const dendriteUpgradeTestLabel = "dendrite_upgrade_test"
|
||||
|
||||
// downloadArchive downloads an arbitrary github archive of the form:
|
||||
// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz
|
||||
// and re-tarballs it without the top-level directory which contains branch information. It inserts
|
||||
// the contents of `dockerfile` as a root file `Dockerfile` in the re-tarballed directory such that
|
||||
// you can directly feed the retarballed archive to `ImageBuild` to have it run said dockerfile.
|
||||
// Returns the tarball buffer on success.
|
||||
func downloadArchive(cli *http.Client, tmpDir, archiveURL string, dockerfile []byte) (*bytes.Buffer, error) {
|
||||
resp, err := cli.Get(archiveURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// nolint:errcheck
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("got HTTP %d", resp.StatusCode)
|
||||
}
|
||||
_ = os.RemoveAll(tmpDir)
|
||||
if err = os.Mkdir(tmpDir, os.ModePerm); err != nil {
|
||||
return nil, fmt.Errorf("failed to make temporary directory: %s", err)
|
||||
}
|
||||
// nolint:errcheck
|
||||
defer os.RemoveAll(tmpDir)
|
||||
// dump the tarball temporarily, stripping the top-level directory
|
||||
err = extract.Archive(context.Background(), resp.Body, tmpDir, func(inPath string) string {
|
||||
// remove top level
|
||||
segments := strings.Split(inPath, "/")
|
||||
return strings.Join(segments[1:], "/")
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// add top level Dockerfile
|
||||
err = ioutil.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err)
|
||||
}
|
||||
// now re-tarball it :/
|
||||
var tarball bytes.Buffer
|
||||
err = compress(tmpDir, &tarball)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &tarball, nil
|
||||
}
|
||||
|
||||
// buildDendrite builds Dendrite on the branchOrTagName given. Returns the image ID or an error
|
||||
func buildDendrite(httpClient *http.Client, dockerClient *client.Client, tmpDir, branchOrTagName string) (string, error) {
|
||||
var tarball *bytes.Buffer
|
||||
var err error
|
||||
// If a custom HEAD location is given, use that, else pull from github. Mostly useful for CI
|
||||
// where we want to use the working directory.
|
||||
if branchOrTagName == HEAD && *flagHead != "" {
|
||||
log.Printf("%s: Using %s as HEAD", branchOrTagName, *flagHead)
|
||||
// add top level Dockerfile
|
||||
err = ioutil.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Custom HEAD: failed to inject /Dockerfile: %w", err)
|
||||
}
|
||||
// now tarball it
|
||||
var buffer bytes.Buffer
|
||||
err = compress(*flagHead, &buffer)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to tarball custom HEAD %s : %s", *flagHead, err)
|
||||
}
|
||||
tarball = &buffer
|
||||
} else {
|
||||
log.Printf("%s: Downloading version %s to %s\n", branchOrTagName, branchOrTagName, tmpDir)
|
||||
// pull an archive, this contains a top-level directory which screws with the build context
|
||||
// which we need to fix up post download
|
||||
u := fmt.Sprintf("https://github.com/matrix-org/dendrite/archive/%s.tar.gz", branchOrTagName)
|
||||
tarball, err = downloadArchive(httpClient, tmpDir, u, []byte(Dockerfile))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to download archive %s: %w", u, err)
|
||||
}
|
||||
log.Printf("%s: %s => %d bytes\n", branchOrTagName, u, tarball.Len())
|
||||
}
|
||||
|
||||
log.Printf("%s: Building version %s\n", branchOrTagName, branchOrTagName)
|
||||
res, err := dockerClient.ImageBuild(context.Background(), tarball, types.ImageBuildOptions{
|
||||
Tags: []string{"dendrite-upgrade"},
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to start building image: %s", err)
|
||||
}
|
||||
// nolint:errcheck
|
||||
defer res.Body.Close()
|
||||
decoder := json.NewDecoder(res.Body)
|
||||
// {"aux":{"ID":"sha256:247082c717963bc2639fc2daed08838d67811ea12356cd4fda43e1ffef94f2eb"}}
|
||||
var imageID string
|
||||
for decoder.More() {
|
||||
var dl struct {
|
||||
Stream string `json:"stream"`
|
||||
Aux map[string]interface{} `json:"aux"`
|
||||
}
|
||||
if err := decoder.Decode(&dl); err != nil {
|
||||
return "", fmt.Errorf("failed to decode build image output line: %w", err)
|
||||
}
|
||||
log.Printf("%s: %s", branchOrTagName, dl.Stream)
|
||||
if dl.Aux != nil {
|
||||
imgID, ok := dl.Aux["ID"]
|
||||
if ok {
|
||||
imageID = imgID.(string)
|
||||
}
|
||||
}
|
||||
}
|
||||
return imageID, nil
|
||||
}
|
||||
|
||||
func getAndSortVersionsFromGithub(httpClient *http.Client) (semVers []*semver.Version, err error) {
|
||||
u := "https://api.github.com/repos/matrix-org/dendrite/tags"
|
||||
res, err := httpClient.Get(u)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("%s returned HTTP %d", u, res.StatusCode)
|
||||
}
|
||||
resp := []struct {
|
||||
Name string `json:"name"`
|
||||
}{}
|
||||
if err = json.NewDecoder(res.Body).Decode(&resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, r := range resp {
|
||||
v, err := semver.NewVersion(r.Name)
|
||||
if err != nil {
|
||||
continue // not a semver, that's ok and isn't an error, we allow tags that aren't semvers
|
||||
}
|
||||
semVers = append(semVers, v)
|
||||
}
|
||||
sort.Sort(semver.Collection(semVers))
|
||||
return semVers, nil
|
||||
}
|
||||
|
||||
func calculateVersions(cli *http.Client, from, to string) []string {
|
||||
semvers, err := getAndSortVersionsFromGithub(cli)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to collect semvers from github: %s", err)
|
||||
}
|
||||
// snip the lower bound depending on --from
|
||||
if from != "" {
|
||||
if strings.HasPrefix(from, "HEAD-") {
|
||||
var headN int
|
||||
headN, err = strconv.Atoi(strings.TrimPrefix(from, "HEAD-"))
|
||||
if err != nil {
|
||||
log.Fatalf("invalid --from, try 'HEAD-1'")
|
||||
}
|
||||
if headN >= len(semvers) {
|
||||
log.Fatalf("only have %d versions, but asked to go to HEAD-%d", len(semvers), headN)
|
||||
}
|
||||
if headN > 0 {
|
||||
semvers = semvers[len(semvers)-headN:]
|
||||
}
|
||||
} else {
|
||||
fromVer, err := semver.NewVersion(from)
|
||||
if err != nil {
|
||||
log.Fatalf("invalid --from: %s", err)
|
||||
}
|
||||
i := 0
|
||||
for i = 0; i < len(semvers); i++ {
|
||||
if semvers[i].LessThan(fromVer) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
semvers = semvers[i:]
|
||||
}
|
||||
}
|
||||
if to != "" && to != HEAD {
|
||||
toVer, err := semver.NewVersion(to)
|
||||
if err != nil {
|
||||
log.Fatalf("invalid --to: %s", err)
|
||||
}
|
||||
var i int
|
||||
for i = len(semvers) - 1; i >= 0; i-- {
|
||||
if semvers[i].GreaterThan(toVer) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
semvers = semvers[:i+1]
|
||||
}
|
||||
var versions []string
|
||||
for _, sv := range semvers {
|
||||
versions = append(versions, sv.Original())
|
||||
}
|
||||
if to == HEAD {
|
||||
versions = append(versions, HEAD)
|
||||
}
|
||||
return versions
|
||||
}
|
||||
|
||||
func buildDendriteImages(httpClient *http.Client, dockerClient *client.Client, baseTempDir string, concurrency int, branchOrTagNames []string) map[string]string {
|
||||
// concurrently build all versions, this can be done in any order. The mutex protects the map
|
||||
branchToImageID := make(map[string]string)
|
||||
var mu sync.Mutex
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(concurrency)
|
||||
ch := make(chan string, len(branchOrTagNames))
|
||||
for _, branchName := range branchOrTagNames {
|
||||
ch <- branchName
|
||||
}
|
||||
close(ch)
|
||||
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for branchName := range ch {
|
||||
tmpDir := baseTempDir + alphaNumerics.ReplaceAllString(branchName, "")
|
||||
imgID, err := buildDendrite(httpClient, dockerClient, tmpDir, branchName)
|
||||
if err != nil {
|
||||
log.Fatalf("%s: failed to build dendrite image: %s", branchName, err)
|
||||
}
|
||||
mu.Lock()
|
||||
branchToImageID[branchName] = imgID
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
return branchToImageID
|
||||
}
|
||||
|
||||
func runImage(dockerClient *client.Client, volumeName, version, imageID string) (csAPIURL, containerID string, err error) {
|
||||
log.Printf("%s: running image %s\n", version, imageID)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
|
||||
defer cancel()
|
||||
body, err := dockerClient.ContainerCreate(ctx, &container.Config{
|
||||
Image: imageID,
|
||||
Env: []string{"SERVER_NAME=hs1"},
|
||||
Labels: map[string]string{
|
||||
dendriteUpgradeTestLabel: "yes",
|
||||
},
|
||||
}, &container.HostConfig{
|
||||
PublishAllPorts: true,
|
||||
Mounts: []mount.Mount{
|
||||
{
|
||||
Type: mount.TypeVolume,
|
||||
Source: volumeName,
|
||||
Target: "/var/lib/postgresql/9.6/main",
|
||||
},
|
||||
},
|
||||
}, nil, nil, "dendrite_upgrade_test_"+version)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to ContainerCreate: %s", err)
|
||||
}
|
||||
containerID = body.ID
|
||||
|
||||
err = dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{})
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to ContainerStart: %s", err)
|
||||
}
|
||||
inspect, err := dockerClient.ContainerInspect(ctx, containerID)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
csapiPortInfo, ok := inspect.NetworkSettings.Ports[nat.Port("8008/tcp")]
|
||||
if !ok {
|
||||
return "", "", fmt.Errorf("port 8008 not exposed - exposed ports: %v", inspect.NetworkSettings.Ports)
|
||||
}
|
||||
baseURL := fmt.Sprintf("http://%s:%s", *flagDockerHost, csapiPortInfo[0].HostPort)
|
||||
versionsURL := fmt.Sprintf("%s/_matrix/client/versions", baseURL)
|
||||
// hit /versions to check it is up
|
||||
var lastErr error
|
||||
for i := 0; i < 500; i++ {
|
||||
res, err := http.Get(versionsURL)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, err)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
if res.StatusCode != 200 {
|
||||
lastErr = fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
lastErr = nil
|
||||
break
|
||||
}
|
||||
if lastErr != nil {
|
||||
logs, err := dockerClient.ContainerLogs(context.Background(), containerID, types.ContainerLogsOptions{
|
||||
ShowStdout: true,
|
||||
ShowStderr: true,
|
||||
})
|
||||
// ignore errors when cannot get logs, it's just for debugging anyways
|
||||
if err == nil {
|
||||
logbody, err := ioutil.ReadAll(logs)
|
||||
if err == nil {
|
||||
log.Printf("Container logs:\n\n%s\n\n", string(logbody))
|
||||
}
|
||||
}
|
||||
}
|
||||
return baseURL, containerID, lastErr
|
||||
}
|
||||
|
||||
func destroyContainer(dockerClient *client.Client, containerID string) {
|
||||
err := dockerClient.ContainerRemove(context.TODO(), containerID, types.ContainerRemoveOptions{
|
||||
Force: true,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("failed to remove container %s : %s", containerID, err)
|
||||
}
|
||||
}
|
||||
|
||||
func loadAndRunTests(dockerClient *client.Client, volumeName, v string, branchToImageID map[string]string) error {
|
||||
csAPIURL, containerID, err := runImage(dockerClient, volumeName, v, branchToImageID[v])
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to run container for branch %v: %v", v, err)
|
||||
}
|
||||
defer destroyContainer(dockerClient, containerID)
|
||||
log.Printf("URL %s -> %s \n", csAPIURL, containerID)
|
||||
if err = runTests(csAPIURL, v); err != nil {
|
||||
return fmt.Errorf("failed to run tests on version %s: %s", v, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func verifyTests(dockerClient *client.Client, volumeName string, versions []string, branchToImageID map[string]string) error {
|
||||
lastVer := versions[len(versions)-1]
|
||||
csAPIURL, containerID, err := runImage(dockerClient, volumeName, lastVer, branchToImageID[lastVer])
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to run container for branch %v: %v", lastVer, err)
|
||||
}
|
||||
defer destroyContainer(dockerClient, containerID)
|
||||
return verifyTestsRan(csAPIURL, versions)
|
||||
}
|
||||
|
||||
// cleanup old containers/volumes from a previous run
|
||||
func cleanup(dockerClient *client.Client) {
|
||||
// ignore all errors, we are just cleaning up and don't want to fail just because we fail to cleanup
|
||||
containers, _ := dockerClient.ContainerList(context.Background(), types.ContainerListOptions{
|
||||
Filters: label(dendriteUpgradeTestLabel),
|
||||
})
|
||||
for _, c := range containers {
|
||||
s := time.Second
|
||||
_ = dockerClient.ContainerStop(context.Background(), c.ID, &s)
|
||||
_ = dockerClient.ContainerRemove(context.Background(), c.ID, types.ContainerRemoveOptions{
|
||||
Force: true,
|
||||
})
|
||||
}
|
||||
_ = dockerClient.VolumeRemove(context.Background(), "dendrite_upgrade_test", true)
|
||||
}
|
||||
|
||||
func label(in string) filters.Args {
|
||||
f := filters.NewArgs()
|
||||
f.Add("label", in)
|
||||
return f
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
httpClient := &http.Client{
|
||||
Timeout: 60 * time.Second,
|
||||
}
|
||||
dockerClient, err := client.NewClientWithOpts(client.FromEnv)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to make docker client: %s", err)
|
||||
}
|
||||
if *flagFrom == "" {
|
||||
flag.Usage()
|
||||
os.Exit(1)
|
||||
}
|
||||
cleanup(dockerClient)
|
||||
versions := calculateVersions(httpClient, *flagFrom, *flagTo)
|
||||
log.Printf("Testing dendrite versions: %v\n", versions)
|
||||
|
||||
branchToImageID := buildDendriteImages(httpClient, dockerClient, *flagTempDir, *flagBuildConcurrency, versions)
|
||||
|
||||
// make a shared postgres volume
|
||||
volume, err := dockerClient.VolumeCreate(context.Background(), volume.VolumeCreateBody{
|
||||
Name: "dendrite_upgrade_test",
|
||||
Labels: map[string]string{
|
||||
dendriteUpgradeTestLabel: "yes",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("failed to make docker volume: %s", err)
|
||||
}
|
||||
|
||||
failed := false
|
||||
defer func() {
|
||||
perr := recover()
|
||||
log.Println("removing postgres volume")
|
||||
verr := dockerClient.VolumeRemove(context.Background(), volume.Name, true)
|
||||
if perr == nil {
|
||||
perr = verr
|
||||
}
|
||||
if perr != nil {
|
||||
panic(perr)
|
||||
}
|
||||
if failed {
|
||||
os.Exit(1)
|
||||
}
|
||||
}()
|
||||
|
||||
// run through images sequentially
|
||||
for _, v := range versions {
|
||||
if err = loadAndRunTests(dockerClient, volume.Name, v, branchToImageID); err != nil {
|
||||
log.Printf("failed to run tests for %v: %s\n", v, err)
|
||||
failed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if err := verifyTests(dockerClient, volume.Name, versions, branchToImageID); err != nil {
|
||||
log.Printf("failed to verify test results: %s", err)
|
||||
failed = true
|
||||
}
|
||||
}
|
63
cmd/dendrite-upgrade-tests/tar.go
Normal file
63
cmd/dendrite-upgrade-tests/tar.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"compress/gzip"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// From https://gist.github.com/mimoo/25fc9716e0f1353791f5908f94d6e726
|
||||
// Modified to strip off top-level when compressing
|
||||
func compress(src string, buf io.Writer) error {
|
||||
// tar > gzip > buf
|
||||
zr := gzip.NewWriter(buf)
|
||||
tw := tar.NewWriter(zr)
|
||||
|
||||
// walk through every file in the folder
|
||||
err := filepath.Walk(src, func(file string, fi os.FileInfo, e error) error {
|
||||
// generate tar header
|
||||
header, err := tar.FileInfoHeader(fi, file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// must provide real name
|
||||
// (see https://golang.org/src/archive/tar/common.go?#L626)
|
||||
header.Name = strings.TrimPrefix(filepath.ToSlash(file), src+"/")
|
||||
// write header
|
||||
if err := tw.WriteHeader(header); err != nil {
|
||||
return err
|
||||
}
|
||||
// if not a dir, write file content
|
||||
if !fi.IsDir() {
|
||||
data, err := os.Open(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = io.Copy(tw, data); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = data.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// produce tar
|
||||
if err := tw.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
// produce gzip
|
||||
if err := zr.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
//
|
||||
return nil
|
||||
}
|
192
cmd/dendrite-upgrade-tests/tests.go
Normal file
192
cmd/dendrite-upgrade-tests/tests.go
Normal file
|
@ -0,0 +1,192 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const userPassword = "this_is_a_long_password"
|
||||
|
||||
type user struct {
|
||||
userID string
|
||||
localpart string
|
||||
client *gomatrix.Client
|
||||
}
|
||||
|
||||
// runTests performs the following operations:
|
||||
// - register alice and bob with branch name muxed into the localpart
|
||||
// - create a DM room for the 2 users and exchange messages
|
||||
// - create/join a public #global room and exchange messages
|
||||
func runTests(baseURL, branchName string) error {
|
||||
// register 2 users
|
||||
users := []user{
|
||||
{
|
||||
localpart: "alice" + branchName,
|
||||
},
|
||||
{
|
||||
localpart: "bob" + branchName,
|
||||
},
|
||||
}
|
||||
for i, u := range users {
|
||||
client, err := gomatrix.NewClient(baseURL, "", "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := client.RegisterDummy(&gomatrix.ReqRegister{
|
||||
Username: strings.ToLower(u.localpart),
|
||||
Password: userPassword,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to register %s: %s", u.localpart, err)
|
||||
}
|
||||
client, err = gomatrix.NewClient(baseURL, resp.UserID, resp.AccessToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
users[i].client = client
|
||||
users[i].userID = resp.UserID
|
||||
}
|
||||
|
||||
// create DM room, join it and exchange messages
|
||||
createRoomResp, err := users[0].client.CreateRoom(&gomatrix.ReqCreateRoom{
|
||||
Preset: "trusted_private_chat",
|
||||
Invite: []string{users[1].userID},
|
||||
IsDirect: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create DM room: %s", err)
|
||||
}
|
||||
dmRoomID := createRoomResp.RoomID
|
||||
if _, err = users[1].client.JoinRoom(dmRoomID, "", nil); err != nil {
|
||||
return fmt.Errorf("failed to join DM room: %s", err)
|
||||
}
|
||||
msgs := []struct {
|
||||
client *gomatrix.Client
|
||||
text string
|
||||
}{
|
||||
{
|
||||
client: users[0].client, text: "1: " + branchName,
|
||||
},
|
||||
{
|
||||
client: users[1].client, text: "2: " + branchName,
|
||||
},
|
||||
{
|
||||
client: users[0].client, text: "3: " + branchName,
|
||||
},
|
||||
{
|
||||
client: users[1].client, text: "4: " + branchName,
|
||||
},
|
||||
}
|
||||
for _, msg := range msgs {
|
||||
_, err = msg.client.SendText(dmRoomID, msg.text)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send text in dm room: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// attempt to create/join the shared public room
|
||||
publicRoomID := ""
|
||||
createRoomResp, err = users[0].client.CreateRoom(&gomatrix.ReqCreateRoom{
|
||||
RoomAliasName: "global",
|
||||
Preset: "public_chat",
|
||||
})
|
||||
if err != nil { // this is okay and expected if the room already exists and the aliases clash
|
||||
// try to join it
|
||||
_, domain, err2 := gomatrixserverlib.SplitID('@', users[0].userID)
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("failed to split user ID: %s, %s", users[0].userID, err2)
|
||||
}
|
||||
joinRoomResp, err2 := users[0].client.JoinRoom(fmt.Sprintf("#global:%s", domain), "", nil)
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("alice failed to join public room: %s", err2)
|
||||
}
|
||||
publicRoomID = joinRoomResp.RoomID
|
||||
} else {
|
||||
publicRoomID = createRoomResp.RoomID
|
||||
}
|
||||
if _, err = users[1].client.JoinRoom(publicRoomID, "", nil); err != nil {
|
||||
return fmt.Errorf("bob failed to join public room: %s", err)
|
||||
}
|
||||
// send messages
|
||||
for _, msg := range msgs {
|
||||
_, err = msg.client.SendText(publicRoomID, "public "+msg.text)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send text in public room: %s", err)
|
||||
}
|
||||
}
|
||||
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyTestsRan checks that the HS has the right rooms/messages
|
||||
func verifyTestsRan(baseURL string, branchNames []string) error {
|
||||
log.Println("Verifying tests....")
|
||||
// check we can login as all users
|
||||
var resp *gomatrix.RespLogin
|
||||
for _, branchName := range branchNames {
|
||||
client, err := gomatrix.NewClient(baseURL, "", "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userLocalparts := []string{
|
||||
"alice" + branchName,
|
||||
"bob" + branchName,
|
||||
}
|
||||
for _, userLocalpart := range userLocalparts {
|
||||
resp, err = client.Login(&gomatrix.ReqLogin{
|
||||
Type: "m.login.password",
|
||||
User: strings.ToLower(userLocalpart),
|
||||
Password: userPassword,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to login as %s: %s", userLocalpart, err)
|
||||
}
|
||||
if resp.AccessToken == "" {
|
||||
return fmt.Errorf("failed to login, bad response: %+v", resp)
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Println(" accounts exist: OK")
|
||||
client, err := gomatrix.NewClient(baseURL, resp.UserID, resp.AccessToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', client.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
u := client.BuildURL("directory", "room", fmt.Sprintf("#global:%s", domain))
|
||||
r := struct {
|
||||
RoomID string `json:"room_id"`
|
||||
}{}
|
||||
err = client.MakeRequest("GET", u, nil, &r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to /directory: %s", err)
|
||||
}
|
||||
if r.RoomID == "" {
|
||||
return fmt.Errorf("/directory lookup returned no room ID")
|
||||
}
|
||||
log.Println(" public room exists: OK")
|
||||
|
||||
history, err := client.Messages(r.RoomID, client.Store.LoadNextBatch(client.UserID), "", 'b', 100)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get /messages: %s", err)
|
||||
}
|
||||
// we expect 4 messages per version
|
||||
msgCount := 0
|
||||
for _, ev := range history.Chunk {
|
||||
if ev.Type == "m.room.message" {
|
||||
msgCount += 1
|
||||
}
|
||||
}
|
||||
wantMsgCount := len(branchNames) * 4
|
||||
if msgCount != wantMsgCount {
|
||||
return fmt.Errorf("got %d messages in global room, want %d", msgCount, wantMsgCount)
|
||||
}
|
||||
log.Println(" messages exist: OK")
|
||||
return nil
|
||||
}
|
|
@ -232,7 +232,8 @@ type txnReq struct {
|
|||
// something that can tell us about which servers are in a room right now
|
||||
servers federationAPI.ServersInRoomProvider
|
||||
// a list of events from the auth and prev events which we already had
|
||||
hadEvents map[string]bool
|
||||
hadEvents map[string]bool
|
||||
hadEventsMutex sync.Mutex
|
||||
// local cache of events for auth checks, etc - this may include events
|
||||
// which the roomserver is unaware of.
|
||||
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
|
||||
|
@ -240,6 +241,12 @@ type txnReq struct {
|
|||
work string // metrics
|
||||
}
|
||||
|
||||
func (t *txnReq) hadEvent(eventID string, had bool) {
|
||||
t.hadEventsMutex.Lock()
|
||||
defer t.hadEventsMutex.Unlock()
|
||||
t.hadEvents[eventID] = had
|
||||
}
|
||||
|
||||
// A subset of FederationClient functionality that txn requires. Useful for testing.
|
||||
type txnFederationClient interface {
|
||||
LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
||||
|
@ -595,10 +602,10 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
|
|||
// Prepare a map of all the events we already had before this point, so
|
||||
// that we don't send them to the roomserver again.
|
||||
for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) {
|
||||
t.hadEvents[eventID] = true
|
||||
t.hadEvent(eventID, true)
|
||||
}
|
||||
for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) {
|
||||
t.hadEvents[eventID] = false
|
||||
t.hadEvent(eventID, false)
|
||||
}
|
||||
|
||||
if len(stateResp.MissingAuthEventIDs) > 0 {
|
||||
|
@ -673,7 +680,7 @@ withNextEvent:
|
|||
); err != nil {
|
||||
return fmt.Errorf("api.SendEvents: %w", err)
|
||||
}
|
||||
t.hadEvents[ev.EventID()] = true // if the roomserver didn't know about the event before, it does now
|
||||
t.hadEvent(ev.EventID(), true) // if the roomserver didn't know about the event before, it does now
|
||||
t.cacheAndReturn(ev.Headered(stateResp.RoomVersion))
|
||||
delete(missingAuthEvents, missingAuthEventID)
|
||||
continue withNextEvent
|
||||
|
@ -801,14 +808,23 @@ func (t *txnReq) processEventWithMissingState(
|
|||
|
||||
// First of all, send the backward extremity into the roomserver with the
|
||||
// newly resolved state. This marks the "oldest" point in the backfill and
|
||||
// sets the baseline state for any new events after this.
|
||||
// sets the baseline state for any new events after this. We'll make a
|
||||
// copy of the hadEvents map so that it can be taken downstream without
|
||||
// worrying about concurrent map reads/writes, since t.hadEvents is meant
|
||||
// to be protected by a mutex.
|
||||
hadEvents := map[string]bool{}
|
||||
t.hadEventsMutex.Lock()
|
||||
for k, v := range t.hadEvents {
|
||||
hadEvents[k] = v
|
||||
}
|
||||
t.hadEventsMutex.Unlock()
|
||||
err = api.SendEventWithState(
|
||||
context.Background(),
|
||||
t.rsAPI,
|
||||
api.KindOld,
|
||||
resolvedState,
|
||||
backwardsExtremity.Headered(roomVersion),
|
||||
t.hadEvents,
|
||||
hadEvents,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("api.SendEventWithState: %w", err)
|
||||
|
@ -904,7 +920,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
|
|||
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
|
||||
// processEvent request, which is better for memory.
|
||||
stateEvents[i] = t.cacheAndReturn(ev)
|
||||
t.hadEvents[ev.EventID()] = true
|
||||
t.hadEvent(ev.EventID(), true)
|
||||
}
|
||||
// we should never access res.StateEvents again so we delete it here to make GC faster
|
||||
res.StateEvents = nil
|
||||
|
@ -939,7 +955,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
|
|||
}
|
||||
for i, ev := range queryRes.Events {
|
||||
authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
|
||||
t.hadEvents[ev.EventID()] = true
|
||||
t.hadEvent(ev.EventID(), true)
|
||||
}
|
||||
queryRes.Events = nil
|
||||
}
|
||||
|
@ -1016,7 +1032,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
|
|||
latestEvents := make([]string, len(res.LatestEvents))
|
||||
for i, ev := range res.LatestEvents {
|
||||
latestEvents[i] = res.LatestEvents[i].EventID
|
||||
t.hadEvents[ev.EventID] = true
|
||||
t.hadEvent(ev.EventID, true)
|
||||
}
|
||||
|
||||
var missingResp *gomatrixserverlib.RespMissingEvents
|
||||
|
@ -1152,7 +1168,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
|
|||
}
|
||||
for i, ev := range queryRes.Events {
|
||||
queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i])
|
||||
t.hadEvents[ev.EventID()] = true
|
||||
t.hadEvent(ev.EventID(), true)
|
||||
evID := queryRes.Events[i].EventID()
|
||||
if missing[evID] {
|
||||
delete(missing, evID)
|
||||
|
|
8
go.mod
8
go.mod
|
@ -3,13 +3,20 @@ module github.com/matrix-org/dendrite
|
|||
require (
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0
|
||||
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
|
||||
github.com/Masterminds/semver/v3 v3.1.1
|
||||
github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86
|
||||
github.com/Shopify/sarama v1.29.0
|
||||
github.com/codeclysm/extract v2.2.0+incompatible
|
||||
github.com/containerd/containerd v1.5.2 // indirect
|
||||
github.com/docker/docker v20.10.7+incompatible
|
||||
github.com/docker/go-connections v0.4.0
|
||||
github.com/getsentry/sentry-go v0.10.0
|
||||
github.com/gologme/log v1.2.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/h2non/filetype v1.1.1 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4
|
||||
github.com/juju/testing v0.0.0-20210324180055-18c50b0c2098 // indirect
|
||||
github.com/klauspost/compress v1.13.0 // indirect
|
||||
github.com/lib/pq v1.9.0
|
||||
github.com/libp2p/go-libp2p v0.13.0
|
||||
|
@ -30,6 +37,7 @@ require (
|
|||
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb
|
||||
github.com/morikuni/aec v1.0.0 // indirect
|
||||
github.com/nats-io/nats.go v1.11.0
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
||||
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6
|
||||
|
|
|
@ -73,9 +73,9 @@ func callerPrettyfier(f *runtime.Frame) (string, string) {
|
|||
// Append a newline + tab to it to move the actual log content to its own line
|
||||
funcname += "\n\t"
|
||||
|
||||
// Surround the filepath in brackets and append line number so IDEs can quickly
|
||||
// navigate
|
||||
filename := fmt.Sprintf(" [%s:%d]", f.File, f.Line)
|
||||
// Use a shortened file path which just has the filename to avoid having lots of redundant
|
||||
// directories which contribute significantly to overall log sizes!
|
||||
filename := fmt.Sprintf(" [%s:%d]", path.Base(f.File), f.Line)
|
||||
|
||||
return funcname, filename
|
||||
}
|
||||
|
|
|
@ -64,7 +64,14 @@ func (r *Leaver) performLeaveRoomByID(
|
|||
// that.
|
||||
isInvitePending, senderUser, eventID, err := helpers.IsInvitePending(ctx, r.DB, req.RoomID, req.UserID)
|
||||
if err == nil && isInvitePending {
|
||||
return r.performRejectInvite(ctx, req, res, senderUser, eventID)
|
||||
var host gomatrixserverlib.ServerName
|
||||
_, host, err = gomatrixserverlib.SplitID('@', senderUser)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Sender %q is invalid", senderUser)
|
||||
}
|
||||
if host != r.Cfg.Matrix.ServerName {
|
||||
return r.performFederatedRejectInvite(ctx, req, res, senderUser, eventID)
|
||||
}
|
||||
}
|
||||
|
||||
// There's no invite pending, so first of all we want to find out
|
||||
|
@ -94,9 +101,7 @@ func (r *Leaver) performLeaveRoomByID(
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("Error getting membership: %w", err)
|
||||
}
|
||||
if membership != gomatrixserverlib.Join {
|
||||
// TODO: should be able to handle "invite" in this case too, if
|
||||
// it's a case of kicking or banning or such
|
||||
if membership != gomatrixserverlib.Join && membership != gomatrixserverlib.Invite {
|
||||
return nil, fmt.Errorf("User %q is not joined to the room (membership is %q)", req.UserID, membership)
|
||||
}
|
||||
|
||||
|
@ -147,7 +152,7 @@ func (r *Leaver) performLeaveRoomByID(
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (r *Leaver) performRejectInvite(
|
||||
func (r *Leaver) performFederatedRejectInvite(
|
||||
ctx context.Context,
|
||||
req *api.PerformLeaveRequest,
|
||||
res *api.PerformLeaveResponse, // nolint:unparam
|
||||
|
|
|
@ -117,7 +117,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
|
|||
_roomserver_state_block.event_nid
|
||||
FROM
|
||||
_roomserver_state_snapshots
|
||||
JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
|
||||
LEFT JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
|
||||
WHERE
|
||||
_roomserver_state_snapshots.state_snapshot_nid = ANY (
|
||||
SELECT
|
||||
|
@ -140,22 +140,49 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
|
|||
logrus.Warnf("Rewriting snapshots %d-%d of %d...", batchoffset, batchoffset+batchsize, snapshotcount)
|
||||
var snapshots []stateBlockData
|
||||
|
||||
var badCreateSnapshots []stateBlockData
|
||||
for snapshotrows.Next() {
|
||||
var snapshot stateBlockData
|
||||
var eventsarray pq.Int64Array
|
||||
if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &snapshot.StateBlockNID, &eventsarray); err != nil {
|
||||
var eventsarray []sql.NullInt64
|
||||
var nulStateBlockNID sql.NullInt64
|
||||
if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &nulStateBlockNID, pq.Array(&eventsarray)); err != nil {
|
||||
return fmt.Errorf("rows.Scan: %w", err)
|
||||
}
|
||||
if nulStateBlockNID.Valid {
|
||||
snapshot.StateBlockNID = types.StateBlockNID(nulStateBlockNID.Int64)
|
||||
}
|
||||
// Dendrite v0.1.0 would not make a state block for the create event, resulting in [NULL] from the query above.
|
||||
// Remember the snapshot and we'll fill it in after we close this cursor as we can't have 2 queries running at the same time
|
||||
if len(eventsarray) == 1 && !eventsarray[0].Valid {
|
||||
badCreateSnapshots = append(badCreateSnapshots, snapshot)
|
||||
continue
|
||||
}
|
||||
for _, e := range eventsarray {
|
||||
snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e))
|
||||
if e.Valid {
|
||||
snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e.Int64))
|
||||
}
|
||||
}
|
||||
snapshot.EventNIDs = snapshot.EventNIDs[:util.SortAndUnique(snapshot.EventNIDs)]
|
||||
snapshots = append(snapshots, snapshot)
|
||||
}
|
||||
|
||||
if err = snapshotrows.Close(); err != nil {
|
||||
return fmt.Errorf("snapshots.Close: %w", err)
|
||||
}
|
||||
// fill in bad create snapshots
|
||||
for _, s := range badCreateSnapshots {
|
||||
var createEventNID types.EventNID
|
||||
err = tx.QueryRow(
|
||||
`SELECT event_nid FROM roomserver_events WHERE state_snapshot_nid = $1 AND event_type_nid = 1`, s.StateSnapshotNID,
|
||||
).Scan(&createEventNID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot xref null state block with snapshot %d: %s", s.StateSnapshotNID, err)
|
||||
}
|
||||
if createEventNID == 0 {
|
||||
return fmt.Errorf("cannot xref null state block with snapshot %d, no create event", s.StateSnapshotNID)
|
||||
}
|
||||
s.EventNIDs = append(s.EventNIDs, createEventNID)
|
||||
snapshots = append(snapshots, s)
|
||||
}
|
||||
|
||||
newsnapshots := map[stateSnapshotData]types.StateBlockNIDs{}
|
||||
|
||||
|
|
Loading…
Reference in a new issue