Merge origin main

This commit is contained in:
Brian Meek 2022-07-27 10:15:07 -07:00
commit 1d6febf5d6
No known key found for this signature in database
GPG key ID: ACBD71263BF42D00
84 changed files with 1007 additions and 1178 deletions

View file

@ -223,6 +223,31 @@ jobs:
- name: Test upgrade
run: ./dendrite-upgrade-tests --head .
# run database upgrade tests, skipping over one version
upgrade_test_direct:
name: Upgrade tests from HEAD-2
timeout-minutes: 20
needs: initial-tests-done
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup go
uses: actions/setup-go@v2
with:
go-version: "1.18"
- uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-upgrade-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-upgrade
- name: Build upgrade-tests
run: go build ./cmd/dendrite-upgrade-tests
- name: Test upgrade
run: ./dendrite-upgrade-tests -direct -from HEAD-2 --head .
# run Sytest in different variations
sytest:
timeout-minutes: 20
@ -359,7 +384,7 @@ jobs:
integration-tests-done:
name: Integration tests passed
needs: [initial-tests-done, upgrade_test, sytest, complement]
needs: [initial-tests-done, upgrade_test, upgrade_test_direct, sytest, complement]
runs-on: ubuntu-latest
if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
steps:

View file

@ -8,7 +8,6 @@ COPY . /build
RUN mkdir -p bin
RUN go build -trimpath -o bin/ ./cmd/dendrite-monolith-server
RUN go build -trimpath -o bin/ ./cmd/goose
RUN go build -trimpath -o bin/ ./cmd/create-account
RUN go build -trimpath -o bin/ ./cmd/generate-keys

View file

@ -8,7 +8,6 @@ COPY . /build
RUN mkdir -p bin
RUN go build -trimpath -o bin/ ./cmd/dendrite-polylith-multi
RUN go build -trimpath -o bin/ ./cmd/goose
RUN go build -trimpath -o bin/ ./cmd/create-account
RUN go build -trimpath -o bin/ ./cmd/generate-keys

View file

