Merge branch 'master' into add-register-fallback

This commit is contained in:
Parminder Singh 2019-08-01 01:32:50 +05:30
commit 9df4ef3683
18 changed files with 210 additions and 136 deletions

View file

@ -1,49 +0,0 @@
steps:
- command:
# https://github.com/golangci/golangci-lint#memory-usage-of-golangci-lint
- "GOGC=20 ./scripts/find-lint.sh"
label: "\U0001F9F9 Lint / :go: 1.12"
agents:
# Use a larger instance as linting takes a looot of memory
queue: "medium"
plugins:
- docker#v3.0.1:
image: "golang:1.12"
- wait
- command:
- "go build ./cmd/..."
label: "\U0001F528 Build / :go: 1.11"
plugins:
- docker#v3.0.1:
image: "golang:1.11"
retry:
automatic:
- exit_status: 128
limit: 3
- command:
- "go build ./cmd/..."
label: "\U0001F528 Build / :go: 1.12"
plugins:
- docker#v3.0.1:
image: "golang:1.12"
retry:
automatic:
- exit_status: 128
limit: 3
- command:
- "go test ./..."
label: "\U0001F9EA Unit tests / :go: 1.11"
plugins:
- docker#v3.0.1:
image: "golang:1.11"
- command:
- "go test ./..."
label: "\U0001F9EA Unit tests / :go: 1.12"
plugins:
- docker#v3.0.1:
image: "golang:1.12"

View file

