diff --git a/.buildkite/pipeline.yaml b/.buildkite/pipeline.yaml deleted file mode 100644 index 9d755a244..000000000 --- a/.buildkite/pipeline.yaml +++ /dev/null @@ -1,49 +0,0 @@ -steps: - - command: - # https://github.com/golangci/golangci-lint#memory-usage-of-golangci-lint - - "GOGC=20 ./scripts/find-lint.sh" - label: "\U0001F9F9 Lint / :go: 1.12" - agents: - # Use a larger instance as linting takes a looot of memory - queue: "medium" - plugins: - - docker#v3.0.1: - image: "golang:1.12" - - - wait - - - command: - - "go build ./cmd/..." - label: "\U0001F528 Build / :go: 1.11" - plugins: - - docker#v3.0.1: - image: "golang:1.11" - retry: - automatic: - - exit_status: 128 - limit: 3 - - - command: - - "go build ./cmd/..." - label: "\U0001F528 Build / :go: 1.12" - plugins: - - docker#v3.0.1: - image: "golang:1.12" - retry: - automatic: - - exit_status: 128 - limit: 3 - - - command: - - "go test ./..." - label: "\U0001F9EA Unit tests / :go: 1.11" - plugins: - - docker#v3.0.1: - image: "golang:1.11" - - - command: - - "go test ./..." - label: "\U0001F9EA Unit tests / :go: 1.12" - plugins: - - docker#v3.0.1: - image: "golang:1.12" diff --git a/clientapi/auth/storage/accounts/filter_table.go b/clientapi/auth/storage/accounts/filter_table.go index 81bae4545..2b07ef17e 100644 --- a/clientapi/auth/storage/accounts/filter_table.go +++ b/clientapi/auth/storage/accounts/filter_table.go @@ -17,6 +17,7 @@ package accounts import ( "context" "database/sql" + "encoding/json" "github.com/matrix-org/gomatrixserverlib" ) @@ -71,25 +72,44 @@ func (s *filterStatements) prepare(db *sql.DB) (err error) { func (s *filterStatements) selectFilter( ctx context.Context, localpart string, filterID string, -) (filter []byte, err error) { - err = s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filter) - return +) (*gomatrixserverlib.Filter, error) { + // Retrieve filter from database (stored as canonical JSON) + var filterData []byte + err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData) + if err != nil { + return nil, err + } + + // Unmarshal JSON into Filter struct + var filter gomatrixserverlib.Filter + if err = json.Unmarshal(filterData, &filter); err != nil { + return nil, err + } + return &filter, nil } func (s *filterStatements) insertFilter( - ctx context.Context, filter []byte, localpart string, + ctx context.Context, filter *gomatrixserverlib.Filter, localpart string, ) (filterID string, err error) { var existingFilterID string - // This can result in a race condition when two clients try to insert the - // same filter and localpart at the same time, however this is not a - // problem as both calls will result in the same filterID - filterJSON, err := gomatrixserverlib.CanonicalJSON(filter) + // Serialise json + filterJSON, err := json.Marshal(filter) + if err != nil { + return "", err + } + // Remove whitespaces and sort JSON data + // needed to prevent from inserting the same filter multiple times + filterJSON, err = gomatrixserverlib.CanonicalJSON(filterJSON) if err != nil { return "", err } - // Check if filter already exists in the database + // Check if filter already exists in the database using its localpart and content + // + // This can result in a race condition when two clients try to insert the + // same filter and localpart at the same time, however this is not a + // problem as both calls will result in the same filterID err = s.selectFilterIDByContentStmt.QueryRowContext(ctx, localpart, filterJSON).Scan(&existingFilterID) if err != nil && err != sql.ErrNoRows { diff --git a/clientapi/auth/storage/accounts/storage.go b/clientapi/auth/storage/accounts/storage.go index 27c0a176a..5c8ffffeb 100644 --- a/clientapi/auth/storage/accounts/storage.go +++ b/clientapi/auth/storage/accounts/storage.go @@ -344,11 +344,11 @@ func (d *Database) GetThreePIDsForLocalpart( } // GetFilter looks up the filter associated with a given local user and filter ID. -// Returns a filter represented as a byte slice. Otherwise returns an error if -// no such filter exists or if there was an error talking to the database. +// Returns a filter structure. Otherwise returns an error if no such filter exists +// or if there was an error talking to the database. func (d *Database) GetFilter( ctx context.Context, localpart string, filterID string, -) ([]byte, error) { +) (*gomatrixserverlib.Filter, error) { return d.filter.selectFilter(ctx, localpart, filterID) } @@ -356,7 +356,7 @@ func (d *Database) GetFilter( // Returns the filterID as a string. Otherwise returns an error if something // goes wrong. func (d *Database) PutFilter( - ctx context.Context, localpart string, filter []byte, + ctx context.Context, localpart string, filter *gomatrixserverlib.Filter, ) (string, error) { return d.filter.insertFilter(ctx, filter, localpart) } diff --git a/clientapi/auth/storage/devices/devices_table.go b/clientapi/auth/storage/devices/devices_table.go index 96d6521d8..60aa563a2 100644 --- a/clientapi/auth/storage/devices/devices_table.go +++ b/clientapi/auth/storage/devices/devices_table.go @@ -169,6 +169,8 @@ func (s *devicesStatements) selectDeviceByToken( return &dev, err } +// selectDeviceByID retrieves a device from the database with the given user +// localpart and deviceID func (s *devicesStatements) selectDeviceByID( ctx context.Context, localpart, deviceID string, ) (*authtypes.Device, error) { diff --git a/clientapi/auth/storage/devices/storage.go b/clientapi/auth/storage/devices/storage.go index 7032fe7bf..82c8e97a2 100644 --- a/clientapi/auth/storage/devices/storage.go +++ b/clientapi/auth/storage/devices/storage.go @@ -84,7 +84,7 @@ func (d *Database) CreateDevice( if deviceID != nil { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - // Revoke existing token for this device + // Revoke existing tokens for this device if err = d.devices.deleteDevice(ctx, txn, *deviceID, localpart); err != nil { return err } diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 220ba6ae8..8c5ee975c 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -15,6 +15,7 @@ package routing import ( + "encoding/json" "fmt" "net/http" "strings" @@ -97,6 +98,27 @@ func (r createRoomRequest) Validate() *util.JSONResponse { } } + // Validate creation_content fields defined in the spec by marshalling the + // creation_content map into bytes and then unmarshalling the bytes into + // common.CreateContent. + + creationContentBytes, err := json.Marshal(r.CreationContent) + if err != nil { + return &util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("malformed creation_content"), + } + } + + var CreationContent common.CreateContent + err = json.Unmarshal(creationContentBytes, &CreationContent) + if err != nil { + return &util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("malformed creation_content"), + } + } + return nil } @@ -154,7 +176,17 @@ func createRoom( JSON: jsonerror.InvalidArgumentValue(err.Error()), } } - // TODO: visibility/presets/raw initial state/creation content + + // Clobber keys: creator, room_version + + if r.CreationContent == nil { + r.CreationContent = make(map[string]interface{}, 2) + } + + r.CreationContent["creator"] = userID + r.CreationContent["room_version"] = "1" // TODO: We set this to 1 before we support Room versioning + + // TODO: visibility/presets/raw initial state // TODO: Create room alias association // Make sure this doesn't fall into an application service's namespace though! @@ -214,7 +246,7 @@ func createRoom( // harder to reason about, hence sticking to a strict static ordering. // TODO: Synapse has txn/token ID on each event. Do we need to do this here? eventsToMake := []fledglingEvent{ - {"m.room.create", "", common.CreateContent{Creator: userID}}, + {"m.room.create", "", r.CreationContent}, {"m.room.member", userID, membershipContent}, {"m.room.power_levels", "", common.InitialPowerLevelsContent(userID)}, // TODO: m.room.canonical_alias diff --git a/clientapi/routing/filter.go b/clientapi/routing/filter.go index 291a165b7..eec501ff7 100644 --- a/clientapi/routing/filter.go +++ b/clientapi/routing/filter.go @@ -17,13 +17,10 @@ package routing import ( "net/http" - "encoding/json" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -43,7 +40,7 @@ func GetFilter( return httputil.LogThenError(req, err) } - res, err := accountDB.GetFilter(req.Context(), localpart, filterID) + filter, err := accountDB.GetFilter(req.Context(), localpart, filterID) if err != nil { //TODO better error handling. This error message is *probably* right, // but if there are obscure db errors, this will also be returned, @@ -53,11 +50,6 @@ func GetFilter( JSON: jsonerror.NotFound("No such filter"), } } - filter := gomatrix.Filter{} - err = json.Unmarshal(res, &filter) - if err != nil { - return httputil.LogThenError(req, err) - } return util.JSONResponse{ Code: http.StatusOK, @@ -85,21 +77,21 @@ func PutFilter( return httputil.LogThenError(req, err) } - var filter gomatrix.Filter + var filter gomatrixserverlib.Filter if reqErr := httputil.UnmarshalJSONRequest(req, &filter); reqErr != nil { return *reqErr } - filterArray, err := json.Marshal(filter) - if err != nil { + // Validate generates a user-friendly error + if err = filter.Validate(); err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.BadJSON("Filter is malformed"), + JSON: jsonerror.BadJSON("Invalid filter: " + err.Error()), } } - filterID, err := accountDB.PutFilter(req.Context(), localpart, filterArray) + filterID, err := accountDB.PutFilter(req.Context(), localpart, &filter) if err != nil { return httputil.LogThenError(req, err) } diff --git a/clientapi/routing/login.go b/clientapi/routing/login.go index 2e2d409f6..02d958152 100644 --- a/clientapi/routing/login.go +++ b/clientapi/routing/login.go @@ -18,7 +18,6 @@ import ( "net/http" "context" - "database/sql" "github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" @@ -42,10 +41,12 @@ type flow struct { } type passwordRequest struct { - User string `json:"user"` - Password string `json:"password"` + User string `json:"user"` + Password string `json:"password"` + // Both DeviceID and InitialDisplayName can be omitted, or empty strings ("") + // Thus a pointer is needed to differentiate between the two InitialDisplayName *string `json:"initial_device_display_name"` - DeviceID string `json:"device_id"` + DeviceID *string `json:"device_id"` } type loginResponse struct { @@ -110,7 +111,7 @@ func Login( return httputil.LogThenError(req, err) } - dev, err := getDevice(req.Context(), r, deviceDB, acc, localpart, token) + dev, err := getDevice(req.Context(), r, deviceDB, acc, token) if err != nil { return util.JSONResponse{ Code: http.StatusInternalServerError, @@ -134,20 +135,16 @@ func Login( } } -// check if device exists else create one +// getDevice returns a new or existing device func getDevice( ctx context.Context, r passwordRequest, deviceDB *devices.Database, acc *authtypes.Account, - localpart, token string, + token string, ) (dev *authtypes.Device, err error) { - dev, err = deviceDB.GetDeviceByID(ctx, localpart, r.DeviceID) - if err == sql.ErrNoRows { - // device doesn't exist, create one - dev, err = deviceDB.CreateDevice( - ctx, acc.Localpart, nil, token, r.InitialDisplayName, - ) - } + dev, err = deviceDB.CreateDevice( + ctx, acc.Localpart, r.DeviceID, token, r.InitialDisplayName, + ) return } diff --git a/clientapi/routing/register.go b/clientapi/routing/register.go index fa15f4fc0..c5a3d3018 100644 --- a/clientapi/routing/register.go +++ b/clientapi/routing/register.go @@ -121,7 +121,10 @@ type registerRequest struct { // user-interactive auth params Auth authDict `json:"auth"` + // Both DeviceID and InitialDisplayName can be omitted, or empty strings ("") + // Thus a pointer is needed to differentiate between the two InitialDisplayName *string `json:"initial_device_display_name"` + DeviceID *string `json:"device_id"` // Prevent this user from logging in InhibitLogin common.WeakBoolean `json:"inhibit_login"` @@ -626,7 +629,7 @@ func handleApplicationServiceRegistration( // application service registration is entirely separate. return completeRegistration( req.Context(), accountDB, deviceDB, r.Username, "", appserviceID, - r.InhibitLogin, r.InitialDisplayName, + r.InhibitLogin, r.InitialDisplayName, r.DeviceID, ) } @@ -646,7 +649,7 @@ func checkAndCompleteFlow( // This flow was completed, registration can continue return completeRegistration( req.Context(), accountDB, deviceDB, r.Username, r.Password, "", - r.InhibitLogin, r.InitialDisplayName, + r.InhibitLogin, r.InitialDisplayName, r.DeviceID, ) } @@ -697,10 +700,10 @@ func LegacyRegister( return util.MessageResponse(http.StatusForbidden, "HMAC incorrect") } - return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil) + return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil, nil) case authtypes.LoginTypeDummy: // there is nothing to do - return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil) + return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil, nil) default: return util.JSONResponse{ Code: http.StatusNotImplemented, @@ -738,13 +741,19 @@ func parseAndValidateLegacyLogin(req *http.Request, r *legacyRegisterRequest) *u return nil } +// completeRegistration runs some rudimentary checks against the submitted +// input, then if successful creates an account and a newly associated device +// We pass in each individual part of the request here instead of just passing a +// registerRequest, as this function serves requests encoded as both +// registerRequests and legacyRegisterRequests, which share some attributes but +// not all func completeRegistration( ctx context.Context, accountDB *accounts.Database, deviceDB *devices.Database, username, password, appserviceID string, inhibitLogin common.WeakBoolean, - displayName *string, + displayName, deviceID *string, ) util.JSONResponse { if username == "" { return util.JSONResponse{ @@ -773,6 +782,9 @@ func completeRegistration( } } + // Increment prometheus counter for created users + amtRegUsers.Inc() + // Check whether inhibit_login option is set. If so, don't create an access // token or a device for this user if inhibitLogin { @@ -793,8 +805,7 @@ func completeRegistration( } } - // TODO: Use the device ID in the request. - dev, err := deviceDB.CreateDevice(ctx, username, nil, token, displayName) + dev, err := deviceDB.CreateDevice(ctx, username, deviceID, token, displayName) if err != nil { return util.JSONResponse{ Code: http.StatusInternalServerError, @@ -802,9 +813,6 @@ func completeRegistration( } } - // Increment prometheus counter for created users - amtRegUsers.Inc() - return util.JSONResponse{ Code: http.StatusOK, JSON: registerResponse{ diff --git a/common/config/config_test.go b/common/config/config_test.go index acc4dbd12..110c8b84c 100644 --- a/common/config/config_test.go +++ b/common/config/config_test.go @@ -54,12 +54,14 @@ database: server_key: "postgresql:///server_keys" sync_api: "postgresql:///syn_api" room_server: "postgresql:///room_server" + appservice: "postgresql:///appservice" listen: room_server: "localhost:7770" client_api: "localhost:7771" federation_api: "localhost:7772" sync_api: "localhost:7773" media_api: "localhost:7774" + appservice_api: "localhost:7777" typing_server: "localhost:7778" logging: - type: "file" diff --git a/common/eventcontent.go b/common/eventcontent.go index 971c4f0a7..c45724fcd 100644 --- a/common/eventcontent.go +++ b/common/eventcontent.go @@ -16,8 +16,16 @@ package common // CreateContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-create type CreateContent struct { - Creator string `json:"creator"` - Federate *bool `json:"m.federate,omitempty"` + Creator string `json:"creator"` + Federate *bool `json:"m.federate,omitempty"` + RoomVersion string `json:"room_version,omitempty"` + Predecessor PreviousRoom `json:"predecessor,omitempty"` +} + +// PreviousRoom is the "Previous Room" structure defined at https://matrix.org/docs/spec/client_server/r0.5.0#m-room-create +type PreviousRoom struct { + RoomID string `json:"room_id"` + EventID string `json:"event_id"` } // MemberContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-member diff --git a/common/log.go b/common/log.go index 89a705822..f9ed84edb 100644 --- a/common/log.go +++ b/common/log.go @@ -15,9 +15,12 @@ package common import ( + "fmt" "os" "path" "path/filepath" + "runtime" + "strings" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dugong" @@ -54,15 +57,35 @@ func (h *logLevelHook) Levels() []logrus.Level { return levels } +// callerPrettyfier is a function that given a runtime.Frame object, will +// extract the calling function's name and file, and return them in a nicely +// formatted way +func callerPrettyfier(f *runtime.Frame) (string, string) { + // Retrieve just the function name + s := strings.Split(f.Function, ".") + funcname := s[len(s)-1] + + // Append a newline + tab to it to move the actual log content to its own line + funcname += "\n\t" + + // Surround the filepath in brackets and append line number so IDEs can quickly + // navigate + filename := fmt.Sprintf(" [%s:%d]", f.File, f.Line) + + return funcname, filename +} + // SetupStdLogging configures the logging format to standard output. Typically, it is called when the config is not yet loaded. func SetupStdLogging() { + logrus.SetReportCaller(true) logrus.SetFormatter(&utcFormatter{ &logrus.TextFormatter{ TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00", FullTimestamp: true, DisableColors: false, DisableTimestamp: false, - DisableSorting: false, + QuoteEmptyFields: true, + CallerPrettyfier: callerPrettyfier, }, }) } @@ -71,8 +94,8 @@ func SetupStdLogging() { // If something fails here it means that the logging was improperly configured, // so we just exit with the error func SetupHookLogging(hooks []config.LogrusHook, componentName string) { + logrus.SetReportCaller(true) for _, hook := range hooks { - // Check we received a proper logging level level, err := logrus.ParseLevel(hook.Level) if err != nil { @@ -126,6 +149,7 @@ func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName str DisableColors: true, DisableTimestamp: false, DisableSorting: false, + QuoteEmptyFields: true, }, }, &dugong.DailyRotationSchedule{GZip: true}, diff --git a/go.mod b/go.mod index 3b4b736a4..5d01012c9 100644 --- a/go.mod +++ b/go.mod @@ -20,10 +20,11 @@ require ( github.com/jaegertracing/jaeger-client-go v0.0.0-20170921145708-3ad49a1d839b github.com/jaegertracing/jaeger-lib v0.0.0-20170920222118-21a3da6d66fe github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 + github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/lib/pq v0.0.0-20170918175043-23da1db4f16d github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20190619132215-178ed5e3b8e2 + github.com/matrix-org/gomatrixserverlib v0.0.0-20190724145009-a6df10ef35d6 github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0 github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 github.com/matttproud/golang_protobuf_extensions v1.0.1 @@ -40,8 +41,9 @@ require ( github.com/prometheus/common v0.0.0-20170108231212-dd2f054febf4 github.com/prometheus/procfs v0.0.0-20170128160123-1878d9fbb537 github.com/rcrowley/go-metrics v0.0.0-20161128210544-1f30fe9094a5 - github.com/sirupsen/logrus v1.3.0 - github.com/stretchr/testify v1.2.2 + github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/objx v0.2.0 // indirect + github.com/stretchr/testify v1.3.0 github.com/tidwall/gjson v1.1.5 github.com/tidwall/match v1.0.1 github.com/tidwall/sjson v1.0.3 @@ -54,7 +56,7 @@ require ( go.uber.org/zap v1.7.1 golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95 - golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 + golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 gopkg.in/Shopify/sarama.v1 v1.11.0 gopkg.in/airbrake/gobrake.v2 v2.0.9 gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20170727041045-23bcc3c4eae3 diff --git a/go.sum b/go.sum index 8026640b5..53151121b 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,7 @@ github.com/jaegertracing/jaeger-lib v0.0.0-20170920222118-21a3da6d66fe/go.mod h1 github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 h1:KAZ1BW2TCmT6PRihDPpocIy1QTtsAsrx6TneU/4+CMg= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -53,6 +54,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20181109104322-1c2cbc0872f0 h1:3U github.com/matrix-org/gomatrixserverlib v0.0.0-20181109104322-1c2cbc0872f0/go.mod h1:YHyhIQUmuXyKtoVfDUMk/DyU93Taamlu6nPZkij/JtA= github.com/matrix-org/gomatrixserverlib v0.0.0-20190619132215-178ed5e3b8e2 h1:pYajAEdi3sowj4iSunqctchhcMNW3rDjeeH0T4uDkMY= github.com/matrix-org/gomatrixserverlib v0.0.0-20190619132215-178ed5e3b8e2/go.mod h1:sf0RcKOdiwJeTti7A313xsaejNUGYDq02MQZ4JD4w/E= +github.com/matrix-org/gomatrixserverlib v0.0.0-20190724145009-a6df10ef35d6 h1:B8n1H5Wb1B5jwLzTylBpY0kJCMRqrofT7PmOw4aJFJA= +github.com/matrix-org/gomatrixserverlib v0.0.0-20190724145009-a6df10ef35d6/go.mod h1:sf0RcKOdiwJeTti7A313xsaejNUGYDq02MQZ4JD4w/E= github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0 h1:p7WTwG+aXM86+yVrYAiCMW3ZHSmotVvuRbjtt3jC+4A= github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/util v0.0.0-20171013132526-8b1c8ab81986 h1:TiWl4hLvezAhRPM8tPcPDFTysZ7k4T/1J4GPp/iqlZo= @@ -90,9 +93,14 @@ github.com/sirupsen/logrus v0.0.0-20170822132746-89742aefa4b2 h1:+8J/sCAVv2Y9Ct1 github.com/sirupsen/logrus v0.0.0-20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME= github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v0.0.0-20170809224252-890a5c3458b4/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/tidwall/gjson v1.0.2 h1:5BsM7kyEAHAUGEGDkEKO9Mdyiuw6QQ6TSDdarP0Nnmk= github.com/tidwall/gjson v1.0.2/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA= github.com/tidwall/gjson v1.1.5 h1:QysILxBeUEY3GTLA0fQVgkQG1zme8NxGvhh2SSqWNwI= @@ -128,6 +136,9 @@ golang.org/x/sys v0.0.0-20171012164349-43eea11bc926 h1:PY6OU86NqbyZiOzaPnDw6oOjA golang.org/x/sys v0.0.0-20171012164349-43eea11bc926/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= gopkg.in/Shopify/sarama.v1 v1.11.0 h1:/3kaCyeYaPbr59IBjeqhIcUOB1vXlIVqXAYa5g5C5F0= gopkg.in/Shopify/sarama.v1 v1.11.0/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= diff --git a/syncapi/storage/invites_table.go b/syncapi/storage/invites_table.go index 88c98f7e3..9f52087f6 100644 --- a/syncapi/storage/invites_table.go +++ b/syncapi/storage/invites_table.go @@ -23,7 +23,7 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx -- For deleting old invites CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx - ON syncapi_invite_events(target_user_id, id); + ON syncapi_invite_events (event_id); ` const insertInviteEventSQL = "" + diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index b4d7ccbd2..e914bddfe 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -35,6 +35,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +const ( + membershipJoin = "join" + membershipLeave = "leave" + membershipBan = "ban" +) + type stateDelta struct { roomID string stateEvents []gomatrixserverlib.Event @@ -235,6 +241,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( device authtypes.Device, fromPos, toPos int64, numRecentEventsPerRoom int, + wantFullState bool, res *types.Response, ) ([]string, error) { txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) @@ -248,14 +255,18 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + var deltas []stateDelta + var joinedRoomIDs []string + if !wantFullState { + deltas, joinedRoomIDs, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + } else { + deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(ctx, &device, txn, fromPos, toPos, device.UserID) + } if err != nil { return nil, err } - joinedRoomIDs := make([]string, 0, len(deltas)) for _, delta := range deltas { - joinedRoomIDs = append(joinedRoomIDs, delta.roomID) err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) if err != nil { return nil, err @@ -332,19 +343,20 @@ func (d *SyncServerDatasource) IncrementalSync( device authtypes.Device, fromPos, toPos types.SyncPosition, numRecentEventsPerRoom int, + wantFullState bool, ) (*types.Response, error) { nextBatchPos := fromPos.WithUpdates(toPos) res := types.NewResponse(nextBatchPos) var joinedRoomIDs []string var err error - if fromPos.PDUPosition != toPos.PDUPosition { + if fromPos.PDUPosition != toPos.PDUPosition || wantFullState { joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res, + ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res, ) } else { joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( - ctx, nil, device.UserID, "join", + ctx, nil, device.UserID, membershipJoin, ) } if err != nil { @@ -393,7 +405,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( res = types.NewResponse(toPos) // Extract room state and recent events for all rooms the user is joined to. - joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin) if err != nil { return } @@ -571,7 +583,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( res *types.Response, ) error { endPos := toPos - if delta.membershipPos > 0 && delta.membership == "leave" { + if delta.membershipPos > 0 && delta.membership == membershipLeave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). @@ -589,38 +601,42 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( recentEvents := streamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - // Don't bother appending empty room entries - if len(recentEvents) == 0 && len(delta.stateEvents) == 0 { - return nil + var prevPDUPos int64 + + if len(recentEvents) == 0 { + if len(delta.stateEvents) == 0 { + // Don't bother appending empty room entries + return nil + } + + // If full_state=true and since is already up to date, then we'll have + // state events but no recent events. + prevPDUPos = toPos - 1 + } else { + prevPDUPos = recentStreamEvents[0].streamPosition - 1 + } + + if prevPDUPos <= 0 { + prevPDUPos = 1 } switch delta.membership { - case "join": + case membershipJoin: jr := types.NewJoinResponse() - if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { - // Use the short form of batch token for prev_batch - jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) - } else { - // Use the short form of batch token for prev_batch - jr.Timeline.PrevBatch = "1" - } + // Use the short form of batch token for prev_batch + jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr - case "leave": + case membershipLeave: fallthrough // transitions to leave are the same as ban - case "ban": + case membershipBan: // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() - if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { - // Use the short form of batch token for prev_batch - lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) - } else { - // Use the short form of batch token for prev_batch - lr.Timeline.PrevBatch = "1" - } + // Use the short form of batch token for prev_batch + lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) @@ -716,10 +732,14 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( return events, nil } +// getStateDeltas returns the state deltas between fromPos and toPos, +// exclusive of oldPos, inclusive of newPos, for the rooms in which +// the user has new membership events. +// A list of joined room IDs is also returned in case the caller needs it. func (d *SyncServerDatasource) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, fromPos, toPos int64, userID string, -) ([]stateDelta, error) { +) ([]stateDelta, []string, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -733,11 +753,11 @@ func (d *SyncServerDatasource) getStateDeltas( // get all the state events ever between these two positions stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) if err != nil { - return nil, err + return nil, nil, err } state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) if err != nil { - return nil, err + return nil, nil, err } for roomID, stateStreamEvents := range state { @@ -748,16 +768,12 @@ func (d *SyncServerDatasource) getStateDeltas( // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to // the timeline. if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { - if membership == "join" { + if membership == membershipJoin { // send full room state down instead of a delta - var allState []gomatrixserverlib.Event - allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + var s []streamEvent + s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID) if err != nil { - return nil, err - } - s := make([]streamEvent, len(allState)) - for i := 0; i < len(s); i++ { - s[i] = streamEvent{Event: allState[i], streamPosition: 0} + return nil, nil, err } state[roomID] = s continue // we'll add this room in when we do joined rooms @@ -775,19 +791,92 @@ func (d *SyncServerDatasource) getStateDeltas( } // Add in currently joined rooms - joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin) if err != nil { - return nil, err + return nil, nil, err } for _, joinedRoomID := range joinedRoomIDs { deltas = append(deltas, stateDelta{ - membership: "join", + membership: membershipJoin, stateEvents: streamEventsToEvents(device, state[joinedRoomID]), roomID: joinedRoomID, }) } - return deltas, nil + return deltas, joinedRoomIDs, nil +} + +// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync +// requests with full_state=true. +// Fetches full state for all joined rooms and uses selectStateInRange to get +// updates for other rooms. +func (d *SyncServerDatasource) getStateDeltasForFullStateSync( + ctx context.Context, device *authtypes.Device, txn *sql.Tx, + fromPos, toPos int64, userID string, +) ([]stateDelta, []string, error) { + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + if err != nil { + return nil, nil, err + } + + // Use a reasonable initial capacity + deltas := make([]stateDelta, 0, len(joinedRoomIDs)) + + // Add full states for all joined rooms + for _, joinedRoomID := range joinedRoomIDs { + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID) + if stateErr != nil { + return nil, nil, stateErr + } + deltas = append(deltas, stateDelta{ + membership: "join", + stateEvents: streamEventsToEvents(device, s), + roomID: joinedRoomID, + }) + } + + // Get all the state events ever between these two positions + stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) + if err != nil { + return nil, nil, err + } + state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) + if err != nil { + return nil, nil, err + } + + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership != "join" { // We've already added full state for all joined rooms above. + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.streamPosition, + stateEvents: streamEventsToEvents(device, stateStreamEvents), + roomID: roomID, + }) + } + + break + } + } + } + + return deltas, joinedRoomIDs, nil +} + +func (d *SyncServerDatasource) currentStateStreamEventsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, +) ([]streamEvent, error) { + allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID) + if err != nil { + return nil, err + } + s := make([]streamEvent, len(allState)) + for i := 0; i < len(s); i++ { + s[i] = streamEvent{Event: allState[i], streamPosition: 0} + } + return s, nil } // streamEventsToEvents converts streamEvent to Event. If device is non-nil and diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 30ac3a2e5..14bc2efb6 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -185,6 +185,7 @@ func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) { // fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, // a stream will be made for this user if one doesn't exist and it will be returned. This // function does not wait for data to be available on the stream. +// NB: Callers should have locked the mutex before calling this function. func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream { stream, ok := n.userStreams[userID] if !ok && makeIfNotExists { diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go index 904315e9f..808e07cc7 100644 --- a/syncapi/sync/notifier_test.go +++ b/syncapi/sync/notifier_test.go @@ -143,7 +143,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) { wg.Done() }() - stream := n.fetchUserStream(bob, true) + stream := lockedFetchUserStream(n, bob) waitForBlocking(stream, 1) n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter) @@ -171,7 +171,7 @@ func TestNewInviteEventForUser(t *testing.T) { wg.Done() }() - stream := n.fetchUserStream(bob, true) + stream := lockedFetchUserStream(n, bob) waitForBlocking(stream, 1) n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter) @@ -199,7 +199,7 @@ func TestEDUWakeup(t *testing.T) { wg.Done() }() - stream := n.fetchUserStream(bob, true) + stream := lockedFetchUserStream(n, bob) waitForBlocking(stream, 1) n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU) @@ -230,7 +230,7 @@ func TestMultipleRequestWakeup(t *testing.T) { go poll() go poll() - stream := n.fetchUserStream(bob, true) + stream := lockedFetchUserStream(n, bob) waitForBlocking(stream, 3) n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter) @@ -266,14 +266,14 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { } leaveWG.Done() }() - bobStream := n.fetchUserStream(bob, true) + bobStream := lockedFetchUserStream(n, bob) waitForBlocking(bobStream, 1) n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter) leaveWG.Wait() // send an event into the room. Make sure alice gets it. Bob should not. var aliceWG sync.WaitGroup - aliceStream := n.fetchUserStream(alice, true) + aliceStream := lockedFetchUserStream(n, alice) aliceWG.Add(1) go func() { pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter)) @@ -328,6 +328,15 @@ func waitForBlocking(s *UserStream, numBlocking uint) { } } +// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock. +// A new stream is made if it doesn't exist already. +func lockedFetchUserStream(n *Notifier, userID string) *UserStream { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + return n.fetchUserStream(userID, true) +} + func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest { return syncRequest{ device: authtypes.Device{UserID: userID}, diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index a6ec6bd92..d773a4606 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -65,8 +65,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype currPos := rp.notifier.CurrentPosition() - // If this is an initial sync or timeout=0 we return immediately - if syncReq.since == nil || syncReq.timeout == 0 { + if shouldReturnImmediately(syncReq) { syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { return httputil.LogThenError(req, err) @@ -135,7 +134,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncP if req.since == nil { res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState) } if err != nil { @@ -216,3 +215,10 @@ func (rp *RequestPool) appendAccountData( return data, nil } + +// shouldReturnImmediately returns whether the /sync request is an initial sync, +// or timeout=0, or full_state=true, in any of the cases the request should +// return immediately. +func shouldReturnImmediately(syncReq *syncRequest) bool { + return syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState +} diff --git a/testfile b/testfile index 3251396dc..f5b748871 100644 --- a/testfile +++ b/testfile @@ -151,3 +151,13 @@ Can add tag Can remove tag Inbound federation of state requires event_id as a mandatory paramater Inbound federation of state_ids requires event_id as a mandatory paramater +POST /register returns the same device_id as that in the request +POST /login returns the same device_id as that in the request +POST /createRoom with creation content +User can create and send/receive messages in a room with version 1 +POST /createRoom ignores attempts to set the room version via creation_content +Inbound federation rejects remote attempts to join local users to rooms +Inbound federation rejects remote attempts to kick local users to rooms +An event which redacts itself should be ignored +A pair of events which redact each other should be ignored +Full state sync includes joined rooms