@ -48,7 +48,6 @@ func AddPublicRoutes(
syncProducer := &producers.SyncAPIProducer{
JetStream: js,
TopicClientData: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
@ -59,6 +58,7 @@ func AddPublicRoutes(
routing.Setup(
base.PublicClientAPIMux,
base.PublicWellKnownAPIMux,
base.SynapseAdminMux,
base.DendriteAdminMux,
cfg, rsAPI, asAPI,

View file

@ -21,7 +21,6 @@ import (
"strconv"
"time"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@ -32,7 +31,6 @@ import (
// SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct {
TopicClientData string
TopicReceiptEvent string
TopicSendToDeviceEvent string
TopicTypingEvent string
@ -42,36 +40,6 @@ type SyncAPIProducer struct {
UserAPI userapi.ClientUserAPI
}
// SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON, ignoredUsers *types.IgnoredUsers) error {
m := &nats.Msg{
Subject: p.TopicClientData,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)
data := eventutil.AccountData{
RoomID: roomID,
Type: dataType,
ReadMarker: readMarker,
IgnoredUsers: ignoredUsers,
}
var err error
m.Data, err = json.Marshal(data)
if err != nil {
return err
}
log.WithFields(log.Fields{
"user_id": userID,
"room_id": roomID,
"data_type": dataType,
}).Tracef("Producing to topic '%s'", p.TopicClientData)
_, err = p.JetStream.PublishMsg(m)
return err
}
func (p *SyncAPIProducer) SendReceipt(
ctx context.Context,
userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp,

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/eventutil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
@ -127,18 +126,6 @@ func SaveAccountData(
return util.ErrorResponse(err)
}
var ignoredUsers *types.IgnoredUsers
if dataType == "m.ignored_user_list" {
ignoredUsers = &types.IgnoredUsers{}
_ = json.Unmarshal(body, ignoredUsers)
}
// TODO: user API should do this since it's account data
if err := syncProducer.SendData(userID, roomID, dataType, nil, ignoredUsers); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
@ -191,11 +178,6 @@ func SaveReadMarker(
return util.ErrorResponse(err)
}
if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError()
}
// Handle the read receipt that may be included in the read marker
if r.Read != "" {
return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read)

View file

@ -18,8 +18,6 @@ import (
"encoding/json"
"net/http"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
@ -98,10 +96,6 @@ func PutTag(
return jsonerror.InternalServerError()
}
if err = syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
@ -150,11 +144,6 @@ func DeleteTag(
return jsonerror.InternalServerError()
}
// TODO: user API should do this since it's account data
if err := syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},

View file

@ -48,7 +48,7 @@ import (
// applied:
// nolint: gocyclo
func Setup(
publicAPIMux, synapseAdminRouter, dendriteAdminRouter *mux.Router,
publicAPIMux, wkMux, synapseAdminRouter, dendriteAdminRouter *mux.Router,
cfg *config.ClientAPI,
rsAPI roomserverAPI.ClientRoomserverAPI,
asAPI appserviceAPI.AppServiceInternalAPI,
@ -74,6 +74,26 @@ func Setup(
unstableFeatures["org.matrix."+msc] = true
}
if cfg.Matrix.WellKnownClientName != "" {
logrus.Infof("Setting m.homeserver base_url as %s at /.well-known/matrix/client", cfg.Matrix.WellKnownClientName)
wkMux.Handle("/client", httputil.MakeExternalAPI("wellknown", func(r *http.Request) util.JSONResponse {
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct {
HomeserverName struct {
BaseUrl string `json:"base_url"`
} `json:"m.homeserver"`
}{
HomeserverName: struct {
BaseUrl string `json:"base_url"`
}{
BaseUrl: cfg.Matrix.WellKnownClientName,
},
},
}
})).Methods(http.MethodGet, http.MethodOptions)
}
publicAPIMux.Handle("/versions",
httputil.MakeExternalAPI("versions", func(req *http.Request) util.JSONResponse {
return util.JSONResponse{

View file

@ -37,6 +37,7 @@ var (
flagBuildConcurrency = flag.Int("build-concurrency", runtime.NumCPU(), "The amount of build concurrency when building images")
flagHead = flag.String("head", "", "Location to a dendrite repository to treat as HEAD instead of Github")
flagDockerHost = flag.String("docker-host", "localhost", "The hostname of the docker client. 'localhost' if running locally, 'host.docker.internal' if running in Docker.")
flagDirect = flag.Bool("direct", false, "If a direct upgrade from the defined FROM version to TO should be done")
alphaNumerics = regexp.MustCompile("[^a-zA-Z0-9]+")
)
@ -229,7 +230,7 @@ func getAndSortVersionsFromGithub(httpClient *http.Client) (semVers []*semver.Ve
return semVers, nil
}
func calculateVersions(cli *http.Client, from, to string) []string {
func calculateVersions(cli *http.Client, from, to string, direct bool) []string {
semvers, err := getAndSortVersionsFromGithub(cli)
if err != nil {
log.Fatalf("failed to collect semvers from github: %s", err)
@ -284,6 +285,9 @@ func calculateVersions(cli *http.Client, from, to string) []string {
if to == HEAD {
versions = append(versions, HEAD)
}
if direct {
versions = []string{versions[0], versions[len(versions)-1]}
}
return versions
}
@ -461,7 +465,7 @@ func main() {
os.Exit(1)
}
cleanup(dockerClient)
versions := calculateVersions(httpClient, *flagFrom, *flagTo)
versions := calculateVersions(httpClient, *flagFrom, *flagTo, *flagDirect)
log.Printf("Testing dendrite versions: %v\n", versions)
branchToImageID := buildDendriteImages(httpClient, dockerClient, *flagTempDir, *flagBuildConcurrency, versions)

View file

@ -1,109 +0,0 @@
## Database migrations
We use [goose](https://github.com/pressly/goose) to handle database migrations. This allows us to execute
both SQL deltas (e.g `ALTER TABLE ...`) as well as manipulate data in the database in Go using Go functions.
To run a migration, the `goose` binary in this directory needs to be built:
```
$ go build ./cmd/goose
```
This binary allows Dendrite databases to be upgraded and downgraded. Sample usage for upgrading the roomserver database:
```
# for sqlite
$ ./goose -dir roomserver/storage/sqlite3/deltas sqlite3 ./roomserver.db up
# for postgres
$ ./goose -dir roomserver/storage/postgres/deltas postgres "user=dendrite dbname=dendrite sslmode=disable" up
```
For a full list of options, including rollbacks, see https://github.com/pressly/goose or use `goose` with no args.
### Rationale
Dendrite creates tables on startup using `CREATE TABLE IF NOT EXISTS`, so you might think that we should also
apply version upgrades on startup as well. This is convenient and doesn't involve an additional binary to run
which complicates upgrades. However, combining the upgrade mechanism and the server binary makes it difficult
to handle rollbacks. Firstly, how do you specify you wish to rollback? We would have to add additional flags
to the main server binary to say "rollback to version X". Secondly, if you roll back the server binary from
version 5 to version 4, the version 4 binary doesn't know how to rollback the database from version 5 to
version 4! For these reasons, we prefer to have a separate "upgrade" binary which is run for database upgrades.
Rather than roll-our-own migration tool, we decided to use [goose](https://github.com/pressly/goose) as it supports
complex migrations in Go code in addition to just executing SQL deltas. Other alternatives like
`github.com/golang-migrate/migrate` [do not support](https://github.com/golang-migrate/migrate/issues/15) these
kinds of complex migrations.
### Adding new deltas
You can add `.sql` or `.go` files manually or you can use goose to create them for you.
If you only want to add a SQL delta then run:
```
$ ./goose -dir serverkeyapi/storage/sqlite3/deltas sqlite3 ./foo.db create new_col sql
2020/09/09 14:37:43 Created new file: serverkeyapi/storage/sqlite3/deltas/20200909143743_new_col.sql
```
In this case, the version number is `20200909143743`. The important thing is that it is always increasing.
Then add up/downgrade SQL commands to the created file which looks like:
```sql
-- +goose Up
-- +goose StatementBegin
SELECT 'up SQL query';
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'down SQL query';
-- +goose StatementEnd
```
You __must__ keep the `+goose` annotations. You'll need to repeat this process for Postgres.
For complex Go migrations:
```
$ ./goose -dir serverkeyapi/storage/sqlite3/deltas sqlite3 ./foo.db create complex_update go
2020/09/09 14:40:38 Created new file: serverkeyapi/storage/sqlite3/deltas/20200909144038_complex_update.go
```
Then modify the created `.go` file which looks like:
```go
package migrations
import (
"database/sql"
"fmt"
"github.com/pressly/goose"
)
func init() {
goose.AddMigration(upComplexUpdate, downComplexUpdate)
}
func upComplexUpdate(tx *sql.Tx) error {
// This code is executed when the migration is applied.
return nil
}
func downComplexUpdate(tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
```
You __must__ import the package in `/cmd/goose/main.go` so `func init()` gets called.
#### Database limitations
- SQLite3 does NOT support `ALTER TABLE table_name DROP COLUMN` - you would have to rename the column or drop the table
entirely and recreate it. ([example](https://github.com/matrix-org/dendrite/blob/master/userapi/storage/accounts/sqlite3/deltas/20200929203058_is_active.sql))
More information: [sqlite.org](https://www.sqlite.org/lang_altertable.html)

View file

@ -1,154 +0,0 @@
// This is custom goose binary
package main
import (
"flag"
"fmt"
"log"
"os"
"github.com/pressly/goose"
pgusers "github.com/matrix-org/dendrite/userapi/storage/postgres/deltas"
slusers "github.com/matrix-org/dendrite/userapi/storage/sqlite3/deltas"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
)
const (
AppService = "appservice"
FederationSender = "federationapi"
KeyServer = "keyserver"
MediaAPI = "mediaapi"
RoomServer = "roomserver"
SigningKeyServer = "signingkeyserver"
SyncAPI = "syncapi"
UserAPI = "userapi"
)
var (
dir = flags.String("dir", "", "directory with migration files")
flags = flag.NewFlagSet("goose", flag.ExitOnError)
component = flags.String("component", "", "dendrite component name")
knownDBs = []string{
AppService, FederationSender, KeyServer, MediaAPI, RoomServer, SigningKeyServer, SyncAPI, UserAPI,
}
)
// nolint: gocyclo
func main() {
err := flags.Parse(os.Args[1:])
if err != nil {
panic(err.Error())
}
args := flags.Args()
if len(args) < 3 {
fmt.Println(
`Usage: goose [OPTIONS] DRIVER DBSTRING COMMAND
Drivers:
postgres
sqlite3
Examples:
goose -component roomserver sqlite3 ./roomserver.db status
goose -component roomserver sqlite3 ./roomserver.db up
goose -component roomserver postgres "user=dendrite dbname=dendrite sslmode=disable" status
Options:
-component string
Dendrite component name e.g roomserver, signingkeyserver, clientapi, syncapi
-table string
migrations table name (default "goose_db_version")
-h print help
-v enable verbose mode
-dir string
directory with migration files, only relevant when creating new migrations.
-version
print version
Commands:
up Migrate the DB to the most recent version available
up-by-one Migrate the DB up by 1
up-to VERSION Migrate the DB to a specific VERSION
down Roll back the version by 1
down-to VERSION Roll back to a specific VERSION
redo Re-run the latest migration
reset Roll back all migrations
status Dump the migration status for the current DB
version Print the current version of the database
create NAME [sql|go] Creates new migration file with the current timestamp
fix Apply sequential ordering to migrations`,
)
return
}
engine := args[0]
if engine != "sqlite3" && engine != "postgres" {
fmt.Println("engine must be one of 'sqlite3' or 'postgres'")
return
}
knownComponent := false
for _, c := range knownDBs {
if c == *component {
knownComponent = true
break
}
}
if !knownComponent {
fmt.Printf("component must be one of %v\n", knownDBs)
return
}
if engine == "sqlite3" {
loadSQLiteDeltas(*component)
} else {
loadPostgresDeltas(*component)
}
dbstring, command := args[1], args[2]
db, err := goose.OpenDBWithDriver(engine, dbstring)
if err != nil {
log.Fatalf("goose: failed to open DB: %v\n", err)
}
defer func() {
if err := db.Close(); err != nil {
log.Fatalf("goose: failed to close DB: %v\n", err)
}
}()
arguments := []string{}
if len(args) > 3 {
arguments = append(arguments, args[3:]...)
}
// goose demands a directory even though we don't use it for upgrades
d := *dir
if d == "" {
d = os.TempDir()
}
if err := goose.Run(command, db, d, arguments...); err != nil {
log.Fatalf("goose %v: %v", command, err)
}
}
func loadSQLiteDeltas(component string) {
switch component {
case UserAPI:
slusers.LoadFromGoose()
}
}
func loadPostgresDeltas(component string) {
switch component {
case UserAPI:
pgusers.LoadFromGoose()
}
}

View file

@ -64,6 +64,10 @@ global:
# e.g. localhost:443
well_known_server_name: ""
# The server name to delegate client-server communications to, with optional port
# e.g. localhost:443
well_known_client_name: ""
# Lists of domains that the server will trust as identity servers to verify third
# party identifiers such as phone numbers and email addresses.
trusted_third_party_id_servers:

View file

@ -54,6 +54,10 @@ global:
# e.g. localhost:443
well_known_server_name: ""
# The server name to delegate client-server communications to, with optional port
# e.g. localhost:443
well_known_client_name: ""
# Lists of domains that the server will trust as identity servers to verify third
# party identifiers such as phone numbers and email addresses.
trusted_third_party_id_servers:

View file

@ -233,6 +233,8 @@ GEM
multipart-post (2.1.1)
nokogiri (1.13.6-arm64-darwin)
racc (~> 1.4)
nokogiri (1.13.6-x86_64-linux)
racc (~> 1.4)
octokit (4.22.0)
faraday (>= 0.9)
sawyer (~> 0.8.0, >= 0.5.3)
@ -263,7 +265,7 @@ GEM
thread_safe (0.3.6)
typhoeus (1.4.0)
ethon (>= 0.9.0)
tzinfo (1.2.9)
tzinfo (1.2.10)
thread_safe (~> 0.1)
unf (0.1.4)
unf_ext
@ -273,11 +275,11 @@ GEM
PLATFORMS
arm64-darwin-21
x86_64-linux
DEPENDENCIES
github-pages (~> 226)
jekyll-feed (~> 0.15.1)
minima (~> 2.5.1)
BUNDLED WITH
2.3.7

View file

@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// InviteV2 implements /_matrix/federation/v2/invite/{roomID}/{eventID}
@ -144,7 +143,6 @@ func processInvite(
// Check that the event is signed by the server sending the request.
redacted, err := gomatrixserverlib.RedactEventJSON(event.JSON(), event.Version())
if err != nil {
logrus.WithError(err).Errorf("XXX: invite.go")
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("The event JSON could not be redacted"),

View file

@ -15,23 +15,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable)
}
func LoadRemoveRoomsTable(m *sqlutil.Migrations) {
m.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable)
}
func UpRemoveRoomsTable(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpRemoveRoomsTable(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
DROP TABLE IF EXISTS federationsender_rooms;
`)
if err != nil {

View file

@ -82,9 +82,13 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadRemoveRoomsTable(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
m := sqlutil.NewMigrator(d.db)
m.AddMigrations(sqlutil.Migration{
Version: "federationsender: drop federationsender_rooms",
Up: deltas.UpRemoveRoomsTable,
})
err = m.Up(base.Context())
if err != nil {
return nil, err
}
d.Database = shared.Database{

View file

@ -15,23 +15,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable)
}
func LoadRemoveRoomsTable(m *sqlutil.Migrations) {
m.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable)
}
func UpRemoveRoomsTable(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpRemoveRoomsTable(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
DROP TABLE IF EXISTS federationsender_rooms;
`)
if err != nil {

View file

@ -81,9 +81,13 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadRemoveRoomsTable(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
m := sqlutil.NewMigrator(d.db)
m.AddMigrations(sqlutil.Migration{
Version: "federationsender: drop federationsender_rooms",
Up: deltas.UpRemoveRoomsTable,
})
err = m.Up(base.Context())
if err != nil {
return nil, err
}
d.Database = shared.Database{

3
go.mod
View file

@ -25,7 +25,7 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220718085240-f08f98af7d2d
github.com/matrix-org/gomatrixserverlib v0.0.0-20220725104114-b6003e522771
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13
@ -37,7 +37,6 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/pressly/goose v2.7.0+incompatible
github.com/prometheus/client_golang v1.12.2
github.com/sasha-s/go-deadlock v0.3.1
github.com/sirupsen/logrus v1.8.1

6
go.sum
View file

@ -341,8 +341,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220718085240-f08f98af7d2d h1:BWInUURXVOW+OiifMapoRIS7i122KWdEKj6fnDFXgBo=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220718085240-f08f98af7d2d/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220725104114-b6003e522771 h1:ZIPHFIPNDS9dmEbPEiJbNmyCGJtn9exfpLC7JOcn/bE=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220725104114-b6003e522771/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a h1:DdG8vXMlZ65EAtc4V+3t7zHZ2Gqs24pSnyXS+4BRHUs=
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
@ -434,8 +434,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pressly/goose v2.7.0+incompatible h1:PWejVEv07LCerQEzMMeAtjuyCKbyprZ/LBa6K5P0OCQ=
github.com/pressly/goose v2.7.0+incompatible/go.mod h1:m+QHWCqxR3k8D9l7qfzuC/djtlfzxr34mozWDYEu1z8=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=

View file

@ -1,134 +1,146 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlutil
import (
"context"
"database/sql"
"fmt"
"runtime"
"sort"
"sync"
"time"
"github.com/matrix-org/dendrite/setup/config"
"github.com/pressly/goose"
"github.com/matrix-org/dendrite/internal"
"github.com/sasha-s/go-deadlock"
"github.com/sirupsen/logrus"
)
type Migrations struct {
gooseMutex deadlock.Mutex
registeredGoMigrations map[int64]*goose.Migration
const createDBMigrationsSQL = "" +
"CREATE TABLE IF NOT EXISTS db_migrations (" +
" version TEXT PRIMARY KEY NOT NULL," +
" time TEXT NOT NULL," +
" dendrite_version TEXT NOT NULL" +
");"
const insertVersionSQL = "" +
"INSERT INTO db_migrations (version, time, dendrite_version)" +
" VALUES ($1, $2, $3)"
const selectDBMigrationsSQL = "SELECT version FROM db_migrations"
// Migration defines a migration to be run.
type Migration struct {
// Version is a simple description/name of this migration.
Version string
// Up defines the function to execute for an upgrade.
Up func(ctx context.Context, txn *sql.Tx) error
// Down defines the function to execute for a downgrade (not implemented yet).
Down func(ctx context.Context, txn *sql.Tx) error
}
func NewMigrations() *Migrations {
return &Migrations{
registeredGoMigrations: make(map[int64]*goose.Migration),
// Migrator
type Migrator struct {
gooseMutex deadlock.Mutex
db *sql.DB
migrations []Migration
knownMigrations map[string]struct{}
mutex *sync.Mutex
}
// NewMigrator creates a new DB migrator.
func NewMigrator(db *sql.DB) *Migrator {
return &Migrator{
db: db,
migrations: []Migration{},
knownMigrations: make(map[string]struct{}),
mutex: &sync.Mutex{},
}
}
// Copy-pasted from goose directly to store migrations into a map we control
// AddMigration adds a migration.
func (m *Migrations) AddMigration(up func(*sql.Tx) error, down func(*sql.Tx) error) {
_, filename, _, _ := runtime.Caller(1)
m.AddNamedMigration(filename, up, down)
}
// AddNamedMigration : Add a named migration.
func (m *Migrations) AddNamedMigration(filename string, up func(*sql.Tx) error, down func(*sql.Tx) error) {
v, _ := goose.NumericComponent(filename)
migration := &goose.Migration{Version: v, Next: -1, Previous: -1, Registered: true, UpFn: up, DownFn: down, Source: filename}
if existing, ok := m.registeredGoMigrations[v]; ok {
panic(fmt.Sprintf("failed to add migration %q: version conflicts with %q", filename, existing.Source))
// AddMigrations appends migrations to the list of migrations. Migrations are executed
// in the order they are added to the list. De-duplicates migrations using their Version field.
func (m *Migrator) AddMigrations(migrations ...Migration) {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, mig := range migrations {
if _, ok := m.knownMigrations[mig.Version]; !ok {
m.migrations = append(m.migrations, mig)
m.knownMigrations[mig.Version] = struct{}{}
}
}
m.registeredGoMigrations[v] = migration
}
// RunDeltas up to the latest version.
func (m *Migrations) RunDeltas(db *sql.DB, props *config.DatabaseOptions) error {
// Up executes all migrations in order they were added.
func (m *Migrator) Up(ctx context.Context) error {
m.gooseMutex.Lock()
defer m.gooseMutex.Unlock()
maxVer := goose.MaxVersion
minVer := int64(0)
migrations, err := m.collect(minVer, maxVer)
var (
err error
dendriteVersion = internal.VersionString()
)
// ensure there is a table for known migrations
executedMigrations, err := m.ExecutedMigrations(ctx)
if err != nil {
return fmt.Errorf("runDeltas: Failed to collect migrations: %w", err)
return fmt.Errorf("unable to create/get migrations: %w", err)
}
if props.ConnectionString.IsPostgres() {
if err = goose.SetDialect("postgres"); err != nil {
return err
}
} else if props.ConnectionString.IsSQLite() {
if err = goose.SetDialect("sqlite3"); err != nil {
return err
}
} else {
return fmt.Errorf("unknown connection string: %s", props.ConnectionString)
}
for {
current, err := goose.EnsureDBVersion(db)
if err != nil {
return fmt.Errorf("runDeltas: Failed to EnsureDBVersion: %w", err)
}
next, err := migrations.Next(current)
if err != nil {
if err == goose.ErrNoNextVersion {
return nil
return WithTransaction(m.db, func(txn *sql.Tx) error {
for i := range m.migrations {
now := time.Now().UTC().Format(time.RFC3339)
migration := m.migrations[i]
logrus.Debugf("Executing database migration '%s'", migration.Version)
// Skip migration if it was already executed
if _, ok := executedMigrations[migration.Version]; ok {
continue
}
err = migration.Up(ctx, txn)
if err != nil {
return fmt.Errorf("unable to execute migration '%s': %w", migration.Version, err)
}
_, err = txn.ExecContext(ctx, insertVersionSQL,
migration.Version,
now,
dendriteVersion,
)
if err != nil {
return fmt.Errorf("unable to insert executed migrations: %w", err)
}
return fmt.Errorf("runDeltas: Failed to load next migration to %+v : %w", next, err)
}
if err = next.Up(db); err != nil {
return fmt.Errorf("runDeltas: Failed run migration: %w", err)
}
}
return nil
})
}
func (m *Migrations) collect(current, target int64) (goose.Migrations, error) {
var migrations goose.Migrations
// Go migrations registered via goose.AddMigration().
for _, migration := range m.registeredGoMigrations {
v, err := goose.NumericComponent(migration.Source)
if err != nil {
return nil, err
}
if versionFilter(v, current, target) {
migrations = append(migrations, migration)
// ExecutedMigrations returns a map with already executed migrations in addition to creating the
// migrations table, if it doesn't exist.
func (m *Migrator) ExecutedMigrations(ctx context.Context) (map[string]struct{}, error) {
result := make(map[string]struct{})
_, err := m.db.ExecContext(ctx, createDBMigrationsSQL)
if err != nil {
return nil, fmt.Errorf("unable to create db_migrations: %w", err)
}
rows, err := m.db.QueryContext(ctx, selectDBMigrationsSQL)
if err != nil {
return nil, fmt.Errorf("unable to query db_migrations: %w", err)
}
defer internal.CloseAndLogIfError(ctx, rows, "ExecutedMigrations: rows.close() failed")
var version string
for rows.Next() {
if err = rows.Scan(&version); err != nil {
return nil, fmt.Errorf("unable to scan version: %w", err)
}
result[version] = struct{}{}
}
migrations = sortAndConnectMigrations(migrations)
return migrations, nil
}
func sortAndConnectMigrations(migrations goose.Migrations) goose.Migrations {
sort.Sort(migrations)
// now that we're sorted in the appropriate direction,
// populate next and previous for each migration
for i, m := range migrations {
prev := int64(-1)
if i > 0 {
prev = migrations[i-1].Version
migrations[i-1].Next = m.Version
}
migrations[i].Previous = prev
}
return migrations
}
func versionFilter(v, current, target int64) bool {
if target > current {
return v > current && v <= target
}
if target < current {
return v <= current && v > target
}
return false
return result, rows.Err()
}

View file

@ -0,0 +1,112 @@
package sqlutil_test
import (
"context"
"database/sql"
"fmt"
"reflect"
"testing"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/test"
_ "github.com/mattn/go-sqlite3"
)
var dummyMigrations = []sqlutil.Migration{
{
Version: "init",
Up: func(ctx context.Context, txn *sql.Tx) error {
_, err := txn.ExecContext(ctx, "CREATE TABLE IF NOT EXISTS dummy ( test TEXT );")
return err
},
},
{
Version: "v2",
Up: func(ctx context.Context, txn *sql.Tx) error {
_, err := txn.ExecContext(ctx, "ALTER TABLE dummy ADD COLUMN test2 TEXT;")
return err
},
},
{
Version: "v2", // duplicate, this migration will be skipped
Up: func(ctx context.Context, txn *sql.Tx) error {
_, err := txn.ExecContext(ctx, "ALTER TABLE dummy ADD COLUMN test2 TEXT;")
return err
},
},
{
Version: "multiple execs",
Up: func(ctx context.Context, txn *sql.Tx) error {
_, err := txn.ExecContext(ctx, "ALTER TABLE dummy ADD COLUMN test3 TEXT;")
if err != nil {
return err
}
_, err = txn.ExecContext(ctx, "ALTER TABLE dummy ADD COLUMN test4 TEXT;")
return err
},
},
}
var failMigration = sqlutil.Migration{
Version: "iFail",
Up: func(ctx context.Context, txn *sql.Tx) error {
return fmt.Errorf("iFail")
},
Down: nil,
}
func Test_migrations_Up(t *testing.T) {
withFail := append(dummyMigrations, failMigration)
tests := []struct {
name string
migrations []sqlutil.Migration
wantResult map[string]struct{}
wantErr bool
}{
{
name: "dummy migration",
migrations: dummyMigrations,
wantResult: map[string]struct{}{
"init": {},
"v2": {},
"multiple execs": {},
},
},
{
name: "with fail",
migrations: withFail,
wantErr: true,
},
}
ctx := context.Background()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
conStr, close := test.PrepareDBConnectionString(t, dbType)
defer close()
driverName := "sqlite3"
if dbType == test.DBTypePostgres {
driverName = "postgres"
}
db, err := sql.Open(driverName, conStr)
if err != nil {
t.Errorf("unable to open database: %v", err)
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(tt.migrations...)
if err = m.Up(ctx); (err != nil) != tt.wantErr {
t.Errorf("Up() error = %v, wantErr %v", err, tt.wantErr)
}
result, err := m.ExecutedMigrations(ctx)
if err != nil {
t.Errorf("unable to get executed migrations: %v", err)
}
if !tt.wantErr && !reflect.DeepEqual(result, tt.wantResult) {
t.Errorf("expected: %+v, got %v", tt.wantResult, result)
}
})
})
}
}

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/postgres/deltas"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
"github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/gomatrixserverlib"
@ -66,6 +67,16 @@ func NewPostgresCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, erro
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "keyserver: cross signing signature indexes",
Up: deltas.UpFixCrossSigningSignatureIndexes,
})
if err = m.Up(context.Background()); err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL},
{&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL},

View file

@ -15,37 +15,27 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpRefactorKeyChanges, DownRefactorKeyChanges)
}
func LoadRefactorKeyChanges(m *sqlutil.Migrations) {
m.AddMigration(UpRefactorKeyChanges, DownRefactorKeyChanges)
}
func UpRefactorKeyChanges(tx *sql.Tx) error {
func UpRefactorKeyChanges(ctx context.Context, tx *sql.Tx) error {
// start counting from the last max offset, else 0. We need to do a count(*) first to see if there
// even are entries in this table to know if we can query for log_offset. Without the count then
// the query to SELECT the max log offset fails on new Dendrite instances as log_offset doesn't
// exist on that table. Even though we discard the error, the txn is tainted and gets aborted :/
var count int
_ = tx.QueryRow(`SELECT count(*) FROM keyserver_key_changes`).Scan(&count)
_ = tx.QueryRowContext(ctx, `SELECT count(*) FROM keyserver_key_changes`).Scan(&count)
if count > 0 {
var maxOffset int64
_ = tx.QueryRow(`SELECT coalesce(MAX(log_offset), 0) AS offset FROM keyserver_key_changes`).Scan(&maxOffset)
if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE IF NOT EXISTS keyserver_key_changes_seq START %d`, maxOffset)); err != nil {
_ = tx.QueryRowContext(ctx, `SELECT coalesce(MAX(log_offset), 0) AS offset FROM keyserver_key_changes`).Scan(&maxOffset)
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE SEQUENCE IF NOT EXISTS keyserver_key_changes_seq START %d`, maxOffset)); err != nil {
return fmt.Errorf("failed to CREATE SEQUENCE for key changes, starting at %d: %s", maxOffset, err)
}
}
_, err := tx.Exec(`
_, err := tx.ExecContext(ctx, `
-- make the new table
DROP TABLE IF EXISTS keyserver_key_changes;
CREATE TABLE IF NOT EXISTS keyserver_key_changes (
@ -60,8 +50,8 @@ func UpRefactorKeyChanges(tx *sql.Tx) error {
return nil
}
func DownRefactorKeyChanges(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownRefactorKeyChanges(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
-- Drop all data and revert back, we can't keep the data as Kafka offsets determine the numbers
DROP SEQUENCE IF EXISTS keyserver_key_changes_seq;
DROP TABLE IF EXISTS keyserver_key_changes;

View file

@ -15,18 +15,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadFixCrossSigningSignatureIndexes(m *sqlutil.Migrations) {
m.AddMigration(UpFixCrossSigningSignatureIndexes, DownFixCrossSigningSignatureIndexes)
}
func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpFixCrossSigningSignatureIndexes(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE keyserver_cross_signing_sigs DROP CONSTRAINT keyserver_cross_signing_sigs_pkey;
ALTER TABLE keyserver_cross_signing_sigs ADD PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id);
@ -38,8 +33,8 @@ func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
return nil
}
func DownFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownFixCrossSigningSignatureIndexes(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE keyserver_cross_signing_sigs DROP CONSTRAINT keyserver_cross_signing_sigs_pkey;
ALTER TABLE keyserver_cross_signing_sigs ADD PRIMARY KEY (origin_user_id, target_user_id, target_key_id);

View file

@ -19,6 +19,8 @@ import (
"database/sql"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/postgres/deltas"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@ -55,7 +57,23 @@ func NewPostgresKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
db: db,
}
_, err := db.Exec(keyChangesSchema)
return s, err
if err != nil {
return s, err
}
// TODO: Remove when we are sure we are not having goose artefacts in the db
// This forces an error, which indicates the migration is already applied, since the
// column partition was removed from the table
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan()
if err == nil {
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "keyserver: refactor key changes",
Up: deltas.UpRefactorKeyChanges,
})
return s, m.Up(context.Background())
}
return s, nil
}
func (s *keyChangesStatements) Prepare() (err error) {

View file

@ -16,7 +16,6 @@ package postgres
import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/postgres/deltas"
"github.com/matrix-org/dendrite/keyserver/storage/shared"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
@ -53,12 +52,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadRefactorKeyChanges(m)
deltas.LoadFixCrossSigningSignatureIndexes(m)
if err = m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
if err = kc.Prepare(); err != nil {
return nil, err
}

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
"github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/gomatrixserverlib"
@ -65,6 +66,15 @@ func NewSqliteCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "keyserver: cross signing signature indexes",
Up: deltas.UpFixCrossSigningSignatureIndexes,
})
if err = m.Up(context.Background()); err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL},
{&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL},

View file

@ -15,28 +15,18 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpRefactorKeyChanges, DownRefactorKeyChanges)
}
func LoadRefactorKeyChanges(m *sqlutil.Migrations) {
m.AddMigration(UpRefactorKeyChanges, DownRefactorKeyChanges)
}
func UpRefactorKeyChanges(tx *sql.Tx) error {
func UpRefactorKeyChanges(ctx context.Context, tx *sql.Tx) error {
// start counting from the last max offset, else 0.
var maxOffset int64
var userID string
_ = tx.QueryRow(`SELECT user_id, MAX(log_offset) FROM keyserver_key_changes GROUP BY user_id`).Scan(&userID, &maxOffset)
_ = tx.QueryRowContext(ctx, `SELECT user_id, MAX(log_offset) FROM keyserver_key_changes GROUP BY user_id`).Scan(&userID, &maxOffset)
_, err := tx.Exec(`
_, err := tx.ExecContext(ctx, `
-- make the new table
DROP TABLE IF EXISTS keyserver_key_changes;
CREATE TABLE IF NOT EXISTS keyserver_key_changes (
@ -51,14 +41,14 @@ func UpRefactorKeyChanges(tx *sql.Tx) error {
}
// to start counting from maxOffset, insert a row with that value
if userID != "" {
_, err = tx.Exec(`INSERT INTO keyserver_key_changes(change_id, user_id) VALUES($1, $2)`, maxOffset, userID)
_, err = tx.ExecContext(ctx, `INSERT INTO keyserver_key_changes(change_id, user_id) VALUES($1, $2)`, maxOffset, userID)
return err
}
return nil
}
func DownRefactorKeyChanges(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownRefactorKeyChanges(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
-- Drop all data and revert back, we can't keep the data as Kafka offsets determine the numbers
DROP TABLE IF EXISTS keyserver_key_changes;
CREATE TABLE IF NOT EXISTS keyserver_key_changes (

View file

@ -15,18 +15,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadFixCrossSigningSignatureIndexes(m *sqlutil.Migrations) {
m.AddMigration(UpFixCrossSigningSignatureIndexes, DownFixCrossSigningSignatureIndexes)
}
func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpFixCrossSigningSignatureIndexes(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs_tmp (
origin_user_id TEXT NOT NULL,
origin_key_id TEXT NOT NULL,
@ -50,8 +45,8 @@ func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
return nil
}
func DownFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownFixCrossSigningSignatureIndexes(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs_tmp (
origin_user_id TEXT NOT NULL,
origin_key_id TEXT NOT NULL,

View file

@ -19,6 +19,8 @@ import (
"database/sql"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@ -53,7 +55,23 @@ func NewSqliteKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
db: db,
}
_, err := db.Exec(keyChangesSchema)
return s, err
if err != nil {
return s, err
}
// TODO: Remove when we are sure we are not having goose artefacts in the db
// This forces an error, which indicates the migration is already applied, since the
// column partition was removed from the table
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan()
if err == nil {
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "keyserver: refactor key changes",
Up: deltas.UpRefactorKeyChanges,
})
return s, m.Up(context.Background())
}
return s, nil
}
func (s *keyChangesStatements) Prepare() (err error) {

View file

@ -17,7 +17,6 @@ package sqlite3
import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/shared"
"github.com/matrix-org/dendrite/keyserver/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
)
@ -52,12 +51,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadRefactorKeyChanges(m)
deltas.LoadFixCrossSigningSignatureIndexes(m)
if err = m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
if err = kc.Prepare(); err != nil {
return nil, err
}

View file

@ -12,6 +12,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -21,14 +22,14 @@ import (
// Move these to a more sensible place.
func UpdateToInviteMembership(
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
mu *shared.MembershipUpdater, add *types.Event, updates []api.OutputEvent,
roomVersion gomatrixserverlib.RoomVersion,
) ([]api.OutputEvent, error) {
// We may have already sent the invite to the user, either because we are
// reprocessing this event, or because the we received this invite from a
// remote server via the federation invite API. In those cases we don't need
// to send the event.
needsSending, err := mu.SetToInvite(add)
needsSending, retired, err := mu.Update(tables.MembershipStateInvite, add)
if err != nil {
return nil, err
}
@ -38,13 +39,23 @@ func UpdateToInviteMembership(
// room event stream. This ensures that the consumers only have to
// consider a single stream of events when determining whether a user
// is invited, rather than having to combine multiple streams themselves.
onie := api.OutputNewInviteEvent{
Event: add.Headered(roomVersion),
RoomVersion: roomVersion,
}
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeNewInviteEvent,
NewInviteEvent: &onie,
Type: api.OutputTypeNewInviteEvent,
NewInviteEvent: &api.OutputNewInviteEvent{
Event: add.Headered(roomVersion),
RoomVersion: roomVersion,
},
})
}
for _, eventID := range retired {
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
EventID: eventID,
Membership: gomatrixserverlib.Join,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
},
})
}
return updates, nil

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
@ -60,20 +61,14 @@ func (r *Inputer) updateMemberships(
var updates []api.OutputEvent
for _, change := range changes {
var ae *gomatrixserverlib.Event
var re *gomatrixserverlib.Event
var ae *types.Event
var re *types.Event
targetUserNID := change.EventStateKeyNID
if change.removedEventNID != 0 {
ev, _ := helpers.EventMap(events).Lookup(change.removedEventNID)
if ev != nil {
re = ev.Event
}
re, _ = helpers.EventMap(events).Lookup(change.removedEventNID)
}
if change.addedEventNID != 0 {
ev, _ := helpers.EventMap(events).Lookup(change.addedEventNID)
if ev != nil {
ae = ev.Event
}
ae, _ = helpers.EventMap(events).Lookup(change.addedEventNID)
}
if updates, err = r.updateMembership(updater, targetUserNID, re, ae, updates); err != nil {
return nil, err
@ -85,30 +80,27 @@ func (r *Inputer) updateMemberships(
func (r *Inputer) updateMembership(
updater *shared.RoomUpdater,
targetUserNID types.EventStateKeyNID,
remove, add *gomatrixserverlib.Event,
remove, add *types.Event,
updates []api.OutputEvent,
) ([]api.OutputEvent, error) {
var err error
// Default the membership to Leave if no event was added or removed.
oldMembership := gomatrixserverlib.Leave
newMembership := gomatrixserverlib.Leave
if remove != nil {
oldMembership, err = remove.Membership()
if err != nil {
return nil, err
}
}
if add != nil {
newMembership, err = add.Membership()
if err != nil {
return nil, err
}
}
if oldMembership == newMembership && newMembership != gomatrixserverlib.Join {
// If the membership is the same then nothing changed and we can return
// immediately, unless it's a Join update (e.g. profile update).
return updates, nil
var targetLocal bool
if add != nil {
targetLocal = r.isLocalTarget(add)
}
mu, err := updater.MembershipUpdater(targetUserNID, targetLocal)
if err != nil {
return nil, err
}
// In an ideal world, we shouldn't ever have "add" be nil and "remove" be
@ -120,17 +112,10 @@ func (r *Inputer) updateMembership(
// after a state reset, often thinking that the user was still joined to
// the room even though the room state said otherwise, and this would prevent
// the user from being able to attempt to rejoin the room without modifying
// the database. So instead what we'll do is we'll just update the membership
// table to say that the user is "leave" and we'll use the old event to
// avoid nil pointer exceptions on the code path that follows.
if add == nil {
add = remove
newMembership = gomatrixserverlib.Leave
}
mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add))
if err != nil {
return nil, err
// the database. So instead we're going to remove the membership from the
// database altogether, so that it doesn't create future problems.
if add == nil && remove != nil {
return nil, mu.Delete()
}
switch newMembership {
@ -149,7 +134,7 @@ func (r *Inputer) updateMembership(
}
}
func (r *Inputer) isLocalTarget(event *gomatrixserverlib.Event) bool {
func (r *Inputer) isLocalTarget(event *types.Event) bool {
isTargetLocalUser := false
if statekey := event.StateKey(); statekey != nil {
_, domain, _ := gomatrixserverlib.SplitID('@', *statekey)
@ -159,81 +144,61 @@ func (r *Inputer) isLocalTarget(event *gomatrixserverlib.Event) bool {
}
func updateToJoinMembership(
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
mu *shared.MembershipUpdater, add *types.Event, updates []api.OutputEvent,
) ([]api.OutputEvent, error) {
// If the user is already marked as being joined, we call SetToJoin to update
// the event ID then we can return immediately. Retired is ignored as there
// is no invite event to retire.
if mu.IsJoin() {
_, err := mu.SetToJoin(add.Sender(), add.EventID(), true)
if err != nil {
return nil, err
}
return updates, nil
}
// When we mark a user as being joined we will invalidate any invites that
// are active for that user. We notify the consumers that the invites have
// been retired using a special event, even though they could infer this
// by studying the state changes in the room event stream.
retired, err := mu.SetToJoin(add.Sender(), add.EventID(), false)
_, retired, err := mu.Update(tables.MembershipStateJoin, add)
if err != nil {
return nil, err
}
for _, eventID := range retired {
orie := api.OutputRetireInviteEvent{
EventID: eventID,
Membership: gomatrixserverlib.Join,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
}
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &orie,
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
EventID: eventID,
Membership: gomatrixserverlib.Join,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
},
})
}
return updates, nil
}
func updateToLeaveMembership(
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event,
mu *shared.MembershipUpdater, add *types.Event,
newMembership string, updates []api.OutputEvent,
) ([]api.OutputEvent, error) {
// If the user is already neither joined, nor invited to the room then we
// can return immediately.
if mu.IsLeave() {
return updates, nil
}
// When we mark a user as having left we will invalidate any invites that
// are active for that user. We notify the consumers that the invites have
// been retired using a special event, even though they could infer this
// by studying the state changes in the room event stream.
retired, err := mu.SetToLeave(add.Sender(), add.EventID())
_, retired, err := mu.Update(tables.MembershipStateLeaveOrBan, add)
if err != nil {
return nil, err
}
for _, eventID := range retired {
orie := api.OutputRetireInviteEvent{
EventID: eventID,
Membership: newMembership,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
}
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &orie,
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
EventID: eventID,
Membership: newMembership,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
},
})
}
return updates, nil
}
func updateToKnockMembership(
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
mu *shared.MembershipUpdater, add *types.Event, updates []api.OutputEvent,
) ([]api.OutputEvent, error) {
if mu.IsLeave() {
_, err := mu.SetToKnock(add)
if err != nil {
return nil, err
}
if _, _, err := mu.Update(tables.MembershipStateKnock, add); err != nil {
return nil, err
}
return updates, nil
}

View file

@ -39,11 +39,13 @@ type Inviter struct {
Inputer *input.Inputer
}
// nolint:gocyclo
func (r *Inviter) PerformInvite(
ctx context.Context,
req *api.PerformInviteRequest,
res *api.PerformInviteResponse,
) ([]api.OutputEvent, error) {
var outputUpdates []api.OutputEvent
event := req.Event
if event.StateKey() == nil {
return nil, fmt.Errorf("invite must be a state event")
@ -66,6 +68,13 @@ func (r *Inviter) PerformInvite(
}
isTargetLocal := domain == r.Cfg.Matrix.ServerName
isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName
if !isOriginLocal && !isTargetLocal {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: "The invite must be either from or to a local user",
}
return nil, nil
}
logger := util.GetLogger(ctx).WithFields(map[string]interface{}{
"inviter": event.Sender(),
@ -97,6 +106,34 @@ func (r *Inviter) PerformInvite(
}
}
updateMembershipTableManually := func() ([]api.OutputEvent, error) {
var updater *shared.MembershipUpdater
if updater, err = r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion); err != nil {
return nil, fmt.Errorf("r.DB.MembershipUpdater: %w", err)
}
outputUpdates, err = helpers.UpdateToInviteMembership(updater, &types.Event{
EventNID: 0,
Event: event.Unwrap(),
}, outputUpdates, req.Event.RoomVersion)
if err != nil {
return nil, fmt.Errorf("updateToInviteMembership: %w", err)
}
if err = updater.Commit(); err != nil {
return nil, fmt.Errorf("updater.Commit: %w", err)
}
logger.Debugf("updated membership to invite and sending invite OutputEvent")
return outputUpdates, nil
}
if (info == nil || info.IsStub) && !isOriginLocal && isTargetLocal {
// The invite came in over federation for a room that we don't know about
// yet. We need to handle this a bit differently to most invites because
// we don't know the room state, therefore the roomserver can't process
// an input event. Instead we will update the membership table with the
// new invite and generate an output event.
return updateMembershipTableManually()
}
var isAlreadyJoined bool
if info != nil {
_, isAlreadyJoined, _, err = r.DB.GetMembership(ctx, info.RoomNID, *event.StateKey())
@ -140,31 +177,13 @@ func (r *Inviter) PerformInvite(
return nil, nil
}
// If the invite originated remotely then we can't send an
// InputRoomEvent for the invite as it will never pass auth checks
// due to lacking room state, but we still need to tell the client
// about the invite so we can accept it, hence we return an output
// event to send to the Sync API.
if !isOriginLocal {
// The invite originated over federation. Process the membership
// update, which will notify the sync API etc about the incoming
// invite. We do NOT send an InputRoomEvent for the invite as it
// will never pass auth checks due to lacking room state, but we
// still need to tell the client about the invite so we can accept
// it, hence we return an output event to send to the sync api.
var updater *shared.MembershipUpdater
updater, err = r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion)
if err != nil {
return nil, fmt.Errorf("r.DB.MembershipUpdater: %w", err)
}
unwrapped := event.Unwrap()
var outputUpdates []api.OutputEvent
outputUpdates, err = helpers.UpdateToInviteMembership(updater, unwrapped, nil, req.Event.RoomVersion)
if err != nil {
return nil, fmt.Errorf("updateToInviteMembership: %w", err)
}
if err = updater.Commit(); err != nil {
return nil, fmt.Errorf("updater.Commit: %w", err)
}
logger.Debugf("updated membership to invite and sending invite OutputEvent")
return outputUpdates, nil
return updateMembershipTableManually()
}
// The invite originated locally. Therefore we have a responsibility to
@ -229,12 +248,11 @@ func (r *Inviter) PerformInvite(
Code: api.PerformErrorNotAllowed,
}
logger.WithError(err).WithField("event_id", event.EventID()).Error("r.InputRoomEvents failed")
return nil, nil
}
// Don't notify the sync api of this event in the same way as a federated invite so the invitee
// gets the invite, as the roomserver will do this when it processes the m.room.member invite.
return nil, nil
return outputUpdates, nil
}
func buildInviteStrippedState(

View file

@ -268,21 +268,19 @@ func (r *Joiner) performJoinRoomByID(
case nil:
// The room join is local. Send the new join event into the
// roomserver. First of all check that the user isn't already
// a member of the room.
alreadyJoined := false
for _, se := range buildRes.StateEvents {
if !se.StateKeyEquals(userID) {
continue
}
if membership, merr := se.Membership(); merr == nil {
alreadyJoined = (membership == gomatrixserverlib.Join)
break
}
// a member of the room. This is best-effort (as in we won't
// fail if we can't find the existing membership) because there
// is really no harm in just sending another membership event.
membershipReq := &api.QueryMembershipForUserRequest{
RoomID: req.RoomIDOrAlias,
UserID: userID,
}
membershipRes := &api.QueryMembershipForUserResponse{}
_ = r.Queryer.QueryMembershipForUser(ctx, membershipReq, membershipRes)
// If we haven't already joined the room then send an event
// into the room changing our membership status.
if !alreadyJoined {
if !membershipRes.RoomExists || !membershipRes.IsInRoom {
inputReq := rsAPI.InputRoomEventsRequest{
InputRoomEvents: []rsAPI.InputRoomEvent{
{

View file

@ -228,14 +228,14 @@ func (r *Leaver) performFederatedRejectInvite(
util.GetLogger(ctx).WithError(err).Errorf("failed to get MembershipUpdater, still retiring invite event")
}
if updater != nil {
if _, err = updater.SetToLeave(req.UserID, eventID); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("failed to set membership to leave, still retiring invite event")
if err = updater.Delete(); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("failed to delete membership, still retiring invite event")
if err = updater.Rollback(); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("failed to rollback membership leave, still retiring invite event")
util.GetLogger(ctx).WithError(err).Errorf("failed to rollback deleting membership, still retiring invite event")
}
} else {
if err = updater.Commit(); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("failed to commit membership update, still retiring invite event")
util.GetLogger(ctx).WithError(err).Errorf("failed to commit deleting membership, still retiring invite event")
}
}
}

View file

@ -16,6 +16,7 @@ package query
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
@ -225,6 +226,9 @@ func (r *Queryer) QueryMembershipsForRoom(
var eventNIDs []types.EventNID
eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, request.JoinedOnly, request.LocalOnly)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err)
}
events, err = r.DB.Events(ctx, eventNIDs)
@ -260,6 +264,9 @@ func (r *Queryer) QueryMembershipsForRoom(
var eventNIDs []types.EventNID
eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, request.JoinedOnly, false)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}

View file

@ -15,32 +15,21 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
goose.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}
func LoadAddForgottenColumn(m *sqlutil.Migrations) {
m.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
}
func UpAddForgottenColumn(tx *sql.Tx) error {
_, err := tx.Exec(`ALTER TABLE roomserver_membership ADD COLUMN IF NOT EXISTS forgotten BOOLEAN NOT NULL DEFAULT false;`)
func UpAddForgottenColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_membership ADD COLUMN IF NOT EXISTS forgotten BOOLEAN NOT NULL DEFAULT false;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownAddForgottenColumn(tx *sql.Tx) error {
_, err := tx.Exec(`ALTER TABLE roomserver_membership DROP COLUMN IF EXISTS forgotten;`)
func DownAddForgottenColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_membership DROP COLUMN IF EXISTS forgotten;`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}

View file

@ -15,11 +15,11 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
@ -36,48 +36,44 @@ type stateBlockData struct {
EventNIDs types.EventNIDs
}
func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}
// nolint:gocyclo
func UpStateBlocksRefactor(tx *sql.Tx) error {
func UpStateBlocksRefactor(ctx context.Context, tx *sql.Tx) error {
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
defer logrus.Warn("State storage upgrade complete")
var snapshotcount int
var maxsnapshotid int
var maxblockid int
if err := tx.QueryRow(`SELECT COUNT(DISTINCT state_snapshot_nid) FROM roomserver_state_snapshots;`).Scan(&snapshotcount); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
if err := tx.QueryRowContext(ctx, `SELECT COUNT(DISTINCT state_snapshot_nid) FROM roomserver_state_snapshots;`).Scan(&snapshotcount); err != nil {
return fmt.Errorf("tx.QueryRowContext.Scan (count snapshots): %w", err)
}
if err := tx.QueryRow(`SELECT COALESCE(MAX(state_snapshot_nid),0) FROM roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
if err := tx.QueryRowContext(ctx, `SELECT COALESCE(MAX(state_snapshot_nid),0) FROM roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil {
return fmt.Errorf("tx.QueryRowContext.Scan (count snapshots): %w", err)
}
if err := tx.QueryRow(`SELECT COALESCE(MAX(state_block_nid),0) FROM roomserver_state_block;`).Scan(&maxblockid); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
if err := tx.QueryRowContext(ctx, `SELECT COALESCE(MAX(state_block_nid),0) FROM roomserver_state_block;`).Scan(&maxblockid); err != nil {
return fmt.Errorf("tx.QueryRowContext.Scan (count snapshots): %w", err)
}
maxsnapshotid++
maxblockid++
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
if _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.ExecContext: %w", err)
}
if _, err := tx.Exec(`ALTER TABLE roomserver_state_snapshots RENAME TO _roomserver_state_snapshots;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
if _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_state_snapshots RENAME TO _roomserver_state_snapshots;`); err != nil {
return fmt.Errorf("tx.ExecContext: %w", err)
}
// We create new sequences starting with the maximum state snapshot and block NIDs.
// This means that all newly created snapshots and blocks by the migration will have
// NIDs higher than these values, so that when we come to update the references to
// these NIDs using UPDATE statements, we can guarantee we are only ever updating old
// values and not accidentally overwriting new ones.
if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_block_nid_sequence START WITH %d;`, maxblockid)); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE SEQUENCE roomserver_state_block_nid_sequence START WITH %d;`, maxblockid)); err != nil {
return fmt.Errorf("tx.ExecContext: %w", err)
}
if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_snapshot_nid_sequence START WITH %d;`, maxsnapshotid)); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE SEQUENCE roomserver_state_snapshot_nid_sequence START WITH %d;`, maxsnapshotid)); err != nil {
return fmt.Errorf("tx.ExecContext: %w", err)
}
_, err := tx.Exec(`
_, err := tx.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS roomserver_state_block (
state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_sequence'),
state_block_hash BYTEA UNIQUE,
@ -87,7 +83,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
if err != nil {
return fmt.Errorf("tx.Exec (create blocks table): %w", err)
}
_, err = tx.Exec(`
_, err = tx.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_sequence'),
state_snapshot_hash BYTEA UNIQUE,
@ -104,7 +100,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
// in question a state snapshot NID of 0 to indicate 'no snapshot'.
// If we don't do this, we'll fail the assertions later on which try to ensure we didn't forget
// any snapshots.
_, err = tx.Exec(
_, err = tx.ExecContext(ctx,
`UPDATE roomserver_events SET state_snapshot_nid = 0 WHERE event_type_nid = $1 AND event_state_key_nid = $2`,
types.MRoomCreateNID, types.EmptyStateKeyNID,
)
@ -115,7 +111,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
batchsize := 100
for batchoffset := 0; batchoffset < snapshotcount; batchoffset += batchsize {
var snapshotrows *sql.Rows
snapshotrows, err = tx.Query(`
snapshotrows, err = tx.QueryContext(ctx, `
SELECT
state_snapshot_nid,
room_nid,
@ -146,7 +142,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
state_block_nid;
`, batchsize, batchoffset)
if err != nil {
return fmt.Errorf("tx.Query: %w", err)
return fmt.Errorf("tx.QueryContext: %w", err)
}
logrus.Warnf("Rewriting snapshots %d-%d of %d...", batchoffset, batchoffset+batchsize, snapshotcount)
@ -183,7 +179,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
// fill in bad create snapshots
for _, s := range badCreateSnapshots {
var createEventNID types.EventNID
err = tx.QueryRow(
err = tx.QueryRowContext(ctx,
`SELECT event_nid FROM roomserver_events WHERE state_snapshot_nid = $1 AND event_type_nid = 1`, s.StateSnapshotNID,
).Scan(&createEventNID)
if err == sql.ErrNoRows {
@ -208,7 +204,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
var blocknid types.StateBlockNID
err = tx.QueryRow(`
err = tx.QueryRowContext(ctx, `
INSERT INTO roomserver_state_block (state_block_hash, event_nids)
VALUES ($1, $2)
ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$2
@ -227,7 +223,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
var newNID types.StateSnapshotNID
err = tx.QueryRow(`
err = tx.QueryRowContext(ctx, `
INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids)
VALUES ($1, $2, $3)
ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2
@ -237,12 +233,12 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
}
if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
return fmt.Errorf("tx.Exec (update events): %w", err)
if _, err = tx.ExecContext(ctx, `UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
return fmt.Errorf("tx.ExecContext (update events): %w", err)
}
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
return fmt.Errorf("tx.Exec (update rooms): %w", err)
if _, err = tx.ExecContext(ctx, `UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
return fmt.Errorf("tx.ExecContext (update rooms): %w", err)
}
}
}
@ -252,13 +248,13 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
// in roomserver_state_snapshots
var count int64
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
if err = tx.QueryRowContext(ctx, `SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
var res sql.Result
var c int64
res, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid = 0 WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid)
res, err = tx.ExecContext(ctx, `UPDATE roomserver_events SET state_snapshot_nid = 0 WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("failed to reset invalid state snapshots: %w", err)
}
@ -268,13 +264,13 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("expected to reset %d event(s) but only updated %d event(s)", count, c)
}
}
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
if err = tx.QueryRowContext(ctx, `SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
var debugRoomID string
var debugSnapNID, debugLastEventNID int64
err = tx.QueryRow(
err = tx.QueryRowContext(ctx,
`SELECT room_id, state_snapshot_nid, last_event_sent_nid FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid,
).Scan(&debugRoomID, &debugSnapNID, &debugLastEventNID)
if err != nil {
@ -291,13 +287,13 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if _, err = tx.Exec(`
if _, err = tx.ExecContext(ctx, `
DROP TABLE _roomserver_state_snapshots;
DROP SEQUENCE roomserver_state_snapshot_nid_seq;
`); err != nil {
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
}
if _, err = tx.Exec(`
if _, err = tx.ExecContext(ctx, `
DROP TABLE _roomserver_state_block;
DROP SEQUENCE roomserver_state_block_nid_seq;
`); err != nil {
@ -307,6 +303,6 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return nil
}
func DownStateBlocksRefactor(tx *sql.Tx) error {
func DownStateBlocksRefactor(ctx context.Context, tx *sql.Tx) error {
panic("Downgrading state storage is not supported")
}

View file

@ -23,6 +23,7 @@ import (
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
@ -86,24 +87,24 @@ const insertMembershipSQL = "" +
const selectMembershipFromRoomAndTargetSQL = "" +
"SELECT membership_nid, event_nid, forgotten FROM roomserver_membership" +
" WHERE room_nid = $1 AND target_nid = $2"
" WHERE room_nid = $1 AND event_nid != 0 AND target_nid = $2"
const selectMembershipsFromRoomAndMembershipSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND membership_nid = $2 and forgotten = false"
" WHERE room_nid = $1 AND event_nid != 0 AND membership_nid = $2 and forgotten = false"
const selectLocalMembershipsFromRoomAndMembershipSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND membership_nid = $2" +
" WHERE room_nid = $1 AND event_nid != 0 AND membership_nid = $2" +
" AND target_local = true and forgotten = false"
const selectMembershipsFromRoomSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 and forgotten = false"
" WHERE room_nid = $1 AND event_nid != 0 and forgotten = false"
const selectLocalMembershipsFromRoomSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1" +
" WHERE room_nid = $1 AND event_nid != 0" +
" AND target_local = true and forgotten = false"
const selectMembershipForUpdateSQL = "" +
@ -118,6 +119,9 @@ const updateMembershipForgetRoom = "" +
"UPDATE roomserver_membership SET forgotten = $3" +
" WHERE room_nid = $1 AND target_nid = $2"
const deleteMembershipSQL = "" +
"DELETE FROM roomserver_membership WHERE room_nid = $1 AND target_nid = $2"
const selectRoomsWithMembershipSQL = "" +
"SELECT room_nid FROM roomserver_membership WHERE membership_nid = $1 AND target_nid = $2 and forgotten = false"
@ -165,11 +169,20 @@ type membershipStatements struct {
updateMembershipForgetRoomStmt *sql.Stmt
selectLocalServerInRoomStmt *sql.Stmt
selectServerInRoomStmt *sql.Stmt
deleteMembershipStmt *sql.Stmt
}
func CreateMembershipTable(db *sql.DB) error {
_, err := db.Exec(membershipSchema)
return err
if err != nil {
return err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "roomserver: add forgotten column",
Up: deltas.UpAddForgottenColumn,
})
return m.Up(context.Background())
}
func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
@ -191,6 +204,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
{&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL},
{&s.selectServerInRoomStmt, selectServerInRoomSQL},
{&s.deleteMembershipStmt, deleteMembershipSQL},
}.Prepare(db)
}
@ -412,3 +426,13 @@ func (s *membershipStatements) SelectServerInRoom(
}
return roomNID == nid, nil
}
func (s *membershipStatements) DeleteMembership(
ctx context.Context, txn *sql.Tx,
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
) error {
_, err := sqlutil.TxStmt(txn, s.deleteMembershipStmt).ExecContext(
ctx, roomNID, targetUserNID,
)
return err
}

View file

@ -21,7 +21,6 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas"
@ -45,17 +44,25 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
}
// Create the tables.
if err := d.create(db); err != nil {
if err = d.create(db); err != nil {
return nil, err
}
// Then execute the migrations. By this point the tables are created with the latest
// schemas.
m := sqlutil.NewMigrations()
deltas.LoadAddForgottenColumn(m)
deltas.LoadStateBlocksRefactor(m)
if err := m.RunDeltas(db, dbProperties); err != nil {
return nil, err
// Special case, since this migration uses several tables, so it needs to
// be sure that all tables are created first.
// TODO: Remove when we are sure we are not having goose artefacts in the db
// This forces an error, which indicates the migration is already applied, since the
// column event_nid was removed from the table
err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan()
if err == nil {
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "roomserver: state blocks refactor",
Up: deltas.UpStateBlocksRefactor,
})
if err := m.Up(base.Context()); err != nil {
return nil, err
}
}
// Then prepare the statements. Now that the migrations have run, any columns referred

View file

@ -15,7 +15,7 @@ type MembershipUpdater struct {
d *Database
roomNID types.RoomNID
targetUserNID types.EventStateKeyNID
membership tables.MembershipState
oldMembership tables.MembershipState
}
func NewMembershipUpdater(
@ -30,7 +30,6 @@ func NewMembershipUpdater(
if err != nil {
return err
}
targetUserNID, err = d.assignStateKeyNID(ctx, targetUserID)
if err != nil {
return err
@ -73,146 +72,62 @@ func (d *Database) membershipUpdaterTxn(
// IsInvite implements types.MembershipUpdater
func (u *MembershipUpdater) IsInvite() bool {
return u.membership == tables.MembershipStateInvite
return u.oldMembership == tables.MembershipStateInvite
}
// IsJoin implements types.MembershipUpdater
func (u *MembershipUpdater) IsJoin() bool {
return u.membership == tables.MembershipStateJoin
return u.oldMembership == tables.MembershipStateJoin
}
// IsLeave implements types.MembershipUpdater
func (u *MembershipUpdater) IsLeave() bool {
return u.membership == tables.MembershipStateLeaveOrBan
return u.oldMembership == tables.MembershipStateLeaveOrBan
}
// IsKnock implements types.MembershipUpdater
func (u *MembershipUpdater) IsKnock() bool {
return u.membership == tables.MembershipStateKnock
return u.oldMembership == tables.MembershipStateKnock
}
// SetToInvite implements types.MembershipUpdater
func (u *MembershipUpdater) SetToInvite(event *gomatrixserverlib.Event) (bool, error) {
var inserted bool
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
func (u *MembershipUpdater) Delete() error {
if _, err := u.d.InvitesTable.UpdateInviteRetired(u.ctx, u.txn, u.roomNID, u.targetUserNID); err != nil {
return err
}
return u.d.MembershipTable.DeleteMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID)
}
func (u *MembershipUpdater) Update(newMembership tables.MembershipState, event *types.Event) (bool, []string, error) {
var inserted bool // Did the query result in a membership change?
var retired []string // Did we retire any updates in the process?
return inserted, retired, u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, event.Sender())
if err != nil {
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
}
inserted, err = u.d.InvitesTable.InsertInviteEvent(
u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
)
inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, newMembership, event.EventNID, false)
if err != nil {
return fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err)
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
}
// Look up the NID of the invite event
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{event.EventID()}, false)
if err != nil {
return fmt.Errorf("u.d.EventNIDs: %w", err)
if !inserted {
return nil
}
if u.membership != tables.MembershipStateInvite {
if inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, nIDs[event.EventID()], false); err != nil {
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
switch {
case u.oldMembership != tables.MembershipStateInvite && newMembership == tables.MembershipStateInvite:
inserted, err = u.d.InvitesTable.InsertInviteEvent(
u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
)
if err != nil {
return fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err)
}
}
return nil
})
return inserted, err
}
// SetToJoin implements types.MembershipUpdater
func (u *MembershipUpdater) SetToJoin(senderUserID string, eventID string, isUpdate bool) ([]string, error) {
var inviteEventIDs []string
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, senderUserID)
if err != nil {
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
}
// If this is a join event update, there is no invite to update
if !isUpdate {
inviteEventIDs, err = u.d.InvitesTable.UpdateInviteRetired(
case u.oldMembership == tables.MembershipStateInvite && newMembership != tables.MembershipStateInvite:
retired, err = u.d.InvitesTable.UpdateInviteRetired(
u.ctx, u.txn, u.roomNID, u.targetUserNID,
)
if err != nil {
return fmt.Errorf("u.d.InvitesTables.UpdateInviteRetired: %w", err)
}
}
// Look up the NID of the new join event
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{eventID}, false)
if err != nil {
return fmt.Errorf("u.d.EventNIDs: %w", err)
}
if u.membership != tables.MembershipStateJoin || isUpdate {
if _, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateJoin, nIDs[eventID], false); err != nil {
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
}
}
return nil
})
return inviteEventIDs, err
}
// SetToLeave implements types.MembershipUpdater
func (u *MembershipUpdater) SetToLeave(senderUserID string, eventID string) ([]string, error) {
var inviteEventIDs []string
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, senderUserID)
if err != nil {
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
}
inviteEventIDs, err = u.d.InvitesTable.UpdateInviteRetired(
u.ctx, u.txn, u.roomNID, u.targetUserNID,
)
if err != nil {
return fmt.Errorf("u.d.InvitesTable.updateInviteRetired: %w", err)
}
// Look up the NID of the new leave event
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{eventID}, false)
if err != nil {
return fmt.Errorf("u.d.EventNIDs: %w", err)
}
if u.membership != tables.MembershipStateLeaveOrBan {
if _, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateLeaveOrBan, nIDs[eventID], false); err != nil {
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
}
}
return nil
})
return inviteEventIDs, err
}
// SetToKnock implements types.MembershipUpdater
func (u *MembershipUpdater) SetToKnock(event *gomatrixserverlib.Event) (bool, error) {
var inserted bool
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, event.Sender())
if err != nil {
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
}
if u.membership != tables.MembershipStateKnock {
// Look up the NID of the new knock event
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{event.EventID()}, false)
if err != nil {
return fmt.Errorf("u.d.EventNIDs: %w", err)
}
if inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateKnock, nIDs[event.EventID()], false); err != nil {
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
}
}
return nil
})
return inserted, err
}

View file

@ -15,24 +15,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
goose.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}
func LoadAddForgottenColumn(m *sqlutil.Migrations) {
m.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
}
func UpAddForgottenColumn(tx *sql.Tx) error {
_, err := tx.Exec(` ALTER TABLE roomserver_membership RENAME TO roomserver_membership_tmp;
func UpAddForgottenColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, ` ALTER TABLE roomserver_membership RENAME TO roomserver_membership_tmp;
CREATE TABLE IF NOT EXISTS roomserver_membership (
room_nid INTEGER NOT NULL,
target_nid INTEGER NOT NULL,
@ -57,8 +46,8 @@ DROP TABLE roomserver_membership_tmp;`)
return nil
}
func DownAddForgottenColumn(tx *sql.Tx) error {
_, err := tx.Exec(` ALTER TABLE roomserver_membership RENAME TO roomserver_membership_tmp;
func DownAddForgottenColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, ` ALTER TABLE roomserver_membership RENAME TO roomserver_membership_tmp;
CREATE TABLE IF NOT EXISTS roomserver_membership (
room_nid INTEGER NOT NULL,
target_nid INTEGER NOT NULL,

View file

@ -21,40 +21,35 @@ import (
"fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}
// nolint:gocyclo
func UpStateBlocksRefactor(tx *sql.Tx) error {
func UpStateBlocksRefactor(ctx context.Context, tx *sql.Tx) error {
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
defer logrus.Warn("State storage upgrade complete")
var maxsnapshotid int
var maxblockid int
if err := tx.QueryRow(`SELECT IFNULL(MAX(state_snapshot_nid),0) FROM roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
if err := tx.QueryRowContext(ctx, `SELECT IFNULL(MAX(state_snapshot_nid),0) FROM roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil {
return fmt.Errorf("tx.QueryRowContext.Scan (count snapshots): %w", err)
}
if err := tx.QueryRow(`SELECT IFNULL(MAX(state_block_nid),0) FROM roomserver_state_block;`).Scan(&maxblockid); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
if err := tx.QueryRowContext(ctx, `SELECT IFNULL(MAX(state_block_nid),0) FROM roomserver_state_block;`).Scan(&maxblockid); err != nil {
return fmt.Errorf("tx.QueryRowContext.Scan (count snapshots): %w", err)
}
maxsnapshotid++
maxblockid++
oldMaxSnapshotID := maxsnapshotid
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
if _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.ExecContext: %w", err)
}
if _, err := tx.Exec(`ALTER TABLE roomserver_state_snapshots RENAME TO _roomserver_state_snapshots;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
if _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_state_snapshots RENAME TO _roomserver_state_snapshots;`); err != nil {
return fmt.Errorf("tx.ExecContext: %w", err)
}
_, err := tx.Exec(`
_, err := tx.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS roomserver_state_block (
state_block_nid INTEGER PRIMARY KEY AUTOINCREMENT,
state_block_hash BLOB UNIQUE,
@ -62,9 +57,9 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
);
`)
if err != nil {
return fmt.Errorf("tx.Exec: %w", err)
return fmt.Errorf("tx.ExecContext: %w", err)
}
_, err = tx.Exec(`
_, err = tx.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
state_snapshot_nid INTEGER PRIMARY KEY AUTOINCREMENT,
state_snapshot_hash BLOB UNIQUE,
@ -73,11 +68,11 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
);
`)
if err != nil {
return fmt.Errorf("tx.Exec: %w", err)
return fmt.Errorf("tx.ExecContext: %w", err)
}
snapshotrows, err := tx.Query(`SELECT state_snapshot_nid, room_nid, state_block_nids FROM _roomserver_state_snapshots;`)
snapshotrows, err := tx.QueryContext(ctx, `SELECT state_snapshot_nid, room_nid, state_block_nids FROM _roomserver_state_snapshots;`)
if err != nil {
return fmt.Errorf("tx.Query: %w", err)
return fmt.Errorf("tx.QueryContext: %w", err)
}
defer internal.CloseAndLogIfError(context.TODO(), snapshotrows, "rows.close() failed")
for snapshotrows.Next() {
@ -99,7 +94,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
// in question a state snapshot NID of 0 to indicate 'no snapshot'.
// If we don't do this, we'll fail the assertions later on which try to ensure we didn't forget
// any snapshots.
_, err = tx.Exec(
_, err = tx.ExecContext(ctx,
`UPDATE roomserver_events SET state_snapshot_nid = 0 WHERE event_type_nid = $1 AND event_state_key_nid = $2 AND state_snapshot_nid = $3`,
types.MRoomCreateNID, types.EmptyStateKeyNID, snapshot,
)
@ -109,9 +104,9 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
for _, block := range blocks {
if err = func() error {
blockrows, berr := tx.Query(`SELECT event_nid FROM _roomserver_state_block WHERE state_block_nid = $1`, block)
blockrows, berr := tx.QueryContext(ctx, `SELECT event_nid FROM _roomserver_state_block WHERE state_block_nid = $1`, block)
if berr != nil {
return fmt.Errorf("tx.Query (event nids from old block): %w", berr)
return fmt.Errorf("tx.QueryContext (event nids from old block): %w", berr)
}
defer internal.CloseAndLogIfError(context.TODO(), blockrows, "rows.close() failed")
events := types.EventNIDs{}
@ -129,14 +124,14 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
var blocknid types.StateBlockNID
err = tx.QueryRow(`
err = tx.QueryRowContext(ctx, `
INSERT INTO roomserver_state_block (state_block_nid, state_block_hash, event_nids)
VALUES ($1, $2, $3)
ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$3
RETURNING state_block_nid
`, maxblockid, events.Hash(), eventjson).Scan(&blocknid)
if err != nil {
return fmt.Errorf("tx.QueryRow.Scan (insert new block): %w", err)
return fmt.Errorf("tx.QueryRowContext.Scan (insert new block): %w", err)
}
maxblockid++
newblocks = append(newblocks, blocknid)
@ -151,22 +146,22 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
var newsnapshot types.StateSnapshotNID
err = tx.QueryRow(`
err = tx.QueryRowContext(ctx, `
INSERT INTO roomserver_state_snapshots (state_snapshot_nid, state_snapshot_hash, room_nid, state_block_nids)
VALUES ($1, $2, $3, $4)
ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$3
RETURNING state_snapshot_nid
`, maxsnapshotid, newblocks.Hash(), room, newblocksjson).Scan(&newsnapshot)
if err != nil {
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
return fmt.Errorf("tx.QueryRowContext.Scan (insert new snapshot): %w", err)
}
maxsnapshotid++
_, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid)
_, err = tx.ExecContext(ctx, `UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid)
if err != nil {
return fmt.Errorf("tx.Exec (update events): %w", err)
return fmt.Errorf("tx.ExecContext (update events): %w", err)
}
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
return fmt.Errorf("tx.Exec (update rooms): %w", err)
if _, err = tx.ExecContext(ctx, `UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
return fmt.Errorf("tx.ExecContext (update rooms): %w", err)
}
}
}
@ -175,13 +170,13 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
// If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
// in roomserver_state_snapshots
var count int64
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
if err = tx.QueryRowContext(ctx, `SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
var res sql.Result
var c int64
res, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid = 0 WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID)
res, err = tx.ExecContext(ctx, `UPDATE roomserver_events SET state_snapshot_nid = 0 WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("failed to reset invalid state snapshots: %w", err)
}
@ -191,23 +186,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("expected to reset %d event(s) but only updated %d event(s)", count, c)
}
}
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
if err = tx.QueryRowContext(ctx, `SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil {
if _, err = tx.ExecContext(ctx, `DROP TABLE _roomserver_state_snapshots;`); err != nil {
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
}
if _, err = tx.Exec(`DROP TABLE _roomserver_state_block;`); err != nil {
if _, err = tx.ExecContext(ctx, `DROP TABLE _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.Exec (delete old block table): %w", err)
}
return nil
}
func DownStateBlocksRefactor(tx *sql.Tx) error {
func DownStateBlocksRefactor(ctx context.Context, tx *sql.Tx) error {
panic("Downgrading state storage is not supported")
}

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
@ -62,24 +63,24 @@ const insertMembershipSQL = "" +
const selectMembershipFromRoomAndTargetSQL = "" +
"SELECT membership_nid, event_nid, forgotten FROM roomserver_membership" +
" WHERE room_nid = $1 AND target_nid = $2"
" WHERE room_nid = $1 AND event_nid != 0 AND target_nid = $2"
const selectMembershipsFromRoomAndMembershipSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND membership_nid = $2 and forgotten = false"
" WHERE room_nid = $1 AND event_nid != 0 AND membership_nid = $2 and forgotten = false"
const selectLocalMembershipsFromRoomAndMembershipSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND membership_nid = $2" +
" WHERE room_nid = $1 AND event_nid != 0 AND membership_nid = $2" +
" AND target_local = true and forgotten = false"
const selectMembershipsFromRoomSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 and forgotten = false"
" WHERE room_nid = $1 AND event_nid != 0 and forgotten = false"
const selectLocalMembershipsFromRoomSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1" +
" WHERE room_nid = $1 AND event_nid != 0" +
" AND target_local = true and forgotten = false"
const selectMembershipForUpdateSQL = "" +
@ -125,6 +126,9 @@ const selectServerInRoomSQL = "" +
" JOIN roomserver_event_state_keys ON roomserver_membership.target_nid = roomserver_event_state_keys.event_state_key_nid" +
" WHERE membership_nid = $1 AND room_nid = $2 AND event_state_key LIKE '%:' || $3 LIMIT 1"
const deleteMembershipSQL = "" +
"DELETE FROM roomserver_membership WHERE room_nid = $1 AND target_nid = $2"
type membershipStatements struct {
db *sql.DB
insertMembershipStmt *sql.Stmt
@ -140,11 +144,20 @@ type membershipStatements struct {
updateMembershipForgetRoomStmt *sql.Stmt
selectLocalServerInRoomStmt *sql.Stmt
selectServerInRoomStmt *sql.Stmt
deleteMembershipStmt *sql.Stmt
}
func CreateMembershipTable(db *sql.DB) error {
_, err := db.Exec(membershipSchema)
return err
if err != nil {
return err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "roomserver: add forgotten column",
Up: deltas.UpAddForgottenColumn,
})
return m.Up(context.Background())
}
func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
@ -166,6 +179,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
{&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL},
{&s.selectServerInRoomStmt, selectServerInRoomSQL},
{&s.deleteMembershipStmt, deleteMembershipSQL},
}.Prepare(db)
}
@ -383,3 +397,13 @@ func (s *membershipStatements) SelectServerInRoom(ctx context.Context, txn *sql.
}
return roomNID == nid, nil
}
func (s *membershipStatements) DeleteMembership(
ctx context.Context, txn *sql.Tx,
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
) error {
_, err := sqlutil.TxStmt(txn, s.deleteMembershipStmt).ExecContext(
ctx, roomNID, targetUserNID,
)
return err
}

View file

@ -54,17 +54,25 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
// db.SetMaxOpenConns(20)
// Create the tables.
if err := d.create(db); err != nil {
if err = d.create(db); err != nil {
return nil, err
}
// Then execute the migrations. By this point the tables are created with the latest
// schemas.
m := sqlutil.NewMigrations()
deltas.LoadAddForgottenColumn(m)
deltas.LoadStateBlocksRefactor(m)
if err := m.RunDeltas(db, dbProperties); err != nil {
return nil, err
// Special case, since this migration uses several tables, so it needs to
// be sure that all tables are created first.
// TODO: Remove when we are sure we are not having goose artefacts in the db
// This forces an error, which indicates the migration is already applied, since the
// column event_nid was removed from the table
err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan()
if err == nil {
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "roomserver: state blocks refactor",
Up: deltas.UpStateBlocksRefactor,
})
if err := m.Up(base.Context()); err != nil {
return nil, err
}
}
// Then prepare the statements. Now that the migrations have run, any columns referred

View file

@ -133,6 +133,7 @@ type Membership interface {
UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error
SelectLocalServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) (bool, error)
SelectServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, serverName gomatrixserverlib.ServerName) (bool, error)
DeleteMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) error
}
type Published interface {

View file

@ -60,6 +60,9 @@ func TestMembershipTable(t *testing.T) {
// This inserts a left user to the room
err = tab.InsertMembership(ctx, nil, 1, stateKeyNID, true)
assert.NoError(t, err)
// We must update the membership with a non-zero event NID or it will get filtered out in later queries
_, err = tab.UpdateMembership(ctx, nil, 1, stateKeyNID, userNIDs[0], tables.MembershipStateLeaveOrBan, 1, false)
assert.NoError(t, err)
}
// ... so this should be false

View file

@ -46,6 +46,9 @@ type Global struct {
// The server name to delegate server-server communications to, with optional port
WellKnownServerName string `yaml:"well_known_server_name"`
// The server name to delegate client-server communications to, with optional port
WellKnownClientName string `yaml:"well_known_client_name"`
// Disables federation. Dendrite will not be able to make any outbound HTTP requests
// to other servers and the federation API will not be exposed.
DisableFederation bool `yaml:"disable_federation"`

View file

@ -42,6 +42,7 @@ global:
key_id: ed25519:auto
key_validity_period: 168h0m0s
well_known_server_name: "localhost:443"
well_known_client_name: "localhost:443"
trusted_third_party_id_servers:
- matrix.org
- vector.im

View file

@ -708,7 +708,6 @@ func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEve
StateKey: *ev.StateKey(),
Content: ev.Content(),
Sender: ev.Sender(),
RoomID: ev.RoomID(),
OriginServerTS: ev.OriginServerTS(),
}
}

View file

@ -365,7 +365,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
"event": string(msg.Event.JSON()),
"pdupos": pduPos,
log.ErrorKey: err,
}).Panicf("roomserver output log: write invite failure")
}).Errorf("roomserver output log: write invite failure")
return
}
@ -385,7 +385,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
log.WithFields(log.Fields{
"event_id": msg.EventID,
log.ErrorKey: err,
}).Panicf("roomserver output log: remove invite failure")
}).Errorf("roomserver output log: remove invite failure")
return
}
@ -403,7 +403,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
log.ErrorKey: err,
}).Panicf("roomserver output log: write peek failure")
}).Errorf("roomserver output log: write peek failure")
return
}
@ -422,7 +422,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
log.ErrorKey: err,
}).Panicf("roomserver output log: write peek failure")
}).Errorf("roomserver output log: write peek failure")
return
}

View file

@ -23,6 +23,7 @@ import (
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@ -133,6 +134,17 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "syncapi: add history visibility column (current_room_state)",
Up: deltas.UpAddHistoryVisibilityColumnCurrentRoomState,
})
err = m.Up(context.Background())
if err != nil {
return nil, err
}
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
return nil, err
}

View file

@ -15,24 +15,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpFixSequences, DownFixSequences)
goose.AddMigration(UpRemoveSendToDeviceSentColumn, DownRemoveSendToDeviceSentColumn)
}
func LoadFixSequences(m *sqlutil.Migrations) {
m.AddMigration(UpFixSequences, DownFixSequences)
}
func UpFixSequences(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpFixSequences(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
-- We need to delete all of the existing receipts because the indexes
-- will be wrong, and we'll get primary key violations if we try to
-- reuse existing stream IDs from a different sequence.
@ -49,8 +38,8 @@ func UpFixSequences(tx *sql.Tx) error {
return nil
}
func DownFixSequences(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownFixSequences(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
-- We need to delete all of the existing receipts because the indexes
-- will be wrong, and we'll get primary key violations if we try to
-- reuse existing stream IDs from a different sequence.

View file

@ -15,18 +15,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadRemoveSendToDeviceSentColumn(m *sqlutil.Migrations) {
m.AddMigration(UpRemoveSendToDeviceSentColumn, DownRemoveSendToDeviceSentColumn)
}
func UpRemoveSendToDeviceSentColumn(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpRemoveSendToDeviceSentColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE syncapi_send_to_device
DROP COLUMN IF EXISTS sent_by_token;
`)
@ -36,8 +31,8 @@ func UpRemoveSendToDeviceSentColumn(tx *sql.Tx) error {
return nil
}
func DownRemoveSendToDeviceSentColumn(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownRemoveSendToDeviceSentColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE syncapi_send_to_device
ADD COLUMN IF NOT EXISTS sent_by_token TEXT;
`)

View file

@ -15,20 +15,24 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) {
m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn)
}
func UpAddHistoryVisibilityColumn(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE syncapi_output_room_events ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
`)
@ -38,8 +42,8 @@ func UpAddHistoryVisibilityColumn(tx *sql.Tx) error {
return nil
}
func DownAddHistoryVisibilityColumn(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility;
ALTER TABLE syncapi_current_room_state DROP COLUMN IF EXISTS history_visibility;
`)

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
@ -188,6 +189,17 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "syncapi: add history visibility column (output_room_events)",
Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
})
err = m.Up(context.Background())
if err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.insertEventStmt, insertEventSQL},
{&s.selectEventsStmt, selectEventsSQL},

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@ -73,6 +74,15 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "syncapi: fix sequences",
Up: deltas.UpFixSequences,
})
err = m.Up(context.Background())
if err != nil {
return nil, err
}
r := &receiptStatements{
db: db,
}

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/sirupsen/logrus"
@ -76,6 +77,15 @@ func NewPostgresSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) {
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "syncapi: drop sent_by_token",
Up: deltas.UpRemoveSendToDeviceSentColumn,
})
err = m.Up(context.Background())
if err != nil {
return nil, err
}
if s.insertSendToDeviceMessageStmt, err = db.Prepare(insertSendToDeviceMessageSQL); err != nil {
return nil, err
}

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
)
@ -42,16 +41,18 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
return nil, err
}
if _, err = d.db.Exec(outputRoomEventsSchema); err != nil {
return nil, err
}
if _, err = d.db.Exec(currentRoomStateSchema); err != nil {
return nil, err
}
accountData, err := NewPostgresAccountDataTable(d.db)
if err != nil {
return nil, err
}
events, err := NewPostgresEventsTable(d.db)
if err != nil {
return nil, err
}
currState, err := NewPostgresCurrentRoomStateTable(d.db)
if err != nil {
return nil, err
}
invites, err := NewPostgresInvitesTable(d.db)
if err != nil {
return nil, err
@ -96,22 +97,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
deltas.LoadAddHistoryVisibilityColumn(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
return nil, err
}
// prepare statements after the migrations have run
events, err := NewPostgresEventsTable(d.db)
if err != nil {
return nil, err
}
currState, err := NewPostgresCurrentRoomStateTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@ -120,6 +121,17 @@ func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (t
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "syncapi: add history visibility column (current_room_state)",
Up: deltas.UpAddHistoryVisibilityColumnCurrentRoomState,
})
err = m.Up(context.Background())
if err != nil {
return nil, err
}
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
return nil, err
}

View file

@ -15,24 +15,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpFixSequences, DownFixSequences)
goose.AddMigration(UpRemoveSendToDeviceSentColumn, DownRemoveSendToDeviceSentColumn)
}
func LoadFixSequences(m *sqlutil.Migrations) {
m.AddMigration(UpFixSequences, DownFixSequences)
}
func UpFixSequences(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpFixSequences(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
-- We need to delete all of the existing receipts because the indexes
-- will be wrong, and we'll get primary key violations if we try to
-- reuse existing stream IDs from a different sequence.
@ -45,8 +34,8 @@ func UpFixSequences(tx *sql.Tx) error {
return nil
}
func DownFixSequences(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownFixSequences(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
-- We need to delete all of the existing receipts because the indexes
-- will be wrong, and we'll get primary key violations if we try to
-- reuse existing stream IDs from a different sequence.

View file

@ -15,18 +15,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadRemoveSendToDeviceSentColumn(m *sqlutil.Migrations) {
m.AddMigration(UpRemoveSendToDeviceSentColumn, DownRemoveSendToDeviceSentColumn)
}
func UpRemoveSendToDeviceSentColumn(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpRemoveSendToDeviceSentColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
CREATE TEMPORARY TABLE syncapi_send_to_device_backup(id, user_id, device_id, content);
INSERT INTO syncapi_send_to_device_backup SELECT id, user_id, device_id, content FROM syncapi_send_to_device;
DROP TABLE syncapi_send_to_device;
@ -45,8 +40,8 @@ func UpRemoveSendToDeviceSentColumn(tx *sql.Tx) error {
return nil
}
func DownRemoveSendToDeviceSentColumn(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownRemoveSendToDeviceSentColumn(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
CREATE TEMPORARY TABLE syncapi_send_to_device_backup(id, user_id, device_id, content);
INSERT INTO syncapi_send_to_device_backup SELECT id, user_id, device_id, content FROM syncapi_send_to_device;
DROP TABLE syncapi_send_to_device;

View file

@ -15,36 +15,36 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) {
m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn)
}
func UpAddHistoryVisibilityColumn(tx *sql.Tx) error {
func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error {
// SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists.
// Required for unit tests, as otherwise a duplicate column error will show up.
_, err := tx.Query("SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
_, err := tx.QueryContext(ctx, "SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
if err == nil {
return nil
}
_, err = tx.Exec(`
_, err = tx.ExecContext(ctx, `
ALTER TABLE syncapi_output_room_events ADD COLUMN history_visibility SMALLINT NOT NULL DEFAULT 2;
UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
_, err = tx.Query("SELECT history_visibility FROM syncapi_current_room_state LIMIT 1")
func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error {
// SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists.
// Required for unit tests, as otherwise a duplicate column error will show up.
_, err := tx.QueryContext(ctx, "SELECT history_visibility FROM syncapi_current_room_state LIMIT 1")
if err == nil {
return nil
}
_, err = tx.Exec(`
_, err = tx.ExecContext(ctx, `
ALTER TABLE syncapi_current_room_state ADD COLUMN history_visibility SMALLINT NOT NULL DEFAULT 2;
UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
`)
@ -54,25 +54,25 @@ func UpAddHistoryVisibilityColumn(tx *sql.Tx) error {
return nil
}
func DownAddHistoryVisibilityColumn(tx *sql.Tx) error {
func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error {
// SQLite doesn't have "if exists", so check if the column exists.
_, err := tx.Query("SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
_, err := tx.QueryContext(ctx, "SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
if err != nil {
// The column probably doesn't exist
return nil
}
_, err = tx.Exec(`
_, err = tx.ExecContext(ctx, `
ALTER TABLE syncapi_output_room_events DROP COLUMN history_visibility;
`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
_, err = tx.Query("SELECT history_visibility FROM syncapi_current_room_state LIMIT 1")
_, err = tx.QueryContext(ctx, "SELECT history_visibility FROM syncapi_current_room_state LIMIT 1")
if err != nil {
// The column probably doesn't exist
return nil
}
_, err = tx.Exec(`
_, err = tx.ExecContext(ctx, `
ALTER TABLE syncapi_current_room_state DROP COLUMN history_visibility;
`)
if err != nil {

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
@ -136,6 +137,17 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "syncapi: add history visibility column (output_room_events)",
Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
})
err = m.Up(context.Background())
if err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.insertEventStmt, insertEventSQL},
{&s.selectMaxEventIDStmt, selectMaxEventIDSQL},

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@ -70,6 +71,15 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Re
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "syncapi: fix sequences",
Up: deltas.UpFixSequences,
})
err = m.Up(context.Background())
if err != nil {
return nil, err
}
r := &receiptStatements{
db: db,
streamIDStatements: streamID,

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/sirupsen/logrus"
@ -76,6 +77,15 @@ func NewSqliteSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) {
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "syncapi: drop sent_by_token",
Up: deltas.UpRemoveSendToDeviceSentColumn,
})
err = m.Up(context.Background())
if err != nil {
return nil, err
}
if s.insertSendToDeviceMessageStmt, err = db.Prepare(insertSendToDeviceMessageSQL); err != nil {
return nil, err
}

View file

@ -22,7 +22,6 @@ import (
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
)
// SyncServerDatasource represents a sync server datasource which manages
@ -42,26 +41,28 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil {
return nil, err
}
if err = d.prepare(dbProperties); err != nil {
if err = d.prepare(); err != nil {
return nil, err
}
return &d, nil
}
func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) {
func (d *SyncServerDatasource) prepare() (err error) {
if err = d.streamID.Prepare(d.db); err != nil {
return err
}
if _, err = d.db.Exec(outputRoomEventsSchema); err != nil {
return err
}
if _, err = d.db.Exec(currentRoomStateSchema); err != nil {
return err
}
accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID)
if err != nil {
return err
}
events, err := NewSqliteEventsTable(d.db, &d.streamID)
if err != nil {
return err
}
roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID)
if err != nil {
return err
}
invites, err := NewSqliteInvitesTable(d.db, &d.streamID)
if err != nil {
return err
@ -106,22 +107,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil {
return err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
deltas.LoadAddHistoryVisibilityColumn(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
return err
}
// prepare statements after the migrations have run
events, err := NewSqliteEventsTable(d.db, &d.streamID)
if err != nil {
return err
}
roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID)
if err != nil {
return err
}
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,

View file

@ -30,11 +30,13 @@ import (
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
synctypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
@ -64,7 +66,24 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
if req.DataType == "" {
return fmt.Errorf("data type must not be empty")
}
return a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData)
if err := a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData); err != nil {
util.GetLogger(ctx).WithError(err).Error("a.DB.SaveAccountData failed")
return fmt.Errorf("failed to save account data: %w", err)
}
var ignoredUsers *synctypes.IgnoredUsers
if req.DataType == "m.ignored_user_list" {
ignoredUsers = &synctypes.IgnoredUsers{}
_ = json.Unmarshal(req.AccountData, ignoredUsers)
}
if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
RoomID: req.RoomID,
Type: req.DataType,
IgnoredUsers: ignoredUsers,
}); err != nil {
util.GetLogger(ctx).WithError(err).Error("a.SyncProducer.SendAccountData failed")
return fmt.Errorf("failed to send account data to output: %w", err)
}
return nil
}
func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error {
@ -93,7 +112,9 @@ func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.P
}
// Inform the SyncAPI about the newly created push_rules
if err = a.SyncProducer.SendAccountData(acc.UserID, "", "m.push_rules"); err != nil {
if err = a.SyncProducer.SendAccountData(acc.UserID, eventutil.AccountData{
Type: "m.push_rules",
}); err != nil {
util.GetLogger(ctx).WithFields(logrus.Fields{
"user_id": acc.UserID,
}).WithError(err).Warn("failed to send account data to the SyncAPI")
@ -732,11 +753,11 @@ func (a *UserInternalAPI) PerformPushRulesPut(
if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil {
return err
}
if err := a.SyncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType); err != nil {
if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
Type: pushRulesAccountDataType,
}); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed")
}
return nil
}

View file

@ -34,7 +34,7 @@ func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic stri
}
// SendAccountData sends account data to the Sync API server.
func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error {
func (p *SyncAPI) SendAccountData(userID string, data eventutil.AccountData) error {
m := &nats.Msg{
Subject: p.clientDataTopic,
Header: nats.Header{},
@ -42,18 +42,15 @@ func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string)
m.Header.Set(jetstream.UserID, userID)
var err error
m.Data, err = json.Marshal(eventutil.AccountData{
RoomID: roomID,
Type: dataType,
})
m.Data, err = json.Marshal(data)
if err != nil {
return err
}
log.WithFields(log.Fields{
"user_id": userID,
"room_id": roomID,
"data_type": dataType,
"room_id": data.RoomID,
"data_type": data.Type,
}).Tracef("Producing to topic '%s'", p.clientDataTopic)
_, err = p.producer.PublishMsg(m)

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/userapi/storage/tables"
log "github.com/sirupsen/logrus"
@ -85,6 +86,23 @@ func NewPostgresAccountsTable(db *sql.DB, serverName gomatrixserverlib.ServerNam
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations([]sqlutil.Migration{
{
Version: "userapi: add is active",
Up: deltas.UpIsActive,
Down: deltas.DownIsActive,
},
{
Version: "userapi: add account type",
Up: deltas.UpAddAccountType,
Down: deltas.DownAddAccountType,
},
}...)
err = m.Up(context.Background())
if err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.insertAccountStmt, insertAccountSQL},
{&s.updatePasswordStmt, updatePasswordSQL},

View file

@ -1,33 +1,21 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/pressly/goose"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadFromGoose() {
goose.AddMigration(UpIsActive, DownIsActive)
goose.AddMigration(UpAddAccountType, DownAddAccountType)
}
func LoadIsActive(m *sqlutil.Migrations) {
m.AddMigration(UpIsActive, DownIsActive)
}
func UpIsActive(tx *sql.Tx) error {
_, err := tx.Exec("ALTER TABLE account_accounts ADD COLUMN IF NOT EXISTS is_deactivated BOOLEAN DEFAULT FALSE;")
func UpIsActive(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "ALTER TABLE account_accounts ADD COLUMN IF NOT EXISTS is_deactivated BOOLEAN DEFAULT FALSE;")
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownIsActive(tx *sql.Tx) error {
_, err := tx.Exec("ALTER TABLE account_accounts DROP COLUMN is_deactivated;")
func DownIsActive(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "ALTER TABLE account_accounts DROP COLUMN is_deactivated;")
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}

View file

@ -1,18 +1,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadLastSeenTSIP(m *sqlutil.Migrations) {
m.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP)
}
func UpLastSeenTSIP(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpLastSeenTSIP(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS last_seen_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)*1000;
ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS ip TEXT;
ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS user_agent TEXT;`)
@ -22,8 +17,8 @@ ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS user_agent TEXT;`)
return nil
}
func DownLastSeenTSIP(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownLastSeenTSIP(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE device_devices DROP COLUMN last_seen_ts;
ALTER TABLE device_devices DROP COLUMN ip;
ALTER TABLE device_devices DROP COLUMN user_agent;`)

View file

@ -1,20 +1,15 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadAddAccountType(m *sqlutil.Migrations) {
m.AddMigration(UpAddAccountType, DownAddAccountType)
}
func UpAddAccountType(tx *sql.Tx) error {
func UpAddAccountType(ctx context.Context, tx *sql.Tx) error {
// initially set every account to useraccount, change appservice and guest accounts afterwards
// (user = 1, guest = 2, admin = 3, appservice = 4)
_, err := tx.Exec(`ALTER TABLE account_accounts ADD COLUMN IF NOT EXISTS account_type SMALLINT NOT NULL DEFAULT 1;
_, err := tx.ExecContext(ctx, `ALTER TABLE account_accounts ADD COLUMN IF NOT EXISTS account_type SMALLINT NOT NULL DEFAULT 1;
UPDATE account_accounts SET account_type = 4 WHERE appservice_id <> '';
UPDATE account_accounts SET account_type = 2 WHERE localpart ~ '^[0-9]+$';
ALTER TABLE account_accounts ALTER COLUMN account_type DROP DEFAULT;`,
@ -25,8 +20,8 @@ ALTER TABLE account_accounts ALTER COLUMN account_type DROP DEFAULT;`,
return nil
}
func DownAddAccountType(tx *sql.Tx) error {
_, err := tx.Exec("ALTER TABLE account_accounts DROP COLUMN account_type;")
func DownAddAccountType(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "ALTER TABLE account_accounts DROP COLUMN account_type;")
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/gomatrixserverlib"
)
@ -120,6 +121,15 @@ func NewPostgresDevicesTable(db *sql.DB, serverName gomatrixserverlib.ServerName
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "userapi: add last_seen_ts",
Up: deltas.UpLastSeenTSIP,
})
err = m.Up(context.Background())
if err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.insertDeviceStmt, insertDeviceSQL},
{&s.selectDeviceByTokenStmt, selectDeviceByTokenSQL},

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/userapi/storage/shared"
// Import the postgres database driver.
@ -37,19 +36,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
return nil, err
}
m := sqlutil.NewMigrations()
if _, err = db.Exec(accountsSchema); err != nil {
// do this so that the migration can and we don't fail on
// preparing statements for columns that don't exist yet
return nil, err
}
deltas.LoadIsActive(m)
//deltas.LoadLastSeenTSIP(m)
deltas.LoadAddAccountType(m)
if err = m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
accountDataTable, err := NewPostgresAccountDataTable(db)
if err != nil {
return nil, fmt.Errorf("NewPostgresAccountDataTable: %w", err)

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/userapi/storage/tables"
log "github.com/sirupsen/logrus"
@ -87,6 +88,23 @@ func NewSQLiteAccountsTable(db *sql.DB, serverName gomatrixserverlib.ServerName)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations([]sqlutil.Migration{
{
Version: "userapi: add is active",
Up: deltas.UpIsActive,
Down: deltas.DownIsActive,
},
{
Version: "userapi: add account type",
Up: deltas.UpAddAccountType,
Down: deltas.DownAddAccountType,
},
}...)
err = m.Up(context.Background())
if err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.insertAccountStmt, insertAccountSQL},
{&s.updatePasswordStmt, updatePasswordSQL},

View file

@ -1,25 +1,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/pressly/goose"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadFromGoose() {
goose.AddMigration(UpIsActive, DownIsActive)
goose.AddMigration(UpAddAccountType, DownAddAccountType)
}
func LoadIsActive(m *sqlutil.Migrations) {
m.AddMigration(UpIsActive, DownIsActive)
}
func UpIsActive(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpIsActive(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE account_accounts RENAME TO account_accounts_tmp;
CREATE TABLE account_accounts (
localpart TEXT NOT NULL PRIMARY KEY,
@ -42,8 +30,8 @@ DROP TABLE account_accounts_tmp;`)
return nil
}
func DownIsActive(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownIsActive(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE account_accounts RENAME TO account_accounts_tmp;
CREATE TABLE account_accounts (
localpart TEXT NOT NULL PRIMARY KEY,

View file

@ -1,18 +1,13 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func LoadLastSeenTSIP(m *sqlutil.Migrations) {
m.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP)
}
func UpLastSeenTSIP(tx *sql.Tx) error {
_, err := tx.Exec(`
func UpLastSeenTSIP(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE device_devices RENAME TO device_devices_tmp;
CREATE TABLE device_devices (
access_token TEXT PRIMARY KEY,
@ -39,8 +34,8 @@ func UpLastSeenTSIP(tx *sql.Tx) error {
return nil
}
func DownLastSeenTSIP(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownLastSeenTSIP(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE device_devices RENAME TO device_devices_tmp;
CREATE TABLE IF NOT EXISTS device_devices (
access_token TEXT PRIMARY KEY,

View file

@ -1,26 +1,15 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/pressly/goose"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
func init() {
goose.AddMigration(UpAddAccountType, DownAddAccountType)
}
func LoadAddAccountType(m *sqlutil.Migrations) {
m.AddMigration(UpAddAccountType, DownAddAccountType)
}
func UpAddAccountType(tx *sql.Tx) error {
func UpAddAccountType(ctx context.Context, tx *sql.Tx) error {
// initially set every account to useraccount, change appservice and guest accounts afterwards
// (user = 1, guest = 2, admin = 3, appservice = 4)
_, err := tx.Exec(`ALTER TABLE account_accounts RENAME TO account_accounts_tmp;
_, err := tx.ExecContext(ctx, `ALTER TABLE account_accounts RENAME TO account_accounts_tmp;
CREATE TABLE account_accounts (
localpart TEXT NOT NULL PRIMARY KEY,
created_ts BIGINT NOT NULL,
@ -45,8 +34,8 @@ DROP TABLE account_accounts_tmp;`)
return nil
}
func DownAddAccountType(tx *sql.Tx) error {
_, err := tx.Exec(`ALTER TABLE account_accounts DROP COLUMN account_type;`)
func DownAddAccountType(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `ALTER TABLE account_accounts DROP COLUMN account_type;`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/clientapi/userutil"
@ -107,6 +108,15 @@ func NewSQLiteDevicesTable(db *sql.DB, serverName gomatrixserverlib.ServerName)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "userapi: add last_seen_ts",
Up: deltas.UpLastSeenTSIP,
})
if err = m.Up(context.Background()); err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.insertDeviceStmt, insertDeviceSQL},
{&s.selectDevicesCountStmt, selectDevicesCountSQL},

View file

@ -25,10 +25,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/storage/shared"
"github.com/matrix-org/dendrite/userapi/storage/sqlite3/deltas"
// Import the postgres database driver.
_ "github.com/lib/pq"
)
// NewDatabase creates a new accounts and profiles database
@ -38,19 +34,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
return nil, err
}
m := sqlutil.NewMigrations()
if _, err = db.Exec(accountsSchema); err != nil {
// do this so that the migration can and we don't fail on
// preparing statements for columns that don't exist yet
return nil, err
}
deltas.LoadIsActive(m)
//deltas.LoadLastSeenTSIP(m)
deltas.LoadAddAccountType(m)
if err = m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
accountDataTable, err := NewSQLiteAccountDataTable(db)
if err != nil {
return nil, fmt.Errorf("NewSQLiteAccountDataTable: %w", err)