From bdd1a87d4ddab4b937049f55ef9ceda943cff218 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Fri, 19 Jul 2019 07:04:06 +0100 Subject: [PATCH 01/14] Add appservice API to config unit test (#744) Fixes #558 --- common/config/config_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/config/config_test.go b/common/config/config_test.go index acc4dbd12..110c8b84c 100644 --- a/common/config/config_test.go +++ b/common/config/config_test.go @@ -54,12 +54,14 @@ database: server_key: "postgresql:///server_keys" sync_api: "postgresql:///syn_api" room_server: "postgresql:///room_server" + appservice: "postgresql:///appservice" listen: room_server: "localhost:7770" client_api: "localhost:7771" federation_api: "localhost:7772" sync_api: "localhost:7773" media_api: "localhost:7774" + appservice_api: "localhost:7777" typing_server: "localhost:7778" logging: - type: "file" From 78032b3f4c0ca2d272180afb09142fce82313109 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 22 Jul 2019 15:05:38 +0100 Subject: [PATCH 02/14] Correctly create new device when device_id is passed to /login (#753) Fixes https://github.com/matrix-org/dendrite/issues/401 Currently when passing a `device_id` parameter to `/login`, which is [supposed](https://matrix.org/docs/spec/client_server/unstable#post-matrix-client-r0-login) to return a device with that ID set, it instead just generates a random `device_id` and hands that back to you. The code was already there to do this correctly, it looks like it had just been broken during some change. Hopefully sytest will prevent this from becoming broken again. --- .../auth/storage/devices/devices_table.go | 2 ++ clientapi/auth/storage/devices/storage.go | 2 +- clientapi/routing/login.go | 25 ++++++++--------- clientapi/routing/register.go | 28 ++++++++++++------- testfile | 2 ++ 5 files changed, 34 insertions(+), 25 deletions(-) diff --git a/clientapi/auth/storage/devices/devices_table.go b/clientapi/auth/storage/devices/devices_table.go index 96d6521d8..60aa563a2 100644 --- a/clientapi/auth/storage/devices/devices_table.go +++ b/clientapi/auth/storage/devices/devices_table.go @@ -169,6 +169,8 @@ func (s *devicesStatements) selectDeviceByToken( return &dev, err } +// selectDeviceByID retrieves a device from the database with the given user +// localpart and deviceID func (s *devicesStatements) selectDeviceByID( ctx context.Context, localpart, deviceID string, ) (*authtypes.Device, error) { diff --git a/clientapi/auth/storage/devices/storage.go b/clientapi/auth/storage/devices/storage.go index 7032fe7bf..82c8e97a2 100644 --- a/clientapi/auth/storage/devices/storage.go +++ b/clientapi/auth/storage/devices/storage.go @@ -84,7 +84,7 @@ func (d *Database) CreateDevice( if deviceID != nil { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - // Revoke existing token for this device + // Revoke existing tokens for this device if err = d.devices.deleteDevice(ctx, txn, *deviceID, localpart); err != nil { return err } diff --git a/clientapi/routing/login.go b/clientapi/routing/login.go index 2e2d409f6..02d958152 100644 --- a/clientapi/routing/login.go +++ b/clientapi/routing/login.go @@ -18,7 +18,6 @@ import ( "net/http" "context" - "database/sql" "github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" @@ -42,10 +41,12 @@ type flow struct { } type passwordRequest struct { - User string `json:"user"` - Password string `json:"password"` + User string `json:"user"` + Password string `json:"password"` + // Both DeviceID and InitialDisplayName can be omitted, or empty strings ("") + // Thus a pointer is needed to differentiate between the two InitialDisplayName *string `json:"initial_device_display_name"` - DeviceID string `json:"device_id"` + DeviceID *string `json:"device_id"` } type loginResponse struct { @@ -110,7 +111,7 @@ func Login( return httputil.LogThenError(req, err) } - dev, err := getDevice(req.Context(), r, deviceDB, acc, localpart, token) + dev, err := getDevice(req.Context(), r, deviceDB, acc, token) if err != nil { return util.JSONResponse{ Code: http.StatusInternalServerError, @@ -134,20 +135,16 @@ func Login( } } -// check if device exists else create one +// getDevice returns a new or existing device func getDevice( ctx context.Context, r passwordRequest, deviceDB *devices.Database, acc *authtypes.Account, - localpart, token string, + token string, ) (dev *authtypes.Device, err error) { - dev, err = deviceDB.GetDeviceByID(ctx, localpart, r.DeviceID) - if err == sql.ErrNoRows { - // device doesn't exist, create one - dev, err = deviceDB.CreateDevice( - ctx, acc.Localpart, nil, token, r.InitialDisplayName, - ) - } + dev, err = deviceDB.CreateDevice( + ctx, acc.Localpart, r.DeviceID, token, r.InitialDisplayName, + ) return } diff --git a/clientapi/routing/register.go b/clientapi/routing/register.go index fa15f4fc0..c5a3d3018 100644 --- a/clientapi/routing/register.go +++ b/clientapi/routing/register.go @@ -121,7 +121,10 @@ type registerRequest struct { // user-interactive auth params Auth authDict `json:"auth"` + // Both DeviceID and InitialDisplayName can be omitted, or empty strings ("") + // Thus a pointer is needed to differentiate between the two InitialDisplayName *string `json:"initial_device_display_name"` + DeviceID *string `json:"device_id"` // Prevent this user from logging in InhibitLogin common.WeakBoolean `json:"inhibit_login"` @@ -626,7 +629,7 @@ func handleApplicationServiceRegistration( // application service registration is entirely separate. return completeRegistration( req.Context(), accountDB, deviceDB, r.Username, "", appserviceID, - r.InhibitLogin, r.InitialDisplayName, + r.InhibitLogin, r.InitialDisplayName, r.DeviceID, ) } @@ -646,7 +649,7 @@ func checkAndCompleteFlow( // This flow was completed, registration can continue return completeRegistration( req.Context(), accountDB, deviceDB, r.Username, r.Password, "", - r.InhibitLogin, r.InitialDisplayName, + r.InhibitLogin, r.InitialDisplayName, r.DeviceID, ) } @@ -697,10 +700,10 @@ func LegacyRegister( return util.MessageResponse(http.StatusForbidden, "HMAC incorrect") } - return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil) + return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil, nil) case authtypes.LoginTypeDummy: // there is nothing to do - return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil) + return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil, nil) default: return util.JSONResponse{ Code: http.StatusNotImplemented, @@ -738,13 +741,19 @@ func parseAndValidateLegacyLogin(req *http.Request, r *legacyRegisterRequest) *u return nil } +// completeRegistration runs some rudimentary checks against the submitted +// input, then if successful creates an account and a newly associated device +// We pass in each individual part of the request here instead of just passing a +// registerRequest, as this function serves requests encoded as both +// registerRequests and legacyRegisterRequests, which share some attributes but +// not all func completeRegistration( ctx context.Context, accountDB *accounts.Database, deviceDB *devices.Database, username, password, appserviceID string, inhibitLogin common.WeakBoolean, - displayName *string, + displayName, deviceID *string, ) util.JSONResponse { if username == "" { return util.JSONResponse{ @@ -773,6 +782,9 @@ func completeRegistration( } } + // Increment prometheus counter for created users + amtRegUsers.Inc() + // Check whether inhibit_login option is set. If so, don't create an access // token or a device for this user if inhibitLogin { @@ -793,8 +805,7 @@ func completeRegistration( } } - // TODO: Use the device ID in the request. - dev, err := deviceDB.CreateDevice(ctx, username, nil, token, displayName) + dev, err := deviceDB.CreateDevice(ctx, username, deviceID, token, displayName) if err != nil { return util.JSONResponse{ Code: http.StatusInternalServerError, @@ -802,9 +813,6 @@ func completeRegistration( } } - // Increment prometheus counter for created users - amtRegUsers.Inc() - return util.JSONResponse{ Code: http.StatusOK, JSON: registerResponse{ diff --git a/testfile b/testfile index 8a8225a83..1b2ad9e69 100644 --- a/testfile +++ b/testfile @@ -149,3 +149,5 @@ Typing events appear in incremental sync Typing events appear in gapped sync Inbound federation of state requires event_id as a mandatory paramater Inbound federation of state_ids requires event_id as a mandatory paramater +POST /register returns the same device_id as that in the request +POST /login returns the same device_id as that in the request From 4410acc673b27e747e8ba757e9b271ada55c0269 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Wed, 24 Jul 2019 05:44:05 +0100 Subject: [PATCH 03/14] Add filepath and function name to log output (#755) Adds detailed logging, describing which file/line a log message came from, as well as the name of the function that it was contained within. --- common/log.go | 28 ++++++++++++++++++++++++++-- go.mod | 8 +++++--- go.sum | 9 +++++++++ 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/common/log.go b/common/log.go index 89a705822..f9ed84edb 100644 --- a/common/log.go +++ b/common/log.go @@ -15,9 +15,12 @@ package common import ( + "fmt" "os" "path" "path/filepath" + "runtime" + "strings" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dugong" @@ -54,15 +57,35 @@ func (h *logLevelHook) Levels() []logrus.Level { return levels } +// callerPrettyfier is a function that given a runtime.Frame object, will +// extract the calling function's name and file, and return them in a nicely +// formatted way +func callerPrettyfier(f *runtime.Frame) (string, string) { + // Retrieve just the function name + s := strings.Split(f.Function, ".") + funcname := s[len(s)-1] + + // Append a newline + tab to it to move the actual log content to its own line + funcname += "\n\t" + + // Surround the filepath in brackets and append line number so IDEs can quickly + // navigate + filename := fmt.Sprintf(" [%s:%d]", f.File, f.Line) + + return funcname, filename +} + // SetupStdLogging configures the logging format to standard output. Typically, it is called when the config is not yet loaded. func SetupStdLogging() { + logrus.SetReportCaller(true) logrus.SetFormatter(&utcFormatter{ &logrus.TextFormatter{ TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00", FullTimestamp: true, DisableColors: false, DisableTimestamp: false, - DisableSorting: false, + QuoteEmptyFields: true, + CallerPrettyfier: callerPrettyfier, }, }) } @@ -71,8 +94,8 @@ func SetupStdLogging() { // If something fails here it means that the logging was improperly configured, // so we just exit with the error func SetupHookLogging(hooks []config.LogrusHook, componentName string) { + logrus.SetReportCaller(true) for _, hook := range hooks { - // Check we received a proper logging level level, err := logrus.ParseLevel(hook.Level) if err != nil { @@ -126,6 +149,7 @@ func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName str DisableColors: true, DisableTimestamp: false, DisableSorting: false, + QuoteEmptyFields: true, }, }, &dugong.DailyRotationSchedule{GZip: true}, diff --git a/go.mod b/go.mod index 3b4b736a4..4e5123730 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/jaegertracing/jaeger-client-go v0.0.0-20170921145708-3ad49a1d839b github.com/jaegertracing/jaeger-lib v0.0.0-20170920222118-21a3da6d66fe github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 + github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/lib/pq v0.0.0-20170918175043-23da1db4f16d github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 @@ -40,8 +41,9 @@ require ( github.com/prometheus/common v0.0.0-20170108231212-dd2f054febf4 github.com/prometheus/procfs v0.0.0-20170128160123-1878d9fbb537 github.com/rcrowley/go-metrics v0.0.0-20161128210544-1f30fe9094a5 - github.com/sirupsen/logrus v1.3.0 - github.com/stretchr/testify v1.2.2 + github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/objx v0.2.0 // indirect + github.com/stretchr/testify v1.3.0 github.com/tidwall/gjson v1.1.5 github.com/tidwall/match v1.0.1 github.com/tidwall/sjson v1.0.3 @@ -54,7 +56,7 @@ require ( go.uber.org/zap v1.7.1 golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95 - golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 + golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 gopkg.in/Shopify/sarama.v1 v1.11.0 gopkg.in/airbrake/gobrake.v2 v2.0.9 gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20170727041045-23bcc3c4eae3 diff --git a/go.sum b/go.sum index 8026640b5..5fd3dc5b4 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,7 @@ github.com/jaegertracing/jaeger-lib v0.0.0-20170920222118-21a3da6d66fe/go.mod h1 github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 h1:KAZ1BW2TCmT6PRihDPpocIy1QTtsAsrx6TneU/4+CMg= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -90,9 +91,14 @@ github.com/sirupsen/logrus v0.0.0-20170822132746-89742aefa4b2 h1:+8J/sCAVv2Y9Ct1 github.com/sirupsen/logrus v0.0.0-20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME= github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v0.0.0-20170809224252-890a5c3458b4/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/tidwall/gjson v1.0.2 h1:5BsM7kyEAHAUGEGDkEKO9Mdyiuw6QQ6TSDdarP0Nnmk= github.com/tidwall/gjson v1.0.2/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA= github.com/tidwall/gjson v1.1.5 h1:QysILxBeUEY3GTLA0fQVgkQG1zme8NxGvhh2SSqWNwI= @@ -128,6 +134,9 @@ golang.org/x/sys v0.0.0-20171012164349-43eea11bc926 h1:PY6OU86NqbyZiOzaPnDw6oOjA golang.org/x/sys v0.0.0-20171012164349-43eea11bc926/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= gopkg.in/Shopify/sarama.v1 v1.11.0 h1:/3kaCyeYaPbr59IBjeqhIcUOB1vXlIVqXAYa5g5C5F0= gopkg.in/Shopify/sarama.v1 v1.11.0/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= From 6773572907a7748ce7f4ccd5467ee2e1d5d06f77 Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Wed, 24 Jul 2019 23:27:40 +0800 Subject: [PATCH 04/14] Update gomatrixserverlib to v0.0.0-20190724145009-a6df10ef35d6 (#762) Signed-off-by: Alex Chen --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 4e5123730..5d01012c9 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/lib/pq v0.0.0-20170918175043-23da1db4f16d github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20190619132215-178ed5e3b8e2 + github.com/matrix-org/gomatrixserverlib v0.0.0-20190724145009-a6df10ef35d6 github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0 github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 github.com/matttproud/golang_protobuf_extensions v1.0.1 diff --git a/go.sum b/go.sum index 5fd3dc5b4..53151121b 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20181109104322-1c2cbc0872f0 h1:3U github.com/matrix-org/gomatrixserverlib v0.0.0-20181109104322-1c2cbc0872f0/go.mod h1:YHyhIQUmuXyKtoVfDUMk/DyU93Taamlu6nPZkij/JtA= github.com/matrix-org/gomatrixserverlib v0.0.0-20190619132215-178ed5e3b8e2 h1:pYajAEdi3sowj4iSunqctchhcMNW3rDjeeH0T4uDkMY= github.com/matrix-org/gomatrixserverlib v0.0.0-20190619132215-178ed5e3b8e2/go.mod h1:sf0RcKOdiwJeTti7A313xsaejNUGYDq02MQZ4JD4w/E= +github.com/matrix-org/gomatrixserverlib v0.0.0-20190724145009-a6df10ef35d6 h1:B8n1H5Wb1B5jwLzTylBpY0kJCMRqrofT7PmOw4aJFJA= +github.com/matrix-org/gomatrixserverlib v0.0.0-20190724145009-a6df10ef35d6/go.mod h1:sf0RcKOdiwJeTti7A313xsaejNUGYDq02MQZ4JD4w/E= github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0 h1:p7WTwG+aXM86+yVrYAiCMW3ZHSmotVvuRbjtt3jC+4A= github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/util v0.0.0-20171013132526-8b1c8ab81986 h1:TiWl4hLvezAhRPM8tPcPDFTysZ7k4T/1J4GPp/iqlZo= From b729a10366f9cb6f8b34db58c7bc1b9b69e67b5f Mon Sep 17 00:00:00 2001 From: Thibaut CHARLES Date: Wed, 24 Jul 2019 18:08:51 +0200 Subject: [PATCH 05/14] Store & retrieve filters as structs rather than []byte (#436) Manipulate filters as gomatrix.Filter structures, instead of their []byte JSON representation. This lays ground work for using filters in dendrite for /sync requests. --- .../auth/storage/accounts/filter_table.go | 38 ++++++++++++++----- clientapi/auth/storage/accounts/storage.go | 8 ++-- clientapi/routing/filter.go | 20 +++------- 3 files changed, 39 insertions(+), 27 deletions(-) diff --git a/clientapi/auth/storage/accounts/filter_table.go b/clientapi/auth/storage/accounts/filter_table.go index 81bae4545..2b07ef17e 100644 --- a/clientapi/auth/storage/accounts/filter_table.go +++ b/clientapi/auth/storage/accounts/filter_table.go @@ -17,6 +17,7 @@ package accounts import ( "context" "database/sql" + "encoding/json" "github.com/matrix-org/gomatrixserverlib" ) @@ -71,25 +72,44 @@ func (s *filterStatements) prepare(db *sql.DB) (err error) { func (s *filterStatements) selectFilter( ctx context.Context, localpart string, filterID string, -) (filter []byte, err error) { - err = s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filter) - return +) (*gomatrixserverlib.Filter, error) { + // Retrieve filter from database (stored as canonical JSON) + var filterData []byte + err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData) + if err != nil { + return nil, err + } + + // Unmarshal JSON into Filter struct + var filter gomatrixserverlib.Filter + if err = json.Unmarshal(filterData, &filter); err != nil { + return nil, err + } + return &filter, nil } func (s *filterStatements) insertFilter( - ctx context.Context, filter []byte, localpart string, + ctx context.Context, filter *gomatrixserverlib.Filter, localpart string, ) (filterID string, err error) { var existingFilterID string - // This can result in a race condition when two clients try to insert the - // same filter and localpart at the same time, however this is not a - // problem as both calls will result in the same filterID - filterJSON, err := gomatrixserverlib.CanonicalJSON(filter) + // Serialise json + filterJSON, err := json.Marshal(filter) + if err != nil { + return "", err + } + // Remove whitespaces and sort JSON data + // needed to prevent from inserting the same filter multiple times + filterJSON, err = gomatrixserverlib.CanonicalJSON(filterJSON) if err != nil { return "", err } - // Check if filter already exists in the database + // Check if filter already exists in the database using its localpart and content + // + // This can result in a race condition when two clients try to insert the + // same filter and localpart at the same time, however this is not a + // problem as both calls will result in the same filterID err = s.selectFilterIDByContentStmt.QueryRowContext(ctx, localpart, filterJSON).Scan(&existingFilterID) if err != nil && err != sql.ErrNoRows { diff --git a/clientapi/auth/storage/accounts/storage.go b/clientapi/auth/storage/accounts/storage.go index 27c0a176a..5c8ffffeb 100644 --- a/clientapi/auth/storage/accounts/storage.go +++ b/clientapi/auth/storage/accounts/storage.go @@ -344,11 +344,11 @@ func (d *Database) GetThreePIDsForLocalpart( } // GetFilter looks up the filter associated with a given local user and filter ID. -// Returns a filter represented as a byte slice. Otherwise returns an error if -// no such filter exists or if there was an error talking to the database. +// Returns a filter structure. Otherwise returns an error if no such filter exists +// or if there was an error talking to the database. func (d *Database) GetFilter( ctx context.Context, localpart string, filterID string, -) ([]byte, error) { +) (*gomatrixserverlib.Filter, error) { return d.filter.selectFilter(ctx, localpart, filterID) } @@ -356,7 +356,7 @@ func (d *Database) GetFilter( // Returns the filterID as a string. Otherwise returns an error if something // goes wrong. func (d *Database) PutFilter( - ctx context.Context, localpart string, filter []byte, + ctx context.Context, localpart string, filter *gomatrixserverlib.Filter, ) (string, error) { return d.filter.insertFilter(ctx, filter, localpart) } diff --git a/clientapi/routing/filter.go b/clientapi/routing/filter.go index 291a165b7..eec501ff7 100644 --- a/clientapi/routing/filter.go +++ b/clientapi/routing/filter.go @@ -17,13 +17,10 @@ package routing import ( "net/http" - "encoding/json" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -43,7 +40,7 @@ func GetFilter( return httputil.LogThenError(req, err) } - res, err := accountDB.GetFilter(req.Context(), localpart, filterID) + filter, err := accountDB.GetFilter(req.Context(), localpart, filterID) if err != nil { //TODO better error handling. This error message is *probably* right, // but if there are obscure db errors, this will also be returned, @@ -53,11 +50,6 @@ func GetFilter( JSON: jsonerror.NotFound("No such filter"), } } - filter := gomatrix.Filter{} - err = json.Unmarshal(res, &filter) - if err != nil { - return httputil.LogThenError(req, err) - } return util.JSONResponse{ Code: http.StatusOK, @@ -85,21 +77,21 @@ func PutFilter( return httputil.LogThenError(req, err) } - var filter gomatrix.Filter + var filter gomatrixserverlib.Filter if reqErr := httputil.UnmarshalJSONRequest(req, &filter); reqErr != nil { return *reqErr } - filterArray, err := json.Marshal(filter) - if err != nil { + // Validate generates a user-friendly error + if err = filter.Validate(); err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.BadJSON("Filter is malformed"), + JSON: jsonerror.BadJSON("Invalid filter: " + err.Error()), } } - filterID, err := accountDB.PutFilter(req.Context(), localpart, filterArray) + filterID, err := accountDB.PutFilter(req.Context(), localpart, &filter) if err != nil { return httputil.LogThenError(req, err) } From 604685c5035d836d7069be92963191d6d9f49f84 Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Thu, 25 Jul 2019 00:15:36 +0800 Subject: [PATCH 06/14] Implement room creation content (#754) Fixes #660. Signed-off-by: Alex Chen minecnly@gmail.com --- clientapi/routing/createroom.go | 36 +++++++++++++++++++++++++++++++-- common/eventcontent.go | 12 +++++++++-- testfile | 3 +++ 3 files changed, 47 insertions(+), 4 deletions(-) diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 220ba6ae8..8c5ee975c 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -15,6 +15,7 @@ package routing import ( + "encoding/json" "fmt" "net/http" "strings" @@ -97,6 +98,27 @@ func (r createRoomRequest) Validate() *util.JSONResponse { } } + // Validate creation_content fields defined in the spec by marshalling the + // creation_content map into bytes and then unmarshalling the bytes into + // common.CreateContent. + + creationContentBytes, err := json.Marshal(r.CreationContent) + if err != nil { + return &util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("malformed creation_content"), + } + } + + var CreationContent common.CreateContent + err = json.Unmarshal(creationContentBytes, &CreationContent) + if err != nil { + return &util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("malformed creation_content"), + } + } + return nil } @@ -154,7 +176,17 @@ func createRoom( JSON: jsonerror.InvalidArgumentValue(err.Error()), } } - // TODO: visibility/presets/raw initial state/creation content + + // Clobber keys: creator, room_version + + if r.CreationContent == nil { + r.CreationContent = make(map[string]interface{}, 2) + } + + r.CreationContent["creator"] = userID + r.CreationContent["room_version"] = "1" // TODO: We set this to 1 before we support Room versioning + + // TODO: visibility/presets/raw initial state // TODO: Create room alias association // Make sure this doesn't fall into an application service's namespace though! @@ -214,7 +246,7 @@ func createRoom( // harder to reason about, hence sticking to a strict static ordering. // TODO: Synapse has txn/token ID on each event. Do we need to do this here? eventsToMake := []fledglingEvent{ - {"m.room.create", "", common.CreateContent{Creator: userID}}, + {"m.room.create", "", r.CreationContent}, {"m.room.member", userID, membershipContent}, {"m.room.power_levels", "", common.InitialPowerLevelsContent(userID)}, // TODO: m.room.canonical_alias diff --git a/common/eventcontent.go b/common/eventcontent.go index 971c4f0a7..c45724fcd 100644 --- a/common/eventcontent.go +++ b/common/eventcontent.go @@ -16,8 +16,16 @@ package common // CreateContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-create type CreateContent struct { - Creator string `json:"creator"` - Federate *bool `json:"m.federate,omitempty"` + Creator string `json:"creator"` + Federate *bool `json:"m.federate,omitempty"` + RoomVersion string `json:"room_version,omitempty"` + Predecessor PreviousRoom `json:"predecessor,omitempty"` +} + +// PreviousRoom is the "Previous Room" structure defined at https://matrix.org/docs/spec/client_server/r0.5.0#m-room-create +type PreviousRoom struct { + RoomID string `json:"room_id"` + EventID string `json:"event_id"` } // MemberContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-member diff --git a/testfile b/testfile index 1b2ad9e69..4003638f1 100644 --- a/testfile +++ b/testfile @@ -151,3 +151,6 @@ Inbound federation of state requires event_id as a mandatory paramater Inbound federation of state_ids requires event_id as a mandatory paramater POST /register returns the same device_id as that in the request POST /login returns the same device_id as that in the request +POST /createRoom with creation content +User can create and send/receive messages in a room with version 1 +POST /createRoom ignores attempts to set the room version via creation_content From 45d24d3fb5b84b89bcd8eec7058896b3b4a6f2e3 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Wed, 24 Jul 2019 18:41:39 +0100 Subject: [PATCH 07/14] Remove the buildkite pipeline (#763) New repo: https://github.com/matrix-org/pipelines/ --- .buildkite/pipeline.yaml | 49 ---------------------------------------- 1 file changed, 49 deletions(-) delete mode 100644 .buildkite/pipeline.yaml diff --git a/.buildkite/pipeline.yaml b/.buildkite/pipeline.yaml deleted file mode 100644 index 9d755a244..000000000 --- a/.buildkite/pipeline.yaml +++ /dev/null @@ -1,49 +0,0 @@ -steps: - - command: - # https://github.com/golangci/golangci-lint#memory-usage-of-golangci-lint - - "GOGC=20 ./scripts/find-lint.sh" - label: "\U0001F9F9 Lint / :go: 1.12" - agents: - # Use a larger instance as linting takes a looot of memory - queue: "medium" - plugins: - - docker#v3.0.1: - image: "golang:1.12" - - - wait - - - command: - - "go build ./cmd/..." - label: "\U0001F528 Build / :go: 1.11" - plugins: - - docker#v3.0.1: - image: "golang:1.11" - retry: - automatic: - - exit_status: 128 - limit: 3 - - - command: - - "go build ./cmd/..." - label: "\U0001F528 Build / :go: 1.12" - plugins: - - docker#v3.0.1: - image: "golang:1.12" - retry: - automatic: - - exit_status: 128 - limit: 3 - - - command: - - "go test ./..." - label: "\U0001F9EA Unit tests / :go: 1.11" - plugins: - - docker#v3.0.1: - image: "golang:1.11" - - - command: - - "go test ./..." - label: "\U0001F9EA Unit tests / :go: 1.12" - plugins: - - docker#v3.0.1: - image: "golang:1.12" From e66933b108a4656d01bfa6285c782bdbfb249756 Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Fri, 26 Jul 2019 00:00:22 +0800 Subject: [PATCH 08/14] Fix data races reported by go test -race ./... (#748) --- syncapi/sync/notifier.go | 1 + syncapi/sync/notifier_test.go | 21 +++++++++++++++------ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 30ac3a2e5..14bc2efb6 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -185,6 +185,7 @@ func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) { // fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, // a stream will be made for this user if one doesn't exist and it will be returned. This // function does not wait for data to be available on the stream. +// NB: Callers should have locked the mutex before calling this function. func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream { stream, ok := n.userStreams[userID] if !ok && makeIfNotExists { diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go index 904315e9f..808e07cc7 100644 --- a/syncapi/sync/notifier_test.go +++ b/syncapi/sync/notifier_test.go @@ -143,7 +143,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) { wg.Done() }() - stream := n.fetchUserStream(bob, true) + stream := lockedFetchUserStream(n, bob) waitForBlocking(stream, 1) n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter) @@ -171,7 +171,7 @@ func TestNewInviteEventForUser(t *testing.T) { wg.Done() }() - stream := n.fetchUserStream(bob, true) + stream := lockedFetchUserStream(n, bob) waitForBlocking(stream, 1) n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter) @@ -199,7 +199,7 @@ func TestEDUWakeup(t *testing.T) { wg.Done() }() - stream := n.fetchUserStream(bob, true) + stream := lockedFetchUserStream(n, bob) waitForBlocking(stream, 1) n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU) @@ -230,7 +230,7 @@ func TestMultipleRequestWakeup(t *testing.T) { go poll() go poll() - stream := n.fetchUserStream(bob, true) + stream := lockedFetchUserStream(n, bob) waitForBlocking(stream, 3) n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter) @@ -266,14 +266,14 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { } leaveWG.Done() }() - bobStream := n.fetchUserStream(bob, true) + bobStream := lockedFetchUserStream(n, bob) waitForBlocking(bobStream, 1) n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter) leaveWG.Wait() // send an event into the room. Make sure alice gets it. Bob should not. var aliceWG sync.WaitGroup - aliceStream := n.fetchUserStream(alice, true) + aliceStream := lockedFetchUserStream(n, alice) aliceWG.Add(1) go func() { pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter)) @@ -328,6 +328,15 @@ func waitForBlocking(s *UserStream, numBlocking uint) { } } +// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock. +// A new stream is made if it doesn't exist already. +func lockedFetchUserStream(n *Notifier, userID string) *UserStream { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + return n.fetchUserStream(userID, true) +} + func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest { return syncRequest{ device: authtypes.Device{UserID: userID}, From 3e6d0a6246ed1bc10760c214fce762e6880bc5be Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Mon, 29 Jul 2019 15:18:21 +0800 Subject: [PATCH 09/14] Add newly passing tests from matrix-org/sytest 56de891 (#769) --- testfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testfile b/testfile index 4003638f1..2f9229e87 100644 --- a/testfile +++ b/testfile @@ -154,3 +154,5 @@ POST /login returns the same device_id as that in the request POST /createRoom with creation content User can create and send/receive messages in a room with version 1 POST /createRoom ignores attempts to set the room version via creation_content +Inbound federation rejects remote attempts to join local users to rooms +Inbound federation rejects remote attempts to kick local users to rooms From 40e44c5f3b445b2e5bb7dc98d991a9177d0f2abb Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Wed, 31 Jul 2019 20:45:45 +0800 Subject: [PATCH 10/14] Add newly passing tests from matrix-org/sytest (#771) Signed-off-by: Alex Chen --- testfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testfile b/testfile index 2f9229e87..c23dda662 100644 --- a/testfile +++ b/testfile @@ -156,3 +156,5 @@ User can create and send/receive messages in a room with version 1 POST /createRoom ignores attempts to set the room version via creation_content Inbound federation rejects remote attempts to join local users to rooms Inbound federation rejects remote attempts to kick local users to rooms +An event which redacts itself should be ignored +A pair of events which redact each other should be ignored From 3e1abe9ad3464a6c47dfcf003bb4bf6422c820af Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Wed, 31 Jul 2019 21:20:11 +0800 Subject: [PATCH 11/14] Fix /sync may contain duplicate EDUs and EDUs for left rooms (#752) In 29841be (#718), EDUs are added to /sync responses for rooms listed in joinedRoomIDs returned by addPDUDeltaToResponse. However this list may contain rooms other than those currently joined. Some variable renamings are done to make golangci-lint pass. Signed-off-by: Alex Chen minecnly@gmail.com --- syncapi/storage/syncserver.go | 44 +++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index b4d7ccbd2..20fa8a4e0 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -35,6 +35,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +const ( + membershipJoin = "join" + membershipLeave = "leave" + membershipBan = "ban" +) + type stateDelta struct { roomID string stateEvents []gomatrixserverlib.Event @@ -248,14 +254,12 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + deltas, joinedRoomIDs, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) if err != nil { return nil, err } - joinedRoomIDs := make([]string, 0, len(deltas)) for _, delta := range deltas { - joinedRoomIDs = append(joinedRoomIDs, delta.roomID) err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) if err != nil { return nil, err @@ -344,7 +348,7 @@ func (d *SyncServerDatasource) IncrementalSync( ) } else { joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( - ctx, nil, device.UserID, "join", + ctx, nil, device.UserID, membershipJoin, ) } if err != nil { @@ -393,7 +397,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( res = types.NewResponse(toPos) // Extract room state and recent events for all rooms the user is joined to. - joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin) if err != nil { return } @@ -571,7 +575,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( res *types.Response, ) error { endPos := toPos - if delta.membershipPos > 0 && delta.membership == "leave" { + if delta.membershipPos > 0 && delta.membership == membershipLeave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). @@ -595,7 +599,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( } switch delta.membership { - case "join": + case membershipJoin: jr := types.NewJoinResponse() if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { // Use the short form of batch token for prev_batch @@ -608,9 +612,9 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr - case "leave": + case membershipLeave: fallthrough // transitions to leave are the same as ban - case "ban": + case membershipBan: // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() @@ -716,10 +720,14 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( return events, nil } +// getStateDeltas returns the state deltas between fromPos and toPos, +// exclusive of oldPos, inclusive of newPos, for the rooms in which +// the user has new membership events. +// A list of joined room IDs is also returned in case the caller needs it. func (d *SyncServerDatasource) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, fromPos, toPos int64, userID string, -) ([]stateDelta, error) { +) ([]stateDelta, []string, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -733,11 +741,11 @@ func (d *SyncServerDatasource) getStateDeltas( // get all the state events ever between these two positions stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) if err != nil { - return nil, err + return nil, nil, err } state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) if err != nil { - return nil, err + return nil, nil, err } for roomID, stateStreamEvents := range state { @@ -748,12 +756,12 @@ func (d *SyncServerDatasource) getStateDeltas( // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to // the timeline. if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { - if membership == "join" { + if membership == membershipJoin { // send full room state down instead of a delta var allState []gomatrixserverlib.Event allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) if err != nil { - return nil, err + return nil, nil, err } s := make([]streamEvent, len(allState)) for i := 0; i < len(s); i++ { @@ -775,19 +783,19 @@ func (d *SyncServerDatasource) getStateDeltas( } // Add in currently joined rooms - joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin) if err != nil { - return nil, err + return nil, nil, err } for _, joinedRoomID := range joinedRoomIDs { deltas = append(deltas, stateDelta{ - membership: "join", + membership: membershipJoin, stateEvents: streamEventsToEvents(device, state[joinedRoomID]), roomID: joinedRoomID, }) } - return deltas, nil + return deltas, joinedRoomIDs, nil } // streamEventsToEvents converts streamEvent to Event. If device is non-nil and From 92db6cd0eaecbd2c27f83488a176ef76ecf16f36 Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Wed, 31 Jul 2019 21:36:21 +0800 Subject: [PATCH 12/14] Fix index in invites_table.go (#770) This PR fixes a possible typo in an index created in invites_table.go. Signed-off-by: Alex Chen minecnly@gmail.com --- syncapi/storage/invites_table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncapi/storage/invites_table.go b/syncapi/storage/invites_table.go index 88c98f7e3..9f52087f6 100644 --- a/syncapi/storage/invites_table.go +++ b/syncapi/storage/invites_table.go @@ -23,7 +23,7 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx -- For deleting old invites CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx - ON syncapi_invite_events(target_user_id, id); + ON syncapi_invite_events (event_id); ` const insertInviteEventSQL = "" + From 3578d77d259c852a16dc430725d348e2c62c4ff5 Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Thu, 1 Aug 2019 12:36:13 +0800 Subject: [PATCH 14/14] Implement "full_state" query parameter for /sync (#751) Closes #637. --- syncapi/storage/syncserver.go | 133 +++++++++++++++++++++++++++------- syncapi/sync/requestpool.go | 12 ++- testfile | 1 + 3 files changed, 117 insertions(+), 29 deletions(-) diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index 20fa8a4e0..e914bddfe 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -241,6 +241,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( device authtypes.Device, fromPos, toPos int64, numRecentEventsPerRoom int, + wantFullState bool, res *types.Response, ) ([]string, error) { txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) @@ -254,7 +255,13 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, joinedRoomIDs, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + var deltas []stateDelta + var joinedRoomIDs []string + if !wantFullState { + deltas, joinedRoomIDs, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + } else { + deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(ctx, &device, txn, fromPos, toPos, device.UserID) + } if err != nil { return nil, err } @@ -336,15 +343,16 @@ func (d *SyncServerDatasource) IncrementalSync( device authtypes.Device, fromPos, toPos types.SyncPosition, numRecentEventsPerRoom int, + wantFullState bool, ) (*types.Response, error) { nextBatchPos := fromPos.WithUpdates(toPos) res := types.NewResponse(nextBatchPos) var joinedRoomIDs []string var err error - if fromPos.PDUPosition != toPos.PDUPosition { + if fromPos.PDUPosition != toPos.PDUPosition || wantFullState { joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res, + ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res, ) } else { joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( @@ -593,21 +601,30 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( recentEvents := streamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - // Don't bother appending empty room entries - if len(recentEvents) == 0 && len(delta.stateEvents) == 0 { - return nil + var prevPDUPos int64 + + if len(recentEvents) == 0 { + if len(delta.stateEvents) == 0 { + // Don't bother appending empty room entries + return nil + } + + // If full_state=true and since is already up to date, then we'll have + // state events but no recent events. + prevPDUPos = toPos - 1 + } else { + prevPDUPos = recentStreamEvents[0].streamPosition - 1 + } + + if prevPDUPos <= 0 { + prevPDUPos = 1 } switch delta.membership { case membershipJoin: jr := types.NewJoinResponse() - if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { - // Use the short form of batch token for prev_batch - jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) - } else { - // Use the short form of batch token for prev_batch - jr.Timeline.PrevBatch = "1" - } + // Use the short form of batch token for prev_batch + jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) @@ -618,13 +635,8 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() - if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { - // Use the short form of batch token for prev_batch - lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) - } else { - // Use the short form of batch token for prev_batch - lr.Timeline.PrevBatch = "1" - } + // Use the short form of batch token for prev_batch + lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) @@ -758,15 +770,11 @@ func (d *SyncServerDatasource) getStateDeltas( if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership == membershipJoin { // send full room state down instead of a delta - var allState []gomatrixserverlib.Event - allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + var s []streamEvent + s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID) if err != nil { return nil, nil, err } - s := make([]streamEvent, len(allState)) - for i := 0; i < len(s); i++ { - s[i] = streamEvent{Event: allState[i], streamPosition: 0} - } state[roomID] = s continue // we'll add this room in when we do joined rooms } @@ -798,6 +806,79 @@ func (d *SyncServerDatasource) getStateDeltas( return deltas, joinedRoomIDs, nil } +// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync +// requests with full_state=true. +// Fetches full state for all joined rooms and uses selectStateInRange to get +// updates for other rooms. +func (d *SyncServerDatasource) getStateDeltasForFullStateSync( + ctx context.Context, device *authtypes.Device, txn *sql.Tx, + fromPos, toPos int64, userID string, +) ([]stateDelta, []string, error) { + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + if err != nil { + return nil, nil, err + } + + // Use a reasonable initial capacity + deltas := make([]stateDelta, 0, len(joinedRoomIDs)) + + // Add full states for all joined rooms + for _, joinedRoomID := range joinedRoomIDs { + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID) + if stateErr != nil { + return nil, nil, stateErr + } + deltas = append(deltas, stateDelta{ + membership: "join", + stateEvents: streamEventsToEvents(device, s), + roomID: joinedRoomID, + }) + } + + // Get all the state events ever between these two positions + stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) + if err != nil { + return nil, nil, err + } + state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) + if err != nil { + return nil, nil, err + } + + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership != "join" { // We've already added full state for all joined rooms above. + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.streamPosition, + stateEvents: streamEventsToEvents(device, stateStreamEvents), + roomID: roomID, + }) + } + + break + } + } + } + + return deltas, joinedRoomIDs, nil +} + +func (d *SyncServerDatasource) currentStateStreamEventsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, +) ([]streamEvent, error) { + allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID) + if err != nil { + return nil, err + } + s := make([]streamEvent, len(allState)) + for i := 0; i < len(s); i++ { + s[i] = streamEvent{Event: allState[i], streamPosition: 0} + } + return s, nil +} + // streamEventsToEvents converts streamEvent to Event. If device is non-nil and // matches the streamevent.transactionID device then the transaction ID gets // added to the unsigned section of the output event. diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index a6ec6bd92..d773a4606 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -65,8 +65,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype currPos := rp.notifier.CurrentPosition() - // If this is an initial sync or timeout=0 we return immediately - if syncReq.since == nil || syncReq.timeout == 0 { + if shouldReturnImmediately(syncReq) { syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { return httputil.LogThenError(req, err) @@ -135,7 +134,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncP if req.since == nil { res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState) } if err != nil { @@ -216,3 +215,10 @@ func (rp *RequestPool) appendAccountData( return data, nil } + +// shouldReturnImmediately returns whether the /sync request is an initial sync, +// or timeout=0, or full_state=true, in any of the cases the request should +// return immediately. +func shouldReturnImmediately(syncReq *syncRequest) bool { + return syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState +} diff --git a/testfile b/testfile index c23dda662..4c5163e5e 100644 --- a/testfile +++ b/testfile @@ -158,3 +158,4 @@ Inbound federation rejects remote attempts to join local users to rooms Inbound federation rejects remote attempts to kick local users to rooms An event which redacts itself should be ignored A pair of events which redact each other should be ignored +Full state sync includes joined rooms