@ -17,6 +17,7 @@ package accounts
import (
"context"
"database/sql"
"encoding/json"
"github.com/matrix-org/gomatrixserverlib"
)
@ -71,25 +72,44 @@ func (s *filterStatements) prepare(db *sql.DB) (err error) {
func (s *filterStatements) selectFilter(
ctx context.Context, localpart string, filterID string,
) (filter []byte, err error) {
err = s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filter)
return
) (*gomatrixserverlib.Filter, error) {
// Retrieve filter from database (stored as canonical JSON)
var filterData []byte
err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData)
if err != nil {
return nil, err
}
// Unmarshal JSON into Filter struct
var filter gomatrixserverlib.Filter
if err = json.Unmarshal(filterData, &filter); err != nil {
return nil, err
}
return &filter, nil
}
func (s *filterStatements) insertFilter(
ctx context.Context, filter []byte, localpart string,
ctx context.Context, filter *gomatrixserverlib.Filter, localpart string,
) (filterID string, err error) {
var existingFilterID string
// This can result in a race condition when two clients try to insert the
// same filter and localpart at the same time, however this is not a
// problem as both calls will result in the same filterID
filterJSON, err := gomatrixserverlib.CanonicalJSON(filter)
// Serialise json
filterJSON, err := json.Marshal(filter)
if err != nil {
return "", err
}
// Remove whitespaces and sort JSON data
// needed to prevent from inserting the same filter multiple times
filterJSON, err = gomatrixserverlib.CanonicalJSON(filterJSON)
if err != nil {
return "", err
}
// Check if filter already exists in the database
// Check if filter already exists in the database using its localpart and content
//
// This can result in a race condition when two clients try to insert the
// same filter and localpart at the same time, however this is not a
// problem as both calls will result in the same filterID
err = s.selectFilterIDByContentStmt.QueryRowContext(ctx,
localpart, filterJSON).Scan(&existingFilterID)
if err != nil && err != sql.ErrNoRows {

View file

@ -344,11 +344,11 @@ func (d *Database) GetThreePIDsForLocalpart(
}
// GetFilter looks up the filter associated with a given local user and filter ID.
// Returns a filter represented as a byte slice. Otherwise returns an error if
// no such filter exists or if there was an error talking to the database.
// Returns a filter structure. Otherwise returns an error if no such filter exists
// or if there was an error talking to the database.
func (d *Database) GetFilter(
ctx context.Context, localpart string, filterID string,
) ([]byte, error) {
) (*gomatrixserverlib.Filter, error) {
return d.filter.selectFilter(ctx, localpart, filterID)
}
@ -356,7 +356,7 @@ func (d *Database) GetFilter(
// Returns the filterID as a string. Otherwise returns an error if something
// goes wrong.
func (d *Database) PutFilter(
ctx context.Context, localpart string, filter []byte,
ctx context.Context, localpart string, filter *gomatrixserverlib.Filter,
) (string, error) {
return d.filter.insertFilter(ctx, filter, localpart)
}

View file

@ -169,6 +169,8 @@ func (s *devicesStatements) selectDeviceByToken(
return &dev, err
}
// selectDeviceByID retrieves a device from the database with the given user
// localpart and deviceID
func (s *devicesStatements) selectDeviceByID(
ctx context.Context, localpart, deviceID string,
) (*authtypes.Device, error) {

View file

@ -84,7 +84,7 @@ func (d *Database) CreateDevice(
if deviceID != nil {
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
var err error
// Revoke existing token for this device
// Revoke existing tokens for this device
if err = d.devices.deleteDevice(ctx, txn, *deviceID, localpart); err != nil {
return err
}

View file

@ -15,6 +15,7 @@
package routing
import (
"encoding/json"
"fmt"
"net/http"
"strings"
@ -97,6 +98,27 @@ func (r createRoomRequest) Validate() *util.JSONResponse {
}
}
// Validate creation_content fields defined in the spec by marshalling the
// creation_content map into bytes and then unmarshalling the bytes into
// common.CreateContent.
creationContentBytes, err := json.Marshal(r.CreationContent)
if err != nil {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("malformed creation_content"),
}
}
var CreationContent common.CreateContent
err = json.Unmarshal(creationContentBytes, &CreationContent)
if err != nil {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("malformed creation_content"),
}
}
return nil
}
@ -154,7 +176,17 @@ func createRoom(
JSON: jsonerror.InvalidArgumentValue(err.Error()),
}
}
// TODO: visibility/presets/raw initial state/creation content
// Clobber keys: creator, room_version
if r.CreationContent == nil {
r.CreationContent = make(map[string]interface{}, 2)
}
r.CreationContent["creator"] = userID
r.CreationContent["room_version"] = "1" // TODO: We set this to 1 before we support Room versioning
// TODO: visibility/presets/raw initial state
// TODO: Create room alias association
// Make sure this doesn't fall into an application service's namespace though!
@ -214,7 +246,7 @@ func createRoom(
// harder to reason about, hence sticking to a strict static ordering.
// TODO: Synapse has txn/token ID on each event. Do we need to do this here?
eventsToMake := []fledglingEvent{
{"m.room.create", "", common.CreateContent{Creator: userID}},
{"m.room.create", "", r.CreationContent},
{"m.room.member", userID, membershipContent},
{"m.room.power_levels", "", common.InitialPowerLevelsContent(userID)},
// TODO: m.room.canonical_alias

View file

@ -17,13 +17,10 @@ package routing
import (
"net/http"
"encoding/json"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -43,7 +40,7 @@ func GetFilter(
return httputil.LogThenError(req, err)
}
res, err := accountDB.GetFilter(req.Context(), localpart, filterID)
filter, err := accountDB.GetFilter(req.Context(), localpart, filterID)
if err != nil {
//TODO better error handling. This error message is *probably* right,
// but if there are obscure db errors, this will also be returned,
@ -53,11 +50,6 @@ func GetFilter(
JSON: jsonerror.NotFound("No such filter"),
}
}
filter := gomatrix.Filter{}
err = json.Unmarshal(res, &filter)
if err != nil {
return httputil.LogThenError(req, err)
}
return util.JSONResponse{
Code: http.StatusOK,
@ -85,21 +77,21 @@ func PutFilter(
return httputil.LogThenError(req, err)
}
var filter gomatrix.Filter
var filter gomatrixserverlib.Filter
if reqErr := httputil.UnmarshalJSONRequest(req, &filter); reqErr != nil {
return *reqErr
}
filterArray, err := json.Marshal(filter)
if err != nil {
// Validate generates a user-friendly error
if err = filter.Validate(); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("Filter is malformed"),
JSON: jsonerror.BadJSON("Invalid filter: " + err.Error()),
}
}
filterID, err := accountDB.PutFilter(req.Context(), localpart, filterArray)
filterID, err := accountDB.PutFilter(req.Context(), localpart, &filter)
if err != nil {
return httputil.LogThenError(req, err)
}

View file

@ -18,7 +18,6 @@ import (
"net/http"
"context"
"database/sql"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -44,8 +43,10 @@ type flow struct {
type passwordRequest struct {
User string `json:"user"`
Password string `json:"password"`
// Both DeviceID and InitialDisplayName can be omitted, or empty strings ("")
// Thus a pointer is needed to differentiate between the two
InitialDisplayName *string `json:"initial_device_display_name"`
DeviceID string `json:"device_id"`
DeviceID *string `json:"device_id"`
}
type loginResponse struct {
@ -110,7 +111,7 @@ func Login(
return httputil.LogThenError(req, err)
}
dev, err := getDevice(req.Context(), r, deviceDB, acc, localpart, token)
dev, err := getDevice(req.Context(), r, deviceDB, acc, token)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
@ -134,20 +135,16 @@ func Login(
}
}
// check if device exists else create one
// getDevice returns a new or existing device
func getDevice(
ctx context.Context,
r passwordRequest,
deviceDB *devices.Database,
acc *authtypes.Account,
localpart, token string,
token string,
) (dev *authtypes.Device, err error) {
dev, err = deviceDB.GetDeviceByID(ctx, localpart, r.DeviceID)
if err == sql.ErrNoRows {
// device doesn't exist, create one
dev, err = deviceDB.CreateDevice(
ctx, acc.Localpart, nil, token, r.InitialDisplayName,
ctx, acc.Localpart, r.DeviceID, token, r.InitialDisplayName,
)
}
return
}

View file

@ -126,7 +126,10 @@ type registerRequest struct {
// user-interactive auth params
Auth authDict `json:"auth"`
// Both DeviceID and InitialDisplayName can be omitted, or empty strings ("")
// Thus a pointer is needed to differentiate between the two
InitialDisplayName *string `json:"initial_device_display_name"`
DeviceID *string `json:"device_id"`
// Prevent this user from logging in
InhibitLogin common.WeakBoolean `json:"inhibit_login"`
@ -631,7 +634,7 @@ func handleApplicationServiceRegistration(
// application service registration is entirely separate.
return completeRegistration(
req.Context(), accountDB, deviceDB, r.Username, "", appserviceID,
r.InhibitLogin, r.InitialDisplayName,
r.InhibitLogin, r.InitialDisplayName, r.DeviceID,
)
}
@ -651,7 +654,7 @@ func checkAndCompleteFlow(
// This flow was completed, registration can continue
return completeRegistration(
req.Context(), accountDB, deviceDB, r.Username, r.Password, "",
r.InhibitLogin, r.InitialDisplayName,
r.InhibitLogin, r.InitialDisplayName, r.DeviceID,
)
}
@ -702,10 +705,10 @@ func LegacyRegister(
return util.MessageResponse(http.StatusForbidden, "HMAC incorrect")
}
return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil)
return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil, nil)
case authtypes.LoginTypeDummy:
// there is nothing to do
return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil)
return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil, nil)
default:
return util.JSONResponse{
Code: http.StatusNotImplemented,
@ -743,13 +746,19 @@ func parseAndValidateLegacyLogin(req *http.Request, r *legacyRegisterRequest) *u
return nil
}
// completeRegistration runs some rudimentary checks against the submitted
// input, then if successful creates an account and a newly associated device
// We pass in each individual part of the request here instead of just passing a
// registerRequest, as this function serves requests encoded as both
// registerRequests and legacyRegisterRequests, which share some attributes but
// not all
func completeRegistration(
ctx context.Context,
accountDB *accounts.Database,
deviceDB *devices.Database,
username, password, appserviceID string,
inhibitLogin common.WeakBoolean,
displayName *string,
displayName, deviceID *string,
) util.JSONResponse {
if username == "" {
return util.JSONResponse{
@ -778,6 +787,9 @@ func completeRegistration(
}
}
// Increment prometheus counter for created users
amtRegUsers.Inc()
// Check whether inhibit_login option is set. If so, don't create an access
// token or a device for this user
if inhibitLogin {
@ -798,8 +810,7 @@ func completeRegistration(
}
}
// TODO: Use the device ID in the request.
dev, err := deviceDB.CreateDevice(ctx, username, nil, token, displayName)
dev, err := deviceDB.CreateDevice(ctx, username, deviceID, token, displayName)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
@ -807,9 +818,6 @@ func completeRegistration(
}
}
// Increment prometheus counter for created users
amtRegUsers.Inc()
return util.JSONResponse{
Code: http.StatusOK,
JSON: registerResponse{

View file

@ -18,6 +18,14 @@ package common
type CreateContent struct {
Creator string `json:"creator"`
Federate *bool `json:"m.federate,omitempty"`
RoomVersion string `json:"room_version,omitempty"`
Predecessor PreviousRoom `json:"predecessor,omitempty"`
}
// PreviousRoom is the "Previous Room" structure defined at https://matrix.org/docs/spec/client_server/r0.5.0#m-room-create
type PreviousRoom struct {
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
}
// MemberContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-member

View file

@ -15,9 +15,12 @@
package common
import (
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dugong"
@ -54,15 +57,35 @@ func (h *logLevelHook) Levels() []logrus.Level {
return levels
}
// callerPrettyfier is a function that given a runtime.Frame object, will
// extract the calling function's name and file, and return them in a nicely
// formatted way
func callerPrettyfier(f *runtime.Frame) (string, string) {
// Retrieve just the function name
s := strings.Split(f.Function, ".")
funcname := s[len(s)-1]
// Append a newline + tab to it to move the actual log content to its own line
funcname += "\n\t"
// Surround the filepath in brackets and append line number so IDEs can quickly
// navigate
filename := fmt.Sprintf(" [%s:%d]", f.File, f.Line)
return funcname, filename
}
// SetupStdLogging configures the logging format to standard output. Typically, it is called when the config is not yet loaded.
func SetupStdLogging() {
logrus.SetReportCaller(true)
logrus.SetFormatter(&utcFormatter{
&logrus.TextFormatter{
TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
FullTimestamp: true,
DisableColors: false,
DisableTimestamp: false,
DisableSorting: false,
QuoteEmptyFields: true,
CallerPrettyfier: callerPrettyfier,
},
})
}
@ -71,8 +94,8 @@ func SetupStdLogging() {
// If something fails here it means that the logging was improperly configured,
// so we just exit with the error
func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
logrus.SetReportCaller(true)
for _, hook := range hooks {
// Check we received a proper logging level
level, err := logrus.ParseLevel(hook.Level)
if err != nil {
@ -126,6 +149,7 @@ func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName str
DisableColors: true,
DisableTimestamp: false,
DisableSorting: false,
QuoteEmptyFields: true,
},
},
&dugong.DailyRotationSchedule{GZip: true},

10
go.mod
View file

@ -20,10 +20,11 @@ require (
github.com/jaegertracing/jaeger-client-go v0.0.0-20170921145708-3ad49a1d839b
github.com/jaegertracing/jaeger-lib v0.0.0-20170920222118-21a3da6d66fe
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/lib/pq v0.0.0-20170918175043-23da1db4f16d
github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20190619132215-178ed5e3b8e2
github.com/matrix-org/gomatrixserverlib v0.0.0-20190724145009-a6df10ef35d6
github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5
github.com/matttproud/golang_protobuf_extensions v1.0.1
@ -40,8 +41,9 @@ require (
github.com/prometheus/common v0.0.0-20170108231212-dd2f054febf4
github.com/prometheus/procfs v0.0.0-20170128160123-1878d9fbb537
github.com/rcrowley/go-metrics v0.0.0-20161128210544-1f30fe9094a5
github.com/sirupsen/logrus v1.3.0
github.com/stretchr/testify v1.2.2
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.3.0
github.com/tidwall/gjson v1.1.5
github.com/tidwall/match v1.0.1
github.com/tidwall/sjson v1.0.3
@ -54,7 +56,7 @@ require (
go.uber.org/zap v1.7.1
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7
gopkg.in/Shopify/sarama.v1 v1.11.0
gopkg.in/airbrake/gobrake.v2 v2.0.9
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20170727041045-23bcc3c4eae3

11
go.sum
View file

@ -36,6 +36,7 @@ github.com/jaegertracing/jaeger-lib v0.0.0-20170920222118-21a3da6d66fe/go.mod h1
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 h1:KAZ1BW2TCmT6PRihDPpocIy1QTtsAsrx6TneU/4+CMg=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@ -53,6 +54,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20181109104322-1c2cbc0872f0 h1:3U
github.com/matrix-org/gomatrixserverlib v0.0.0-20181109104322-1c2cbc0872f0/go.mod h1:YHyhIQUmuXyKtoVfDUMk/DyU93Taamlu6nPZkij/JtA=
github.com/matrix-org/gomatrixserverlib v0.0.0-20190619132215-178ed5e3b8e2 h1:pYajAEdi3sowj4iSunqctchhcMNW3rDjeeH0T4uDkMY=
github.com/matrix-org/gomatrixserverlib v0.0.0-20190619132215-178ed5e3b8e2/go.mod h1:sf0RcKOdiwJeTti7A313xsaejNUGYDq02MQZ4JD4w/E=
github.com/matrix-org/gomatrixserverlib v0.0.0-20190724145009-a6df10ef35d6 h1:B8n1H5Wb1B5jwLzTylBpY0kJCMRqrofT7PmOw4aJFJA=
github.com/matrix-org/gomatrixserverlib v0.0.0-20190724145009-a6df10ef35d6/go.mod h1:sf0RcKOdiwJeTti7A313xsaejNUGYDq02MQZ4JD4w/E=
github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0 h1:p7WTwG+aXM86+yVrYAiCMW3ZHSmotVvuRbjtt3jC+4A=
github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/util v0.0.0-20171013132526-8b1c8ab81986 h1:TiWl4hLvezAhRPM8tPcPDFTysZ7k4T/1J4GPp/iqlZo=
@ -90,9 +93,14 @@ github.com/sirupsen/logrus v0.0.0-20170822132746-89742aefa4b2 h1:+8J/sCAVv2Y9Ct1
github.com/sirupsen/logrus v0.0.0-20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v0.0.0-20170809224252-890a5c3458b4/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tidwall/gjson v1.0.2 h1:5BsM7kyEAHAUGEGDkEKO9Mdyiuw6QQ6TSDdarP0Nnmk=
github.com/tidwall/gjson v1.0.2/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
github.com/tidwall/gjson v1.1.5 h1:QysILxBeUEY3GTLA0fQVgkQG1zme8NxGvhh2SSqWNwI=
@ -128,6 +136,9 @@ golang.org/x/sys v0.0.0-20171012164349-43eea11bc926 h1:PY6OU86NqbyZiOzaPnDw6oOjA
golang.org/x/sys v0.0.0-20171012164349-43eea11bc926/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/Shopify/sarama.v1 v1.11.0 h1:/3kaCyeYaPbr59IBjeqhIcUOB1vXlIVqXAYa5g5C5F0=
gopkg.in/Shopify/sarama.v1 v1.11.0/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=

View file

@ -23,7 +23,7 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx
-- For deleting old invites
CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
ON syncapi_invite_events(target_user_id, id);
ON syncapi_invite_events (event_id);
`
const insertInviteEventSQL = "" +

View file

@ -35,6 +35,12 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
const (
membershipJoin = "join"
membershipLeave = "leave"
membershipBan = "ban"
)
type stateDelta struct {
roomID string
stateEvents []gomatrixserverlib.Event
@ -248,14 +254,12 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
// joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
// This works out what the 'state' key should be for each room as well as which membership block
// to put the room into.
deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
deltas, joinedRoomIDs, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
if err != nil {
return nil, err
}
joinedRoomIDs := make([]string, 0, len(deltas))
for _, delta := range deltas {
joinedRoomIDs = append(joinedRoomIDs, delta.roomID)
err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
if err != nil {
return nil, err
@ -344,7 +348,7 @@ func (d *SyncServerDatasource) IncrementalSync(
)
} else {
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
ctx, nil, device.UserID, "join",
ctx, nil, device.UserID, membershipJoin,
)
}
if err != nil {
@ -393,7 +397,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
res = types.NewResponse(toPos)
// Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin)
if err != nil {
return
}
@ -571,7 +575,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
res *types.Response,
) error {
endPos := toPos
if delta.membershipPos > 0 && delta.membership == "leave" {
if delta.membershipPos > 0 && delta.membership == membershipLeave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
@ -595,7 +599,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
}
switch delta.membership {
case "join":
case membershipJoin:
jr := types.NewJoinResponse()
if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
// Use the short form of batch token for prev_batch
@ -608,9 +612,9 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case "leave":
case membershipLeave:
fallthrough // transitions to leave are the same as ban
case "ban":
case membershipBan:
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
@ -716,10 +720,14 @@ func (d *SyncServerDatasource) fetchMissingStateEvents(
return events, nil
}
// getStateDeltas returns the state deltas between fromPos and toPos,
// exclusive of oldPos, inclusive of newPos, for the rooms in which
// the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it.
func (d *SyncServerDatasource) getStateDeltas(
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
fromPos, toPos int64, userID string,
) ([]stateDelta, error) {
) ([]stateDelta, []string, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response
// - For each room which has membership list changes:
@ -733,11 +741,11 @@ func (d *SyncServerDatasource) getStateDeltas(
// get all the state events ever between these two positions
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
if err != nil {
return nil, err
return nil, nil, err
}
state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
if err != nil {
return nil, err
return nil, nil, err
}
for roomID, stateStreamEvents := range state {
@ -748,12 +756,12 @@ func (d *SyncServerDatasource) getStateDeltas(
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
// the timeline.
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership == "join" {
if membership == membershipJoin {
// send full room state down instead of a delta
var allState []gomatrixserverlib.Event
allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil {
return nil, err
return nil, nil, err
}
s := make([]streamEvent, len(allState))
for i := 0; i < len(s); i++ {
@ -775,19 +783,19 @@ func (d *SyncServerDatasource) getStateDeltas(
}
// Add in currently joined rooms
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin)
if err != nil {
return nil, err
return nil, nil, err
}
for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, stateDelta{
membership: "join",
membership: membershipJoin,
stateEvents: streamEventsToEvents(device, state[joinedRoomID]),
roomID: joinedRoomID,
})
}
return deltas, nil
return deltas, joinedRoomIDs, nil
}
// streamEventsToEvents converts streamEvent to Event. If device is non-nil and

View file

@ -185,6 +185,7 @@ func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) {
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
// a stream will be made for this user if one doesn't exist and it will be returned. This
// function does not wait for data to be available on the stream.
// NB: Callers should have locked the mutex before calling this function.
func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream {
stream, ok := n.userStreams[userID]
if !ok && makeIfNotExists {

View file

@ -143,7 +143,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
wg.Done()
}()
stream := n.fetchUserStream(bob, true)
stream := lockedFetchUserStream(n, bob)
waitForBlocking(stream, 1)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
@ -171,7 +171,7 @@ func TestNewInviteEventForUser(t *testing.T) {
wg.Done()
}()
stream := n.fetchUserStream(bob, true)
stream := lockedFetchUserStream(n, bob)
waitForBlocking(stream, 1)
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
@ -199,7 +199,7 @@ func TestEDUWakeup(t *testing.T) {
wg.Done()
}()
stream := n.fetchUserStream(bob, true)
stream := lockedFetchUserStream(n, bob)
waitForBlocking(stream, 1)
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
@ -230,7 +230,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
go poll()
go poll()
stream := n.fetchUserStream(bob, true)
stream := lockedFetchUserStream(n, bob)
waitForBlocking(stream, 3)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
@ -266,14 +266,14 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
}
leaveWG.Done()
}()
bobStream := n.fetchUserStream(bob, true)
bobStream := lockedFetchUserStream(n, bob)
waitForBlocking(bobStream, 1)
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
leaveWG.Wait()
// send an event into the room. Make sure alice gets it. Bob should not.
var aliceWG sync.WaitGroup
aliceStream := n.fetchUserStream(alice, true)
aliceStream := lockedFetchUserStream(n, alice)
aliceWG.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
@ -328,6 +328,15 @@ func waitForBlocking(s *UserStream, numBlocking uint) {
}
}
// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
// A new stream is made if it doesn't exist already.
func lockedFetchUserStream(n *Notifier, userID string) *UserStream {
n.streamLock.Lock()
defer n.streamLock.Unlock()
return n.fetchUserStream(userID, true)
}
func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
return syncRequest{
device: authtypes.Device{UserID: userID},

View file

@ -149,3 +149,12 @@ Typing events appear in incremental sync
Typing events appear in gapped sync
Inbound federation of state requires event_id as a mandatory paramater
Inbound federation of state_ids requires event_id as a mandatory paramater
POST /register returns the same device_id as that in the request
POST /login returns the same device_id as that in the request
POST /createRoom with creation content
User can create and send/receive messages in a room with version 1
POST /createRoom ignores attempts to set the room version via creation_content
Inbound federation rejects remote attempts to join local users to rooms
Inbound federation rejects remote attempts to kick local users to rooms
An event which redacts itself should be ignored
A pair of events which redact each other should be ignored