diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 30babd8eb..d04707a61 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -56,6 +56,7 @@ kafka: topics: input_room_event: roomserverInput output_room_event: roomserverOutput + output_client_data: clientapiOutput user_updates: userUpdates # The postgres connection configs for connecting to the databases e.g a postgres:// URI diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/authtypes/membership.go b/src/github.com/matrix-org/dendrite/clientapi/auth/authtypes/membership.go new file mode 100644 index 000000000..ad5312db6 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/authtypes/membership.go @@ -0,0 +1,23 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package authtypes + +// Membership represents the relationship between a user and a room they're a +// member of +type Membership struct { + Localpart string + RoomID string + EventID string +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go new file mode 100644 index 000000000..0c1fc0ff9 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/account_data_table.go @@ -0,0 +1,144 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package accounts + +import ( + "database/sql" + + "github.com/matrix-org/gomatrixserverlib" +) + +const accountDataSchema = ` +-- Stores data about accounts data. +CREATE TABLE IF NOT EXISTS account_data ( + -- The Matrix user ID localpart for this account + localpart TEXT NOT NULL, + -- The room ID for this data (empty string if not specific to a room) + room_id TEXT, + -- The account data type + type TEXT NOT NULL, + -- The account data content + content TEXT NOT NULL, + + PRIMARY KEY(localpart, room_id, type) +); +` + +const insertAccountDataSQL = ` + INSERT INTO account_data(localpart, room_id, type, content) VALUES($1, $2, $3, $4) + ON CONFLICT (localpart, room_id, type) DO UPDATE SET content = EXCLUDED.content +` + +const selectAccountDataSQL = "" + + "SELECT room_id, type, content FROM account_data WHERE localpart = $1" + +const selectAccountDataByTypeSQL = "" + + "SELECT content FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3" + +const deleteAccountDataSQL = "" + + "DELETE FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3" + +type accountDataStatements struct { + insertAccountDataStmt *sql.Stmt + selectAccountDataStmt *sql.Stmt + selectAccountDataByTypeStmt *sql.Stmt +} + +func (s *accountDataStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(accountDataSchema) + if err != nil { + return + } + if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { + return + } + if s.selectAccountDataStmt, err = db.Prepare(selectAccountDataSQL); err != nil { + return + } + if s.selectAccountDataByTypeStmt, err = db.Prepare(selectAccountDataByTypeSQL); err != nil { + return + } + return +} + +func (s *accountDataStatements) insertAccountData(localpart string, roomID string, dataType string, content string) (err error) { + _, err = s.insertAccountDataStmt.Exec(localpart, roomID, dataType, content) + return +} + +func (s *accountDataStatements) selectAccountData(localpart string) ( + global []gomatrixserverlib.ClientEvent, + rooms map[string][]gomatrixserverlib.ClientEvent, + err error, +) { + rows, err := s.selectAccountDataStmt.Query(localpart) + if err != nil { + return + } + + global = []gomatrixserverlib.ClientEvent{} + rooms = make(map[string][]gomatrixserverlib.ClientEvent) + + for rows.Next() { + var roomID string + var dataType string + var content []byte + + if err = rows.Scan(&roomID, &dataType, &content); err != nil && err != sql.ErrNoRows { + return + } + + ac := gomatrixserverlib.ClientEvent{ + Type: dataType, + Content: content, + } + + if len(roomID) > 0 { + rooms[roomID] = append(rooms[roomID], ac) + } else { + global = append(global, ac) + } + } + + return +} + +func (s *accountDataStatements) selectAccountDataByType( + localpart string, roomID string, dataType string, +) (data []gomatrixserverlib.ClientEvent, err error) { + data = []gomatrixserverlib.ClientEvent{} + + rows, err := s.selectAccountDataByTypeStmt.Query(localpart, roomID, dataType) + if err != nil { + return + } + + for rows.Next() { + var content []byte + + if err = rows.Scan(&content); err != nil { + return + } + + ac := gomatrixserverlib.ClientEvent{ + Type: dataType, + Content: content, + } + + data = append(data, ac) + } + + return +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/membership_table.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/membership_table.go index 8eca4a574..70affdc33 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/membership_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/membership_table.go @@ -18,6 +18,7 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" ) const membershipSchema = ` @@ -38,23 +39,29 @@ CREATE TABLE IF NOT EXISTS memberships ( CREATE UNIQUE INDEX IF NOT EXISTS membership_event_id ON memberships(event_id); ` -const insertMembershipSQL = "" + - "INSERT INTO memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)" +const insertMembershipSQL = ` + INSERT INTO memberships(localpart, room_id, event_id) VALUES ($1, $2, $3) + ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id +` const selectMembershipSQL = "" + "SELECT * from memberships WHERE localpart = $1 AND room_id = $2" const selectMembershipsByLocalpartSQL = "" + - "SELECT room_id FROM memberships WHERE localpart = $1" + "SELECT room_id, event_id FROM memberships WHERE localpart = $1" const deleteMembershipsByEventIDsSQL = "" + "DELETE FROM memberships WHERE event_id = ANY($1)" +const updateMembershipByEventIDSQL = "" + + "UPDATE memberships SET event_id = $2 WHERE event_id = $1" + type membershipStatements struct { deleteMembershipsByEventIDsStmt *sql.Stmt insertMembershipStmt *sql.Stmt selectMembershipByEventIDStmt *sql.Stmt selectMembershipsByLocalpartStmt *sql.Stmt + updateMembershipByEventIDStmt *sql.Stmt } func (s *membershipStatements) prepare(db *sql.DB) (err error) { @@ -71,6 +78,9 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) { if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil { return } + if s.updateMembershipByEventIDStmt, err = db.Prepare(updateMembershipByEventIDSQL); err != nil { + return + } return } @@ -83,3 +93,29 @@ func (s *membershipStatements) deleteMembershipsByEventIDs(eventIDs []string, tx _, err = txn.Stmt(s.deleteMembershipsByEventIDsStmt).Exec(pq.StringArray(eventIDs)) return } + +func (s *membershipStatements) selectMembershipsByLocalpart(localpart string) (memberships []authtypes.Membership, err error) { + rows, err := s.selectMembershipsByLocalpartStmt.Query(localpart) + if err != nil { + return + } + + memberships = []authtypes.Membership{} + + defer rows.Close() + for rows.Next() { + var m authtypes.Membership + m.Localpart = localpart + if err := rows.Scan(&m.RoomID, &m.EventID); err != nil { + return nil, err + } + memberships = append(memberships, m) + } + + return +} + +func (s *membershipStatements) updateMembershipByEventID(oldEventID string, newEventID string) (err error) { + _, err = s.updateMembershipByEventIDStmt.Exec(oldEventID, newEventID) + return +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go index 65d87d5a7..ca9deac09 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go @@ -27,12 +27,13 @@ import ( // Database represents an account database type Database struct { - db *sql.DB - partitions common.PartitionOffsetStatements - accounts accountsStatements - profiles profilesStatements - memberships membershipStatements - serverName gomatrixserverlib.ServerName + db *sql.DB + partitions common.PartitionOffsetStatements + accounts accountsStatements + profiles profilesStatements + memberships membershipStatements + accountDatas accountDataStatements + serverName gomatrixserverlib.ServerName } // NewDatabase creates a new accounts and profiles database @@ -58,7 +59,11 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName) if err = m.prepare(db); err != nil { return nil, err } - return &Database{db, partitions, a, p, m, serverName}, nil + ac := accountDataStatements{} + if err = ac.prepare(db); err != nil { + return nil, err + } + return &Database{db, partitions, a, p, m, ac, serverName}, nil } // GetAccountByPassword returns the account associated with the given localpart and password. @@ -151,6 +156,21 @@ func (d *Database) UpdateMemberships(eventsToAdd []gomatrixserverlib.Event, idsT }) } +// GetMembershipsByLocalpart returns an array containing the IDs of all the rooms +// a user matching a given localpart is a member of +// If no membership match the given localpart, returns an empty array +// If there was an issue during the retrieval, returns the SQL error +func (d *Database) GetMembershipsByLocalpart(localpart string) (memberships []authtypes.Membership, err error) { + return d.memberships.selectMembershipsByLocalpart(localpart) +} + +// UpdateMembership update the "join" membership event ID of a membership. +// This is useful in case of membership upgrade (e.g. profile update) +// If there was an issue during the update, returns the SQL error +func (d *Database) UpdateMembership(oldEventID string, newEventID string) error { + return d.memberships.updateMembershipByEventID(oldEventID, newEventID) +} + // newMembership will save a new membership in the database if the given state // event is a "join" membership event // If the event isn't a "join" membership event, does nothing @@ -184,6 +204,34 @@ func (d *Database) newMembership(ev gomatrixserverlib.Event, txn *sql.Tx) error return nil } +// SaveAccountData saves new account data for a given user and a given room. +// If the account data is not specific to a room, the room ID should be an empty string +// If an account data already exists for a given set (user, room, data type), it will +// update the corresponding row with the new content +// Returns a SQL error if there was an issue with the insertion/update +func (d *Database) SaveAccountData(localpart string, roomID string, dataType string, content string) error { + return d.accountDatas.insertAccountData(localpart, roomID, dataType, content) +} + +// GetAccountData returns account data related to a given localpart +// If no account data could be found, returns an empty arrays +// Returns an error if there was an issue with the retrieval +func (d *Database) GetAccountData(localpart string) ( + global []gomatrixserverlib.ClientEvent, + rooms map[string][]gomatrixserverlib.ClientEvent, + err error, +) { + return d.accountDatas.selectAccountData(localpart) +} + +// GetAccountDataByType returns account data matching a given +// localpart, room ID and type. +// If no account data could be found, returns an empty array +// Returns an error if there was an issue with the retrieval +func (d *Database) GetAccountDataByType(localpart string, roomID string, dataType string) (data []gomatrixserverlib.ClientEvent, err error) { + return d.accountDatas.selectAccountDataByType(localpart, roomID, dataType) +} + func hashPassword(plaintext string) (hash string, err error) { hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), bcrypt.DefaultCost) return string(hashBytes), err diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go index 98dcd5b65..1e2320787 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -107,6 +107,11 @@ func (s *OutputRoomEvent) lookupStateEvents( ) ([]gomatrixserverlib.Event, error) { // Fast path if there aren't any new state events. if len(addsStateEventIDs) == 0 { + // If the event is a membership update (e.g. for a profile update), it won't + // show up in AddsStateEventIDs, so we need to add it manually + if event.Type() == "m.room.member" { + return []gomatrixserverlib.Event{event}, nil + } return nil, nil } diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go new file mode 100644 index 000000000..2597089e3 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go @@ -0,0 +1,65 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + "github.com/matrix-org/dendrite/common" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +// SyncAPIProducer produces events for the sync API server to consume +type SyncAPIProducer struct { + Topic string + Producer sarama.SyncProducer +} + +// NewSyncAPIProducer creates a new SyncAPIProducer +func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) { + producer, err := sarama.NewSyncProducer(kafkaURIs, nil) + if err != nil { + return nil, err + } + return &SyncAPIProducer{ + Topic: topic, + Producer: producer, + }, nil +} + +// SendData sends account data to the sync API server +func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error { + var m sarama.ProducerMessage + + data := common.AccountData{ + RoomID: roomID, + Type: dataType, + } + value, err := json.Marshal(data) + if err != nil { + return err + } + + m.Topic = string(p.Topic) + m.Key = sarama.StringEncoder(userID) + m.Value = sarama.ByteEncoder(value) + + if _, _, err := p.Producer.SendMessage(&m); err != nil { + return err + } + + return nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go b/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go new file mode 100644 index 000000000..d3bb932de --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/account_data.go @@ -0,0 +1,74 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package readers + +import ( + "io/ioutil" + "net/http" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/util" +) + +// SaveAccountData implements PUT /user/{userId}/[rooms/{roomId}/]account_data/{type} +func SaveAccountData( + req *http.Request, accountDB *accounts.Database, device *authtypes.Device, + userID string, roomID string, dataType string, syncProducer *producers.SyncAPIProducer, +) util.JSONResponse { + if req.Method != "PUT" { + return util.JSONResponse{ + Code: 405, + JSON: jsonerror.NotFound("Bad method"), + } + } + + if userID != device.UserID { + return util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden("userID does not match the current user"), + } + } + + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return httputil.LogThenError(req, err) + } + + defer req.Body.Close() + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + return httputil.LogThenError(req, err) + } + + if err := accountDB.SaveAccountData(localpart, roomID, dataType, string(body)); err != nil { + return httputil.LogThenError(req, err) + } + + if err := syncProducer.SendData(userID, roomID, dataType); err != nil { + return httputil.LogThenError(req, err) + } + + return util.JSONResponse{ + Code: 200, + JSON: struct{}{}, + } +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/directory.go b/src/github.com/matrix-org/dendrite/clientapi/readers/directory.go index 336f29fed..4c25a45cd 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/directory.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/directory.go @@ -15,13 +15,13 @@ package readers import ( - "fmt" "net/http" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -34,6 +34,7 @@ func DirectoryRoom( roomAlias string, federation *gomatrixserverlib.FederationClient, cfg *config.Dendrite, + aliasAPI api.RoomserverAliasAPI, ) util.JSONResponse { _, domain, err := gomatrixserverlib.SplitID('#', roomAlias) if err != nil { @@ -43,11 +44,30 @@ func DirectoryRoom( } } + var resp gomatrixserverlib.RespDirectory + if domain == cfg.Matrix.ServerName { - // TODO: Implement lookup up local room aliases. - panic(fmt.Errorf("Looking up local room aliases is not implemented")) + queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias} + var queryRes api.GetAliasRoomIDResponse + if err = aliasAPI.GetAliasRoomID(&queryReq, &queryRes); err != nil { + return httputil.LogThenError(req, err) + } + + if len(queryRes.RoomID) > 0 { + // TODO: List servers that are aware of this room alias + resp = gomatrixserverlib.RespDirectory{ + RoomID: queryRes.RoomID, + Servers: []gomatrixserverlib.ServerName{}, + } + } else { + // If the response doesn't contain a non-empty string, return an error + return util.JSONResponse{ + Code: 404, + JSON: jsonerror.NotFound("Room alias " + roomAlias + " not found."), + } + } } else { - resp, err := federation.LookupRoomAlias(domain, roomAlias) + resp, err = federation.LookupRoomAlias(domain, roomAlias) if err != nil { switch x := err.(type) { case gomatrix.HTTPError: @@ -62,10 +82,88 @@ func DirectoryRoom( // TODO: Return 504 if the remote server timed out. return httputil.LogThenError(req, err) } + } - return util.JSONResponse{ - Code: 200, - JSON: resp, - } + return util.JSONResponse{ + Code: 200, + JSON: resp, + } +} + +// SetLocalAlias implements PUT /directory/room/{roomAlias} +// TODO: Check if the user has the power level to set an alias +func SetLocalAlias( + req *http.Request, + device *authtypes.Device, + alias string, + cfg *config.Dendrite, + aliasAPI api.RoomserverAliasAPI, +) util.JSONResponse { + _, domain, err := gomatrixserverlib.SplitID('#', alias) + if err != nil { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.BadJSON("Room alias must be in the form '#localpart:domain'"), + } + } + + if domain != cfg.Matrix.ServerName { + return util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden("Alias must be on local homeserver"), + } + } + + var r struct { + RoomID string `json:"room_id"` + } + if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { + return *resErr + } + + queryReq := api.SetRoomAliasRequest{ + UserID: device.UserID, + RoomID: r.RoomID, + Alias: alias, + } + var queryRes api.SetRoomAliasResponse + if err := aliasAPI.SetRoomAlias(&queryReq, &queryRes); err != nil { + return httputil.LogThenError(req, err) + } + + if queryRes.AliasExists { + return util.JSONResponse{ + Code: 409, + JSON: jsonerror.Unknown("The alias " + alias + " already exists."), + } + } + + return util.JSONResponse{ + Code: 200, + JSON: struct{}{}, + } +} + +// RemoveLocalAlias implements DELETE /directory/room/{roomAlias} +// TODO: Check if the user has the power level to remove an alias +func RemoveLocalAlias( + req *http.Request, + device *authtypes.Device, + alias string, + cfg *config.Dendrite, + aliasAPI api.RoomserverAliasAPI, +) util.JSONResponse { + queryReq := api.RemoveRoomAliasRequest{ + Alias: alias, + UserID: device.UserID, + } + var queryRes api.RemoveRoomAliasResponse + if err := aliasAPI.RemoveRoomAlias(&queryReq, &queryRes); err != nil { + return httputil.LogThenError(req, err) + } + + return util.JSONResponse{ + Code: 200, + JSON: struct{}{}, } } diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/logout.go b/src/github.com/matrix-org/dendrite/clientapi/readers/logout.go index 62aaee1c3..585527fc9 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/logout.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/logout.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -35,7 +36,11 @@ func Logout( } } - localpart := getLocalPart(device.UserID) + localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) + if err != nil { + return httputil.LogThenError(req, err) + } + if err := deviceDB.RemoveDevice(device.ID, localpart); err != nil { return httputil.LogThenError(req, err) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go index dcdb14b44..a449c38d8 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go @@ -17,12 +17,17 @@ package readers import ( "fmt" "net/http" - "strings" + "time" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -50,7 +55,11 @@ func GetProfile( JSON: jsonerror.NotFound("Bad method"), } } - localpart := getLocalPart(userID) + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return httputil.LogThenError(req, err) + } + profile, err := accountDB.GetProfileByLocalpart(localpart) if err != nil { return httputil.LogThenError(req, err) @@ -69,7 +78,11 @@ func GetProfile( func GetAvatarURL( req *http.Request, accountDB *accounts.Database, userID string, ) util.JSONResponse { - localpart := getLocalPart(userID) + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return httputil.LogThenError(req, err) + } + profile, err := accountDB.GetProfileByLocalpart(localpart) if err != nil { return httputil.LogThenError(req, err) @@ -85,9 +98,19 @@ func GetAvatarURL( // SetAvatarURL implements PUT /profile/{userID}/avatar_url func SetAvatarURL( - req *http.Request, accountDB *accounts.Database, userID string, - producer *producers.UserUpdateProducer, + req *http.Request, accountDB *accounts.Database, device *authtypes.Device, + userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite, + rsProducer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI, ) util.JSONResponse { + if userID != device.UserID { + return util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden("userID does not match the current user"), + } + } + + changedKey := "avatar_url" + var r avatarURL if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { return *resErr @@ -99,18 +122,41 @@ func SetAvatarURL( } } - localpart := getLocalPart(userID) + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return httputil.LogThenError(req, err) + } oldProfile, err := accountDB.GetProfileByLocalpart(localpart) if err != nil { return httputil.LogThenError(req, err) } - if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil { + if err = accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil { return httputil.LogThenError(req, err) } - if err := producer.SendUpdate(userID, "avatar_url", oldProfile.AvatarURL, r.AvatarURL); err != nil { + memberships, err := accountDB.GetMembershipsByLocalpart(localpart) + if err != nil { + return httputil.LogThenError(req, err) + } + + newProfile := authtypes.Profile{ + Localpart: localpart, + DisplayName: oldProfile.DisplayName, + AvatarURL: r.AvatarURL, + } + + events, err := buildMembershipEvents(memberships, accountDB, newProfile, userID, cfg, queryAPI) + if err != nil { + return httputil.LogThenError(req, err) + } + + if err := rsProducer.SendEvents(events, cfg.Matrix.ServerName); err != nil { + return httputil.LogThenError(req, err) + } + + if err := producer.SendUpdate(userID, changedKey, oldProfile.AvatarURL, r.AvatarURL); err != nil { return httputil.LogThenError(req, err) } @@ -124,7 +170,11 @@ func SetAvatarURL( func GetDisplayName( req *http.Request, accountDB *accounts.Database, userID string, ) util.JSONResponse { - localpart := getLocalPart(userID) + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return httputil.LogThenError(req, err) + } + profile, err := accountDB.GetProfileByLocalpart(localpart) if err != nil { return httputil.LogThenError(req, err) @@ -140,9 +190,19 @@ func GetDisplayName( // SetDisplayName implements PUT /profile/{userID}/displayname func SetDisplayName( - req *http.Request, accountDB *accounts.Database, userID string, - producer *producers.UserUpdateProducer, + req *http.Request, accountDB *accounts.Database, device *authtypes.Device, + userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite, + rsProducer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI, ) util.JSONResponse { + if userID != device.UserID { + return util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden("userID does not match the current user"), + } + } + + changedKey := "displayname" + var r displayName if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { return *resErr @@ -154,18 +214,41 @@ func SetDisplayName( } } - localpart := getLocalPart(userID) + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return httputil.LogThenError(req, err) + } oldProfile, err := accountDB.GetProfileByLocalpart(localpart) if err != nil { return httputil.LogThenError(req, err) } - if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil { + if err = accountDB.SetDisplayName(localpart, r.DisplayName); err != nil { return httputil.LogThenError(req, err) } - if err := producer.SendUpdate(userID, "displayname", oldProfile.DisplayName, r.DisplayName); err != nil { + memberships, err := accountDB.GetMembershipsByLocalpart(localpart) + if err != nil { + return httputil.LogThenError(req, err) + } + + newProfile := authtypes.Profile{ + Localpart: localpart, + DisplayName: r.DisplayName, + AvatarURL: oldProfile.AvatarURL, + } + + events, err := buildMembershipEvents(memberships, accountDB, newProfile, userID, cfg, queryAPI) + if err != nil { + return httputil.LogThenError(req, err) + } + + if err := rsProducer.SendEvents(events, cfg.Matrix.ServerName); err != nil { + return httputil.LogThenError(req, err) + } + + if err := producer.SendUpdate(userID, changedKey, oldProfile.DisplayName, r.DisplayName); err != nil { return httputil.LogThenError(req, err) } @@ -175,13 +258,71 @@ func SetDisplayName( } } -func getLocalPart(userID string) string { - if !strings.HasPrefix(userID, "@") { - panic(fmt.Errorf("Invalid user ID")) +func buildMembershipEvents( + memberships []authtypes.Membership, db *accounts.Database, + newProfile authtypes.Profile, userID string, cfg *config.Dendrite, + queryAPI api.RoomserverQueryAPI, +) ([]gomatrixserverlib.Event, error) { + evs := []gomatrixserverlib.Event{} + + for _, membership := range memberships { + builder := gomatrixserverlib.EventBuilder{ + Sender: userID, + RoomID: membership.RoomID, + Type: "m.room.member", + StateKey: &userID, + } + + content := events.MemberContent{ + Membership: "join", + } + + content.DisplayName = newProfile.DisplayName + content.AvatarURL = newProfile.AvatarURL + + if err := builder.SetContent(content); err != nil { + return nil, err + } + + eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(&builder) + if err != nil { + return nil, err + } + + // Ask the roomserver for information about this room + queryReq := api.QueryLatestEventsAndStateRequest{ + RoomID: membership.RoomID, + StateToFetch: eventsNeeded.Tuples(), + } + var queryRes api.QueryLatestEventsAndStateResponse + if queryErr := queryAPI.QueryLatestEventsAndState(&queryReq, &queryRes); queryErr != nil { + return nil, err + } + + builder.Depth = queryRes.Depth + builder.PrevEvents = queryRes.LatestEvents + + authEvents := gomatrixserverlib.NewAuthEvents(nil) + + for i := range queryRes.StateEvents { + authEvents.AddEvent(&queryRes.StateEvents[i]) + } + + refs, err := eventsNeeded.AuthEventReferences(&authEvents) + if err != nil { + return nil, err + } + builder.AuthEvents = refs + + eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName) + now := time.Now() + event, err := builder.Build(eventID, now, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey) + if err != nil { + return nil, err + } + + evs = append(evs, event) } - // Get the part before ":" - username := strings.Split(userID, ":")[0] - // Return the part after the "@" - return strings.Split(username, "@")[1] + return evs, nil } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 8d6f024e2..aeb231645 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -42,11 +42,13 @@ const pathPrefixUnstable = "/_matrix/client/unstable" func Setup( servMux *http.ServeMux, httpClient *http.Client, cfg config.Dendrite, producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI, + aliasAPI api.RoomserverAliasAPI, accountDB *accounts.Database, deviceDB *devices.Database, federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, userUpdateProducer *producers.UserUpdateProducer, + syncProducer *producers.SyncAPIProducer, ) { apiMux := mux.NewRouter() @@ -70,14 +72,14 @@ func Setup( r0mux.Handle("/createRoom", common.MakeAuthAPI("createRoom", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { - return writers.CreateRoom(req, device, cfg, producer) + return writers.CreateRoom(req, device, cfg, producer, accountDB) }), ) r0mux.Handle("/join/{roomIDOrAlias}", common.MakeAuthAPI("join", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) return writers.JoinRoomByIDOrAlias( - req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, keyRing, + req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, aliasAPI, keyRing, accountDB, ) }), ) @@ -109,9 +111,23 @@ func Setup( r0mux.Handle("/directory/room/{roomAlias}", common.MakeAuthAPI("directory_room", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.DirectoryRoom(req, device, vars["roomAlias"], federation, &cfg) + return readers.DirectoryRoom(req, device, vars["roomAlias"], federation, &cfg, aliasAPI) }), - ) + ).Methods("GET") + + r0mux.Handle("/directory/room/{roomAlias}", + common.MakeAuthAPI("directory_room", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return readers.SetLocalAlias(req, device, vars["roomAlias"], &cfg, aliasAPI) + }), + ).Methods("PUT", "OPTIONS") + + r0mux.Handle("/directory/room/{roomAlias}", + common.MakeAuthAPI("directory_room", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return readers.RemoveLocalAlias(req, device, vars["roomAlias"], &cfg, aliasAPI) + }), + ).Methods("DELETE") r0mux.Handle("/logout", common.MakeAuthAPI("logout", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { @@ -185,7 +201,7 @@ func Setup( r0mux.Handle("/profile/{userID}/avatar_url", common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SetAvatarURL(req, accountDB, vars["userID"], userUpdateProducer) + return readers.SetAvatarURL(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI) }), ).Methods("PUT", "OPTIONS") // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -201,7 +217,7 @@ func Setup( r0mux.Handle("/profile/{userID}/displayname", common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SetDisplayName(req, accountDB, vars["userID"], userUpdateProducer) + return readers.SetDisplayName(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI) }), ).Methods("PUT", "OPTIONS") // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -274,9 +290,16 @@ func Setup( ) r0mux.Handle("/user/{userID}/account_data/{type}", - common.MakeAPI("user_account_data", func(req *http.Request) util.JSONResponse { - // TODO: Set and get the account_data - return util.JSONResponse{Code: 200, JSON: struct{}{}} + common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return readers.SaveAccountData(req, accountDB, device, vars["userID"], "", vars["type"], syncProducer) + }), + ) + + r0mux.Handle("/user/{userID}/rooms/{roomID}/account_data/{type}", + common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return readers.SaveAccountData(req, accountDB, device, vars["userID"], vars["roomID"], vars["type"], syncProducer) }), ) diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go index 0a2b185e8..e43ae780d 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go @@ -23,6 +23,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -84,15 +85,21 @@ type fledglingEvent struct { } // CreateRoom implements /createRoom -func CreateRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite, producer *producers.RoomserverProducer) util.JSONResponse { +func CreateRoom(req *http.Request, device *authtypes.Device, + cfg config.Dendrite, producer *producers.RoomserverProducer, + accountDB *accounts.Database, +) util.JSONResponse { // TODO: Check room ID doesn't clash with an existing one, and we // probably shouldn't be using pseudo-random strings, maybe GUIDs? roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName) - return createRoom(req, device, cfg, roomID, producer) + return createRoom(req, device, cfg, roomID, producer, accountDB) } // createRoom implements /createRoom -func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer) util.JSONResponse { +func createRoom(req *http.Request, device *authtypes.Device, + cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer, + accountDB *accounts.Database, +) util.JSONResponse { logger := util.GetLogger(req.Context()) userID := device.UserID var r createRoomRequest @@ -115,6 +122,22 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite "roomID": roomID, }).Info("Creating new room") + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return httputil.LogThenError(req, err) + } + + profile, err := accountDB.GetProfileByLocalpart(localpart) + if err != nil { + return httputil.LogThenError(req, err) + } + + membershipContent := events.MemberContent{ + Membership: "join", + DisplayName: profile.DisplayName, + AvatarURL: profile.AvatarURL, + } + var builtEvents []gomatrixserverlib.Event // send events into the room in order of: @@ -137,7 +160,7 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite // TODO: Synapse has txn/token ID on each event. Do we need to do this here? eventsToMake := []fledglingEvent{ {"m.room.create", "", events.CreateContent{Creator: userID}}, - {"m.room.member", userID, events.MemberContent{Membership: "join"}}, // TODO: Set avatar_url / displayname + {"m.room.member", userID, membershipContent}, {"m.room.power_levels", "", events.InitialPowerLevelsContent(userID)}, // TODO: m.room.canonical_alias {"m.room.join_rules", "", events.JoinRulesContent{"public"}}, // FIXME: Allow this to be changed diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go index 55cc65acb..6618b0bdf 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go @@ -21,6 +21,7 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" @@ -41,16 +42,30 @@ func JoinRoomByIDOrAlias( federation *gomatrixserverlib.FederationClient, producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI, + aliasAPI api.RoomserverAliasAPI, keyRing gomatrixserverlib.KeyRing, + accountDB *accounts.Database, ) util.JSONResponse { var content map[string]interface{} // must be a JSON object if resErr := httputil.UnmarshalJSONRequest(req, &content); resErr != nil { return *resErr } - content["membership"] = "join" + localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) + if err != nil { + return httputil.LogThenError(req, err) + } - r := joinRoomReq{req, content, device.UserID, cfg, federation, producer, queryAPI, keyRing} + profile, err := accountDB.GetProfileByLocalpart(localpart) + if err != nil { + return httputil.LogThenError(req, err) + } + + content["membership"] = "join" + content["displayname"] = profile.DisplayName + content["avatar_url"] = profile.AvatarURL + + r := joinRoomReq{req, content, device.UserID, cfg, federation, producer, queryAPI, aliasAPI, keyRing} if strings.HasPrefix(roomIDOrAlias, "!") { return r.joinRoomByID() @@ -72,6 +87,7 @@ type joinRoomReq struct { federation *gomatrixserverlib.FederationClient producer *producers.RoomserverProducer queryAPI api.RoomserverQueryAPI + aliasAPI api.RoomserverAliasAPI keyRing gomatrixserverlib.KeyRing } @@ -97,11 +113,23 @@ func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse { } } if domain == r.cfg.Matrix.ServerName { - // TODO: Implement joining local room aliases. - panic(fmt.Errorf("Joining local room aliases is not implemented")) - } else { - return r.joinRoomByRemoteAlias(domain, roomAlias) + queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias} + var queryRes api.GetAliasRoomIDResponse + if err = r.aliasAPI.GetAliasRoomID(&queryReq, &queryRes); err != nil { + return httputil.LogThenError(r.req, err) + } + + if len(queryRes.RoomID) > 0 { + return r.joinRoomUsingServers(queryRes.RoomID, []gomatrixserverlib.ServerName{r.cfg.Matrix.ServerName}) + } + // If the response doesn't contain a non-empty string, return an error + return util.JSONResponse{ + Code: 404, + JSON: jsonerror.NotFound("Room alias " + roomAlias + " not found."), + } } + // If the room isn't local, use federation to join + return r.joinRoomByRemoteAlias(domain, roomAlias) } func (r joinRoomReq) joinRoomByRemoteAlias( @@ -126,7 +154,7 @@ func (r joinRoomReq) joinRoomByRemoteAlias( func (r joinRoomReq) writeToBuilder(eb *gomatrixserverlib.EventBuilder, roomID string) { eb.Type = "m.room.member" - eb.SetContent(r.content) // TODO: Set avatar_url / displayname + eb.SetContent(r.content) eb.SetUnsigned(struct{}{}) eb.Sender = r.userID eb.StateKey = &r.userID @@ -156,9 +184,44 @@ func (r joinRoomReq) joinRoomUsingServers( } if queryRes.RoomExists { - // TODO: Implement joining rooms that already the server is already in. - // This should just fall through to the usual event sending code. - panic(fmt.Errorf("Joining rooms that the server already in is not implemented")) + // The room exists in the local database, so we just have to send a join + // membership event and return the room ID + // TODO: Check if the user is allowed in the room (has been invited if + // the room is invite-only) + eb.Depth = queryRes.Depth + eb.PrevEvents = queryRes.LatestEvents + + authEvents := gomatrixserverlib.NewAuthEvents(nil) + + for i := range queryRes.StateEvents { + authEvents.AddEvent(&queryRes.StateEvents[i]) + } + + refs, err := needed.AuthEventReferences(&authEvents) + if err != nil { + return httputil.LogThenError(r.req, err) + } + eb.AuthEvents = refs + + now := time.Now() + eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.cfg.Matrix.ServerName) + event, err := eb.Build( + eventID, now, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID, r.cfg.Matrix.PrivateKey, + ) + if err != nil { + return httputil.LogThenError(r.req, err) + } + + if err := r.producer.SendEvents([]gomatrixserverlib.Event{event}, r.cfg.Matrix.ServerName); err != nil { + return httputil.LogThenError(r.req, err) + } + + return util.JSONResponse{ + Code: 200, + JSON: struct { + RoomID string `json:"room_id"` + }{roomID}, + } } if len(servers) == 0 { diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index eb1218e7c..6f568b1a1 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -52,6 +52,7 @@ func main() { log.Info("config: ", cfg) queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) + aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil) roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL()) userUpdateProducer, err := producers.NewUserUpdateProducer( @@ -60,6 +61,12 @@ func main() { if err != nil { log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) } + syncProducer, err := producers.NewSyncAPIProducer( + cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData), + ) + if err != nil { + log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) + } federation := gomatrixserverlib.NewFederationClient( cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, @@ -97,7 +104,8 @@ func main() { log.Info("Starting client API server on ", cfg.Listen.ClientAPI) routing.Setup( http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer, - queryAPI, accountDB, deviceDB, federation, keyRing, userUpdateProducer, + queryAPI, aliasAPI, accountDB, deviceDB, federation, keyRing, + userUpdateProducer, syncProducer, ) log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil)) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go index 2c62cdb97..c6bd9dea6 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go @@ -29,7 +29,7 @@ import ( var ( logDir = os.Getenv("LOG_DIR") - configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.") + configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") ) func main() { diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go index 715a40740..bd8116b6d 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go @@ -23,6 +23,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/alias" "github.com/matrix-org/dendrite/roomserver/input" "github.com/matrix-org/dendrite/roomserver/query" "github.com/matrix-org/dendrite/roomserver/storage" @@ -32,7 +33,7 @@ import ( var ( logDir = os.Getenv("LOG_DIR") - configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.") + configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") ) func main() { @@ -58,12 +59,6 @@ func main() { panic(err) } - queryAPI := query.RoomserverQueryAPI{ - DB: db, - } - - queryAPI.SetupHTTP(http.DefaultServeMux) - inputAPI := input.RoomserverInputAPI{ DB: db, Producer: kafkaProducer, @@ -72,6 +67,19 @@ func main() { inputAPI.SetupHTTP(http.DefaultServeMux) + queryAPI := query.RoomserverQueryAPI{db} + + queryAPI.SetupHTTP(http.DefaultServeMux) + + aliasAPI := alias.RoomserverAliasAPI{ + DB: db, + Cfg: cfg, + InputAPI: inputAPI, + QueryAPI: queryAPI, + } + + aliasAPI.SetupHTTP(http.DefaultServeMux) + http.DefaultServeMux.Handle("/metrics", prometheus.Handler()) log.Info("Started room server on ", cfg.Listen.RoomServer) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index a8984c1d5..77ada4121 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -19,6 +19,7 @@ import ( "net/http" "os" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" @@ -58,6 +59,11 @@ func main() { log.Panicf("startup: failed to create device database with data source %s : %s", cfg.Database.Device, err) } + adb, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName) + if err != nil { + log.Panicf("startup: failed to create account database with data source %s : %s", cfg.Database.Account, err) + } + pos, err := db.SyncStreamPosition() if err != nil { log.Panicf("startup: failed to get latest sync stream position : %s", err) @@ -67,15 +73,22 @@ func main() { if err = n.Load(db); err != nil { log.Panicf("startup: failed to set up notifier: %s", err) } - consumer, err := consumers.NewOutputRoomEvent(cfg, n, db) + roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db) if err != nil { log.Panicf("startup: failed to create room server consumer: %s", err) } - if err = consumer.Start(); err != nil { - log.Panicf("startup: failed to start room server consumer") + if err = roomConsumer.Start(); err != nil { + log.Panicf("startup: failed to start room server consumer: %s", err) + } + clientConsumer, err := consumers.NewOutputClientData(cfg, n, db) + if err != nil { + log.Panicf("startup: failed to create client API server consumer: %s", err) + } + if err = clientConsumer.Start(); err != nil { + log.Panicf("startup: failed to start client API server consumer: %s", err) } log.Info("Starting sync server on ", cfg.Listen.SyncAPI) - routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, sync.NewRequestPool(db, n), deviceDB) + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, sync.NewRequestPool(db, n, adb), deviceDB) log.Fatal(http.ListenAndServe(string(cfg.Listen.SyncAPI), nil)) } diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index b0e36c425..e39a89808 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -54,6 +54,7 @@ var ( ) const inputTopic = "syncserverInput" +const clientTopic = "clientapiserverOutput" var exe = test.KafkaExecutor{ ZookeeperURI: zookeeperURI, @@ -134,6 +135,7 @@ func startSyncServer() (*exec.Cmd, chan error) { cfg.Matrix.ServerName = "localhost" cfg.Listen.SyncAPI = config.Address(syncserverAddr) cfg.Kafka.Topics.OutputRoomEvent = config.Topic(inputTopic) + cfg.Kafka.Topics.OutputClientData = config.Topic(clientTopic) if err := test.WriteConfig(cfg, dir); err != nil { panic(err) @@ -177,6 +179,10 @@ func prepareKafka() { if err := exe.CreateTopic(inputTopic); err != nil { panic(err) } + exe.DeleteTopic(clientTopic) + if err := exe.CreateTopic(clientTopic); err != nil { + panic(err) + } } func testSyncServer(syncServerCmdChan chan error, userID, since, want string) { diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go index 5cf090d3d..14c5dd329 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go @@ -71,11 +71,11 @@ var outputRoomEventTestData = []string{ // $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost" `{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"membership":"invite"},"depth":0,"event_id":"$zzLHVlHIWPrnE7DI:localhost","hashes":{"sha256":"LKk7tnYJAHsyffbi9CzfdP+TU4KQ5g6YTgYGKjJ7NxU"},"origin":"localhost","origin_server_ts":1494411709192,"prev_events":[["$4NBTdIwDxq5fDGpv:localhost",{"sha256":"EpqmxEoJP93Zb2Nt2fS95SJWTqqIutHm/Ne8OHqp6Ps"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"GdUzkC+7YKl1XDi7kYuD39yi2L/+nv+YrecIQHS+0BLDQqnEj+iRXfNBuZfTk6lUBCJCHXZlk7MnEIjvWDlZCg"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"adds_state_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"last_sent_event_id":"$4NBTdIwDxq5fDGpv:localhost"}}`, // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost" - `{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"content":{"membership":"join"},"depth":0,"event_id":"$uJVKyzZi8ZX0kOd9:localhost","hashes":{"sha256":"9ZZs/Cg0ewpBiCB6iFXXYlmW8koFiesCNGFrOLDTolE"},"origin":"localhost","origin_server_ts":1494411745015,"prev_events":[["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"+TM0gFPM/M3Ji2BjYuTUTgDyCOWlOq8aTMCxLg7EBvS62yPxJ558f13OWWTczUO5aRAt+PvXsMVM/bp8u6c8DQ"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"adds_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"removes_state_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"last_sent_event_id":"$zzLHVlHIWPrnE7DI:localhost"}}`, + `{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"content":{"membership":"join"},"unsigned":{"prev_content":{"membership":"invite"},"prev_sender":"@bob:localhost","replaces_state":"$zzLHVlHIWPrnE7DI:localhost"},"depth":0,"event_id":"$uJVKyzZi8ZX0kOd9:localhost","hashes":{"sha256":"9ZZs/Cg0ewpBiCB6iFXXYlmW8koFiesCNGFrOLDTolE"},"origin":"localhost","origin_server_ts":1494411745015,"prev_events":[["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"+TM0gFPM/M3Ji2BjYuTUTgDyCOWlOq8aTMCxLg7EBvS62yPxJ558f13OWWTczUO5aRAt+PvXsMVM/bp8u6c8DQ"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"adds_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"removes_state_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"last_sent_event_id":"$zzLHVlHIWPrnE7DI:localhost"}}`, // $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" `{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"not charlie...","msgtype":"m.text"},"depth":0,"event_id":"$Ixfn5WT9ocWTYxfy:localhost","hashes":{"sha256":"hRChdyMQ3AY4jvrPpI8PEX6Taux83Qo5hdSeHlhPxGo"},"origin":"localhost","origin_server_ts":1494411792737,"prev_events":[["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"LC/Zqwu/XdqjmLdTOp/NQaFaE0niSAGgEpa39gCxsnsqEX80P7P5WDn/Kzx6rjWTnhIszrLsnoycqkXQT0Z4DQ"}},"type":"m.room.message"},"latest_event_ids":["$Ixfn5WT9ocWTYxfy:localhost"],"last_sent_event_id":"$uJVKyzZi8ZX0kOd9:localhost"}}`, // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost" - `{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"content":{"membership":"leave"},"depth":0,"event_id":"$om1F4AI8tCYlHUSp:localhost","hashes":{"sha256":"7JVI0uCxSUyEqDJ+o36/zUIlIZkXVK/R6wkrZGvQXDE"},"origin":"localhost","origin_server_ts":1494411855278,"prev_events":[["$Ixfn5WT9ocWTYxfy:localhost",{"sha256":"hOoPIDQFvvNqQJzA5ggjoQi4v1BOELnhnmwU4UArDOY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"3sxoDLUPnKuDJgFgS3C647BbiXrozxhhxrZOlFP3KgJKzBYv/ht+Jd2V2iSZOvsv94wgRBf0A/lEcJRIqeLgDA"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"adds_state_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"removes_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"last_sent_event_id":"$Ixfn5WT9ocWTYxfy:localhost"}}`, + `{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"content":{"membership":"leave"},"unsigned":{"prev_content":{"membership":"join"},"prev_sender":"@charlie:localhost","replaces_state":"$uJVKyzZi8ZX0kOd9:localhost"},"depth":0,"event_id":"$om1F4AI8tCYlHUSp:localhost","hashes":{"sha256":"7JVI0uCxSUyEqDJ+o36/zUIlIZkXVK/R6wkrZGvQXDE"},"origin":"localhost","origin_server_ts":1494411855278,"prev_events":[["$Ixfn5WT9ocWTYxfy:localhost",{"sha256":"hOoPIDQFvvNqQJzA5ggjoQi4v1BOELnhnmwU4UArDOY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"3sxoDLUPnKuDJgFgS3C647BbiXrozxhhxrZOlFP3KgJKzBYv/ht+Jd2V2iSZOvsv94wgRBf0A/lEcJRIqeLgDA"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"adds_state_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"removes_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"last_sent_event_id":"$Ixfn5WT9ocWTYxfy:localhost"}}`, // $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost" `{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"body":"why did you kick charlie","msgtype":"m.text"},"depth":0,"event_id":"$hgao5gTmr3r9TtK2:localhost","hashes":{"sha256":"Aa2ZCrvwjX5xhvkVqIOFUeEGqrnrQZjjNFiZRybjsPY"},"origin":"localhost","origin_server_ts":1494411912809,"prev_events":[["$om1F4AI8tCYlHUSp:localhost",{"sha256":"yVs+CW7AiJrJOYouL8xPIBrtIHAhnbxaegna8MxeCto"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"sGkpbEXGsvAuCvE3wb5E9H5fjCVKpRdWNt6csj1bCB9Fmg4Rg4mvj3TAJ+91DjO8IPsgSxDKdqqRYF0OtcynBA"}},"type":"m.room.message"},"latest_event_ids":["$hgao5gTmr3r9TtK2:localhost"],"last_sent_event_id":"$om1F4AI8tCYlHUSp:localhost"}}`, // $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 4b362b5ff..324561f68 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -98,6 +98,8 @@ type Dendrite struct { Topics struct { // Topic for roomserver/api.OutputRoomEvent events. OutputRoomEvent Topic `yaml:"output_room_event"` + // Topic for sending account data from client API to sync API + OutputClientData Topic `yaml:"output_client_data"` // Topic for user updates (profile, presence) UserUpdates Topic `yaml:"user_updates"` } @@ -298,6 +300,7 @@ func (config *Dendrite) check() error { checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) + checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) checkNotEmpty("database.account", string(config.Database.Account)) checkNotEmpty("database.device", string(config.Database.Device)) checkNotEmpty("database.server_key", string(config.Database.ServerKey)) diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go index 7af619688..4275e3d41 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config_test.go +++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go @@ -44,6 +44,7 @@ kafka: topics: input_room_event: input.room output_room_event: output.room + output_client_data: output.client database: media_api: "postgresql:///media_api" account: "postgresql:///account" diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go index e429d06b0..a28a08d54 100644 --- a/src/github.com/matrix-org/dendrite/common/test/config.go +++ b/src/github.com/matrix-org/dendrite/common/test/config.go @@ -82,6 +82,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con // TODO: Different servers should be using different topics. // Make this configurable somehow? cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" + cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" // TODO: Use different databases for the different schemas. // Using the same database for every schema currently works because diff --git a/src/github.com/matrix-org/dendrite/common/types.go b/src/github.com/matrix-org/dendrite/common/types.go new file mode 100644 index 000000000..471a2f30b --- /dev/null +++ b/src/github.com/matrix-org/dendrite/common/types.go @@ -0,0 +1,22 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +// AccountData represents account data sent from the client API server to the +// sync API server +type AccountData struct { + RoomID string `json:"room_id"` + Type string `json:"type"` +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/alias/alias.go b/src/github.com/matrix-org/dendrite/roomserver/alias/alias.go new file mode 100644 index 000000000..e630343e7 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/alias/alias.go @@ -0,0 +1,256 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package alias + +import ( + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/input" + "github.com/matrix-org/dendrite/roomserver/query" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +// RoomserverAliasAPIDatabase has the storage APIs needed to implement the alias API. +type RoomserverAliasAPIDatabase interface { + // Save a given room alias with the room ID it refers to. + // Returns an error if there was a problem talking to the database. + SetRoomAlias(alias string, roomID string) error + // Lookup the room ID a given alias refers to. + // Returns an error if there was a problem talking to the database. + GetRoomIDFromAlias(alias string) (string, error) + // Lookup all aliases referring to a given room ID. + // Returns an error if there was a problem talking to the database. + GetAliasesFromRoomID(roomID string) ([]string, error) + // Remove a given room alias. + // Returns an error if there was a problem talking to the database. + RemoveRoomAlias(alias string) error +} + +// RoomserverAliasAPI is an implementation of api.RoomserverAliasAPI +type RoomserverAliasAPI struct { + DB RoomserverAliasAPIDatabase + Cfg *config.Dendrite + InputAPI input.RoomserverInputAPI + QueryAPI query.RoomserverQueryAPI +} + +// SetRoomAlias implements api.RoomserverAliasAPI +func (r *RoomserverAliasAPI) SetRoomAlias( + request *api.SetRoomAliasRequest, + response *api.SetRoomAliasResponse, +) error { + // Check if the alias isn't already referring to a room + roomID, err := r.DB.GetRoomIDFromAlias(request.Alias) + if err != nil { + return err + } + if len(roomID) > 0 { + // If the alias already exists, stop the process + response.AliasExists = true + return nil + } + response.AliasExists = false + + // Save the new alias + if err := r.DB.SetRoomAlias(request.Alias, request.RoomID); err != nil { + return err + } + + // Send a m.room.aliases event with the updated list of aliases for this room + if err := r.sendUpdatedAliasesEvent(request.UserID, request.RoomID); err != nil { + return err + } + + return nil +} + +// GetAliasRoomID implements api.RoomserverAliasAPI +func (r *RoomserverAliasAPI) GetAliasRoomID( + request *api.GetAliasRoomIDRequest, + response *api.GetAliasRoomIDResponse, +) error { + // Lookup the room ID in the database + roomID, err := r.DB.GetRoomIDFromAlias(request.Alias) + if err != nil { + return err + } + + response.RoomID = roomID + return nil +} + +// RemoveRoomAlias implements api.RoomserverAliasAPI +func (r *RoomserverAliasAPI) RemoveRoomAlias( + request *api.RemoveRoomAliasRequest, + response *api.RemoveRoomAliasResponse, +) error { + // Lookup the room ID in the database + roomID, err := r.DB.GetRoomIDFromAlias(request.Alias) + if err != nil { + return err + } + + // Remove the dalias from the database + if err := r.DB.RemoveRoomAlias(request.Alias); err != nil { + return err + } + + // Send an updated m.room.aliases event + if err := r.sendUpdatedAliasesEvent(request.UserID, roomID); err != nil { + return err + } + + return nil +} + +type roomAliasesContent struct { + Aliases []string `json:"aliases"` +} + +// Build the updated m.room.aliases event to send to the room after addition or +// removal of an alias +func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(userID string, roomID string) error { + serverName := string(r.Cfg.Matrix.ServerName) + + builder := gomatrixserverlib.EventBuilder{ + Sender: userID, + RoomID: roomID, + Type: "m.room.aliases", + StateKey: &serverName, + } + + // Retrieve the updated list of aliases, marhal it and set it as the + // event's content + aliases, err := r.DB.GetAliasesFromRoomID(roomID) + if err != nil { + return err + } + content := roomAliasesContent{Aliases: aliases} + rawContent, err := json.Marshal(content) + if err != nil { + return err + } + err = builder.SetContent(json.RawMessage(rawContent)) + if err != nil { + return err + } + + // Get needed state events and depth + eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(&builder) + if err != nil { + return err + } + req := api.QueryLatestEventsAndStateRequest{ + RoomID: roomID, + StateToFetch: eventsNeeded.Tuples(), + } + var res api.QueryLatestEventsAndStateResponse + if err = r.QueryAPI.QueryLatestEventsAndState(&req, &res); err != nil { + return err + } + builder.Depth = res.Depth + builder.PrevEvents = res.LatestEvents + + // Add auth events + authEvents := gomatrixserverlib.NewAuthEvents(nil) + for i := range res.StateEvents { + authEvents.AddEvent(&res.StateEvents[i]) + } + refs, err := eventsNeeded.AuthEventReferences(&authEvents) + if err != nil { + return err + } + builder.AuthEvents = refs + + // Build the event + eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.Cfg.Matrix.ServerName) + now := time.Now() + event, err := builder.Build(eventID, now, r.Cfg.Matrix.ServerName, r.Cfg.Matrix.KeyID, r.Cfg.Matrix.PrivateKey) + if err != nil { + return err + } + + // Create the request + ire := api.InputRoomEvent{ + Kind: api.KindNew, + Event: event, + AuthEventIDs: event.AuthEventIDs(), + SendAsServer: serverName, + } + inputReq := api.InputRoomEventsRequest{ + InputRoomEvents: []api.InputRoomEvent{ire}, + } + var inputRes api.InputRoomEventsResponse + + // Send the request + if err := r.InputAPI.InputRoomEvents(&inputReq, &inputRes); err != nil { + return err + } + + return nil +} + +// SetupHTTP adds the RoomserverAliasAPI handlers to the http.ServeMux. +func (r *RoomserverAliasAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle( + api.RoomserverSetRoomAliasPath, + common.MakeAPI("setRoomAlias", func(req *http.Request) util.JSONResponse { + var request api.SetRoomAliasRequest + var response api.SetRoomAliasResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := r.SetRoomAlias(&request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: 200, JSON: &response} + }), + ) + servMux.Handle( + api.RoomserverGetAliasRoomIDPath, + common.MakeAPI("getAliasRoomID", func(req *http.Request) util.JSONResponse { + var request api.GetAliasRoomIDRequest + var response api.GetAliasRoomIDResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := r.GetAliasRoomID(&request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: 200, JSON: &response} + }), + ) + servMux.Handle( + api.RoomserverRemoveRoomAliasPath, + common.MakeAPI("removeRoomAlias", func(req *http.Request) util.JSONResponse { + var request api.RemoveRoomAliasRequest + var response api.RemoveRoomAliasResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := r.RemoveRoomAlias(&request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: 200, JSON: &response} + }), + ) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/alias.go b/src/github.com/matrix-org/dendrite/roomserver/api/alias.go new file mode 100644 index 000000000..bb65c3ae3 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/api/alias.go @@ -0,0 +1,129 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" +) + +// SetRoomAliasRequest is a request to SetRoomAlias +type SetRoomAliasRequest struct { + // ID of the user setting the alias + UserID string `json:"user_id"` + // New alias for the room + Alias string `json:"alias"` + // The room ID the alias is referring to + RoomID string `json:"room_id"` +} + +// SetRoomAliasResponse is a response to SetRoomAlias +type SetRoomAliasResponse struct { + // Does the alias already refer to a room? + AliasExists bool `json:"alias_exists"` +} + +// GetAliasRoomIDRequest is a request to GetAliasRoomID +type GetAliasRoomIDRequest struct { + // Alias we want to lookup + Alias string `json:"alias"` +} + +// GetAliasRoomIDResponse is a response to GetAliasRoomID +type GetAliasRoomIDResponse struct { + // The room ID the alias refers to + RoomID string `json:"room_id"` +} + +// RemoveRoomAliasRequest is a request to RemoveRoomAlias +type RemoveRoomAliasRequest struct { + // ID of the user removing the alias + UserID string `json:"user_id"` + // The room alias to remove + Alias string `json:"alias"` +} + +// RemoveRoomAliasResponse is a response to RemoveRoomAlias +type RemoveRoomAliasResponse struct{} + +// RoomserverAliasAPI is used to save, lookup or remove a room alias +type RoomserverAliasAPI interface { + // Set a room alias + SetRoomAlias( + req *SetRoomAliasRequest, + response *SetRoomAliasResponse, + ) error + + // Get the room ID for an alias + GetAliasRoomID( + req *GetAliasRoomIDRequest, + response *GetAliasRoomIDResponse, + ) error + + // Remove a room alias + RemoveRoomAlias( + req *RemoveRoomAliasRequest, + response *RemoveRoomAliasResponse, + ) error +} + +// RoomserverSetRoomAliasPath is the HTTP path for the SetRoomAlias API. +const RoomserverSetRoomAliasPath = "/api/roomserver/setRoomAlias" + +// RoomserverGetAliasRoomIDPath is the HTTP path for the GetAliasRoomID API. +const RoomserverGetAliasRoomIDPath = "/api/roomserver/getAliasRoomID" + +// RoomserverRemoveRoomAliasPath is the HTTP path for the RemoveRoomAlias API. +const RoomserverRemoveRoomAliasPath = "/api/roomserver/removeRoomAlias" + +// NewRoomserverAliasAPIHTTP creates a RoomserverAliasAPI implemented by talking to a HTTP POST API. +// If httpClient is nil then it uses the http.DefaultClient +func NewRoomserverAliasAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverAliasAPI { + if httpClient == nil { + httpClient = http.DefaultClient + } + return &httpRoomserverAliasAPI{roomserverURL, httpClient} +} + +type httpRoomserverAliasAPI struct { + roomserverURL string + httpClient *http.Client +} + +// SetRoomAlias implements RoomserverAliasAPI +func (h *httpRoomserverAliasAPI) SetRoomAlias( + request *SetRoomAliasRequest, + response *SetRoomAliasResponse, +) error { + apiURL := h.roomserverURL + RoomserverSetRoomAliasPath + return postJSON(h.httpClient, apiURL, request, response) +} + +// GetAliasRoomID implements RoomserverAliasAPI +func (h *httpRoomserverAliasAPI) GetAliasRoomID( + request *GetAliasRoomIDRequest, + response *GetAliasRoomIDResponse, +) error { + // RemoveRoomAlias implements RoomserverAliasAPI + apiURL := h.roomserverURL + RoomserverGetAliasRoomIDPath + return postJSON(h.httpClient, apiURL, request, response) +} + +func (h *httpRoomserverAliasAPI) RemoveRoomAlias( + request *RemoveRoomAliasRequest, + response *RemoveRoomAliasResponse, +) error { + apiURL := h.roomserverURL + RoomserverRemoveRoomAliasPath + return postJSON(h.httpClient, apiURL, request, response) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 142df90e3..30b695fb4 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -40,9 +40,21 @@ type RoomserverQueryAPIDatabase interface { // Lookup the numeric IDs for a list of events. // Returns an error if there was a problem talking to the database. EventNIDs(eventIDs []string) (map[string]types.EventNID, error) + // Save a given room alias with the room ID it refers to. + // Returns an error if there was a problem talking to the database. + SetRoomAlias(alias string, roomID string) error + // Lookup the room ID a given alias refers to. + // Returns an error if there was a problem talking to the database. + GetRoomIDFromAlias(alias string) (string, error) + // Lookup all aliases referring to a given room ID. + // Returns an error if there was a problem talking to the database. + GetAliasesFromRoomID(roomID string) ([]string, error) + // Remove a given room alias. + // Returns an error if there was a problem talking to the database. + RemoveRoomAlias(alias string) error } -// RoomserverQueryAPI is an implementation of RoomserverQueryAPI +// RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI type RoomserverQueryAPI struct { DB RoomserverQueryAPIDatabase } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/room_aliases_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/room_aliases_table.go new file mode 100644 index 000000000..8b56b6d14 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/room_aliases_table.go @@ -0,0 +1,100 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" +) + +const roomAliasesSchema = ` +-- Stores room aliases and room IDs they refer to +CREATE TABLE IF NOT EXISTS room_aliases ( + -- Alias of the room + alias TEXT NOT NULL PRIMARY KEY, + -- Room ID the alias refers to + room_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS room_id_idx ON room_aliases(room_id); +` + +const insertRoomAliasSQL = "" + + "INSERT INTO room_aliases (alias, room_id) VALUES ($1, $2)" + +const selectRoomIDFromAliasSQL = "" + + "SELECT room_id FROM room_aliases WHERE alias = $1" + +const selectAliasesFromRoomIDSQL = "" + + "SELECT alias FROM room_aliases WHERE room_id = $1" + +const deleteRoomAliasSQL = "" + + "DELETE FROM room_aliases WHERE alias = $1" + +type roomAliasesStatements struct { + insertRoomAliasStmt *sql.Stmt + selectRoomIDFromAliasStmt *sql.Stmt + selectAliasesFromRoomIDStmt *sql.Stmt + deleteRoomAliasStmt *sql.Stmt +} + +func (s *roomAliasesStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(roomAliasesSchema) + if err != nil { + return + } + return statementList{ + {&s.insertRoomAliasStmt, insertRoomAliasSQL}, + {&s.selectRoomIDFromAliasStmt, selectRoomIDFromAliasSQL}, + {&s.selectAliasesFromRoomIDStmt, selectAliasesFromRoomIDSQL}, + {&s.deleteRoomAliasStmt, deleteRoomAliasSQL}, + }.prepare(db) +} + +func (s *roomAliasesStatements) insertRoomAlias(alias string, roomID string) (err error) { + _, err = s.insertRoomAliasStmt.Exec(alias, roomID) + return +} + +func (s *roomAliasesStatements) selectRoomIDFromAlias(alias string) (roomID string, err error) { + err = s.selectRoomIDFromAliasStmt.QueryRow(alias).Scan(&roomID) + if err == sql.ErrNoRows { + return "", nil + } + return +} + +func (s *roomAliasesStatements) selectAliasesFromRoomID(roomID string) (aliases []string, err error) { + aliases = []string{} + rows, err := s.selectAliasesFromRoomIDStmt.Query(roomID) + if err != nil { + return + } + + for rows.Next() { + var alias string + if err = rows.Scan(&alias); err != nil { + return + } + + aliases = append(aliases, alias) + } + + return +} + +func (s *roomAliasesStatements) deleteRoomAlias(alias string) (err error) { + _, err = s.deleteRoomAliasStmt.Exec(alias) + return +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index 10d3a801d..2ab066ea4 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -29,6 +29,7 @@ type statements struct { previousEventStatements inviteStatements membershipStatements + roomAliasesStatements } func (s *statements) prepare(db *sql.DB) error { @@ -51,5 +52,9 @@ func (s *statements) prepare(db *sql.DB) error { } } + if err = s.roomAliasesStatements.prepare(db); err != nil { + return err + } + return nil } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 8ff0592a8..c4a701e8c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -353,6 +353,26 @@ func (d *Database) LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.Ev return references, currentStateSnapshotNID, depth, nil } +// SetRoomAlias implements alias.RoomserverAliasAPIDB +func (d *Database) SetRoomAlias(alias string, roomID string) error { + return d.statements.insertRoomAlias(alias, roomID) +} + +// GetRoomIDFromAlias implements alias.RoomserverAliasAPIDB +func (d *Database) GetRoomIDFromAlias(alias string) (string, error) { + return d.statements.selectRoomIDFromAlias(alias) +} + +// GetAliasesFromRoomID implements alias.RoomserverAliasAPIDB +func (d *Database) GetAliasesFromRoomID(roomID string) ([]string, error) { + return d.statements.selectAliasesFromRoomID(roomID) +} + +// RemoveRoomAlias implements alias.RoomserverAliasAPIDB +func (d *Database) RemoveRoomAlias(alias string) error { + return d.statements.deleteRoomAlias(alias) +} + // StateEntriesForTuples implements state.RoomStateDatabase func (d *Database) StateEntriesForTuples( stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple, diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go new file mode 100644 index 000000000..a2a240ff9 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go @@ -0,0 +1,91 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "encoding/json" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/sync" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputClientData consumes events that originated in the client API server. +type OutputClientData struct { + clientAPIConsumer *common.ContinualConsumer + db *storage.SyncServerDatabase + notifier *sync.Notifier +} + +// NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. +func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + return nil, err + } + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputClientData), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputClientData{ + clientAPIConsumer: &consumer, + db: store, + notifier: n, + } + consumer.ProcessMessage = s.onMessage + + return s, nil +} + +// Start consuming from room servers +func (s *OutputClientData) Start() error { + return s.clientAPIConsumer.Start() +} + +// onMessage is called when the sync server receives a new event from the client API server output log. +// It is not safe for this function to be called from multiple goroutines, or else the +// sync stream position may race and be incorrectly calculated. +func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output common.AccountData + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("client API server output log: message parse failure") + return nil + } + + log.WithFields(log.Fields{ + "type": output.Type, + "room_id": output.RoomID, + }).Info("received data from client API server") + + syncStreamPos, err := s.db.UpsertAccountData(string(msg.Key), output.RoomID, output.Type) + if err != nil { + log.WithFields(log.Fields{ + "type": output.Type, + "room_id": output.RoomID, + log.ErrorKey: err, + }).Panicf("could not save account data") + } + + s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos) + + return nil +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 70f42e1b7..c846705fc 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -37,6 +37,12 @@ type OutputRoomEvent struct { query api.RoomserverQueryAPI } +type prevEventRef struct { + PrevContent json.RawMessage `json:"prev_content"` + PrevID string `json:"replaces_state"` + UserID string `json:"prev_sender"` +} + // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) @@ -101,6 +107,18 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: state event lookup failure") } + ev, err = s.updateStateEvent(ev) + if err != nil { + return err + } + + for i := range addsStateEvents { + addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i]) + if err != nil { + return err + } + } + syncStreamPos, err := s.db.WriteEvent( &ev, addsStateEvents, output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.RemovesStateEventIDs, ) @@ -115,7 +133,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) + s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos)) return nil } @@ -177,6 +195,32 @@ func (s *OutputRoomEvent) lookupStateEvents( return result, nil } +func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) { + var stateKey string + if event.StateKey() == nil { + stateKey = "" + } else { + stateKey = *event.StateKey() + } + + prevEvent, err := s.db.GetStateEvent(event.Type(), event.RoomID(), stateKey) + if err != nil { + return event, err + } + + if prevEvent == nil { + return event, nil + } + + prev := prevEventRef{ + PrevContent: prevEvent.Content(), + PrevID: prevEvent.EventID(), + UserID: prevEvent.Sender(), + } + + return event.SetUnsigned(prev) +} + func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string { have := map[string]bool{} for _, event := range events { diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go new file mode 100644 index 000000000..f95e48583 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go @@ -0,0 +1,113 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +const accountDataSchema = ` +-- Stores the users account data +CREATE TABLE IF NOT EXISTS account_data_type ( + -- The highest numeric ID from the output_room_events at the time of saving the data + id BIGINT, + -- ID of the user the data belongs to + user_id TEXT NOT NULL, + -- ID of the room the data is related to (empty string if not related to a specific room) + room_id TEXT NOT NULL, + -- Type of the data + type TEXT NOT NULL, + + PRIMARY KEY(user_id, room_id, type), + + -- We don't want two entries of the same type for the same user + CONSTRAINT account_data_unique UNIQUE (user_id, room_id, type) +); + +CREATE UNIQUE INDEX IF NOT EXISTS account_data_id_idx ON account_data_type(id); +` + +const insertAccountDataSQL = "" + + "INSERT INTO account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" + + " ON CONFLICT ON CONSTRAINT account_data_unique" + + " DO UPDATE SET id = EXCLUDED.id" + +const selectAccountDataInRangeSQL = "" + + "SELECT room_id, type FROM account_data_type" + + " WHERE user_id = $1 AND id > $2 AND id <= $3" + + " ORDER BY id ASC" + +type accountDataStatements struct { + insertAccountDataStmt *sql.Stmt + selectAccountDataInRangeStmt *sql.Stmt +} + +func (s *accountDataStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(accountDataSchema) + if err != nil { + return + } + if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { + return + } + if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { + return + } + return +} + +func (s *accountDataStatements) insertAccountData( + pos types.StreamPosition, userID string, roomID string, dataType string, +) (err error) { + _, err = s.insertAccountDataStmt.Exec(pos, userID, roomID, dataType) + return +} + +func (s *accountDataStatements) selectAccountDataInRange( + userID string, oldPos types.StreamPosition, newPos types.StreamPosition, +) (data map[string][]string, err error) { + data = make(map[string][]string) + + // If both positions are the same, it means that the data was saved after the + // latest room event. In that case, we need to decrement the old position as + // it would prevent the SQL request from returning anything. + if oldPos == newPos { + oldPos-- + } + + rows, err := s.selectAccountDataInRangeStmt.Query(userID, oldPos, newPos) + if err != nil { + return + } + + for rows.Next() { + var dataType string + var roomID string + + if err = rows.Scan(&roomID, &dataType); err != nil { + return + } + + if len(data[roomID]) > 0 { + data[roomID] = append(data[roomID], dataType) + } else { + data[roomID] = []string{dataType} + } + } + + return +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index d4f260e00..64afaede8 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -66,6 +66,9 @@ const selectCurrentStateSQL = "" + const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'" +const selectStateEventSQL = "" + + "SELECT event_json FROM current_room_state WHERE type = $1 AND room_id = $2 AND state_key = $3" + const selectEventsWithEventIDsSQL = "" + "SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)" @@ -76,6 +79,7 @@ type currentRoomStateStatements struct { selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt selectEventsWithEventIDsStmt *sql.Stmt + selectStateEventStmt *sql.Stmt } func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { @@ -101,6 +105,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { return } + if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { + return + } return } @@ -195,3 +202,12 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { } return result, nil } + +func (s *currentRoomStateStatements) selectStateEvent(evType string, roomID string, stateKey string) (*gomatrixserverlib.Event, error) { + var res []byte + if err := s.selectStateEventStmt.QueryRow(evType, roomID, stateKey).Scan(&res); err == sql.ErrNoRows { + return nil, nil + } + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false) + return &ev, err +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 27afd1c05..2433b68cb 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -41,10 +41,11 @@ type streamEvent struct { // SyncServerDatabase represents a sync server database type SyncServerDatabase struct { - db *sql.DB - partitions common.PartitionOffsetStatements - events outputRoomEventsStatements - roomstate currentRoomStateStatements + db *sql.DB + partitions common.PartitionOffsetStatements + accountData accountDataStatements + events outputRoomEventsStatements + roomstate currentRoomStateStatements } // NewSyncServerDatabase creates a new sync server database @@ -58,6 +59,10 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { if err = partitions.Prepare(db); err != nil { return nil, err } + accountData := accountDataStatements{} + if err = accountData.prepare(db); err != nil { + return nil, err + } events := outputRoomEventsStatements{} if err = events.prepare(db); err != nil { return nil, err @@ -66,7 +71,7 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { if err := state.prepare(db); err != nil { return nil, err } - return &SyncServerDatabase{db, partitions, events, state}, nil + return &SyncServerDatabase{db, partitions, accountData, events, state}, nil } // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. @@ -141,6 +146,13 @@ func (d *SyncServerDatabase) updateRoomState( return nil } +// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key +// If no event could be found, returns nil +// If there was an issue during the retrieval, returns an error +func (d *SyncServerDatabase) GetStateEvent(evType string, roomID string, stateKey string) (*gomatrixserverlib.Event, error) { + return d.roomstate.selectStateEvent(evType, roomID, stateKey) +} + // PartitionOffsets implements common.PartitionStorer func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { return d.partitions.SelectPartitionOffsets(topic) @@ -267,6 +279,33 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom return } +// GetAccountDataInRange returns all account data for a given user inserted or +// updated between two given positions +// Returns a map following the format data[roomID] = []dataTypes +// If no data is retrieved, returns an empty map +// If there was an issue with the retrieval, returns an error +func (d *SyncServerDatabase) GetAccountDataInRange( + userID string, oldPos types.StreamPosition, newPos types.StreamPosition, +) (map[string][]string, error) { + return d.accountData.selectAccountDataInRange(userID, oldPos, newPos) +} + +// UpsertAccountData keeps track of new or updated account data, by saving the type +// of the new/updated data, and the user ID and room ID the data is related to (empty) +// room ID means the data isn't specific to any room) +// If no data with the given type, user ID and room ID exists in the database, +// creates a new row, else update the existing one +// Returns an error if there was an issue with the upsert +func (d *SyncServerDatabase) UpsertAccountData(userID string, roomID string, dataType string) (types.StreamPosition, error) { + pos, err := d.SyncStreamPosition() + if err != nil { + return pos, err + } + + err = d.accountData.insertAccountData(pos, userID, roomID, dataType) + return pos, err +} + func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error { // Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark. roomIDs, err := d.roomstate.selectRoomIDsWithMembership(txn, userID, "invite") diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 814660a79..c2fdd8f03 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -54,39 +54,44 @@ func NewNotifier(pos types.StreamPosition) *Notifier { // OnNewEvent is called when a new event is received from the room server. Must only be // called from a single goroutine, to avoid races between updates which could set the // current position in the stream incorrectly. -func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { +// Can be called either with a *gomatrixserverlib.Event, or with an user ID +func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) { // update the current position then notify relevant /sync streams. // This needs to be done PRIOR to waking up users as they will read this value. n.streamLock.Lock() defer n.streamLock.Unlock() n.currPos = pos - // Map this event's room_id to a list of joined users, and wake them up. - userIDs := n.joinedUsers(ev.RoomID()) - // If this is an invite, also add in the invitee to this list. - if ev.Type() == "m.room.member" && ev.StateKey() != nil { - userID := *ev.StateKey() - membership, err := ev.Membership() - if err != nil { - log.WithError(err).WithField("event_id", ev.EventID()).Errorf( - "Notifier.OnNewEvent: Failed to unmarshal member event", - ) - } else { - // Keep the joined user map up-to-date - switch membership { - case "invite": - userIDs = append(userIDs, userID) - case "join": - n.addJoinedUser(ev.RoomID(), userID) - case "leave": - fallthrough - case "ban": - n.removeJoinedUser(ev.RoomID(), userID) + if ev != nil { + // Map this event's room_id to a list of joined users, and wake them up. + userIDs := n.joinedUsers(ev.RoomID()) + // If this is an invite, also add in the invitee to this list. + if ev.Type() == "m.room.member" && ev.StateKey() != nil { + userID := *ev.StateKey() + membership, err := ev.Membership() + if err != nil { + log.WithError(err).WithField("event_id", ev.EventID()).Errorf( + "Notifier.OnNewEvent: Failed to unmarshal member event", + ) + } else { + // Keep the joined user map up-to-date + switch membership { + case "invite": + userIDs = append(userIDs, userID) + case "join": + n.addJoinedUser(ev.RoomID(), userID) + case "leave": + fallthrough + case "ban": + n.removeJoinedUser(ev.RoomID(), userID) + } } } - } - for _, userID := range userIDs { + for _, userID := range userIDs { + n.wakeupUser(userID, pos) + } + } else if len(userID) > 0 { n.wakeupUser(userID, pos) } } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 03e39da07..358243bc5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -123,7 +123,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) { stream := n.fetchUserStream(bob, true) waitForBlocking(stream, 1) - n.OnNewEvent(&randomMessageEvent, streamPositionAfter) + n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter) wg.Wait() } @@ -151,7 +151,7 @@ func TestNewInviteEventForUser(t *testing.T) { stream := n.fetchUserStream(bob, true) waitForBlocking(stream, 1) - n.OnNewEvent(&aliceInviteBobEvent, streamPositionAfter) + n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter) wg.Wait() } @@ -182,7 +182,7 @@ func TestMultipleRequestWakeup(t *testing.T) { stream := n.fetchUserStream(bob, true) waitForBlocking(stream, 3) - n.OnNewEvent(&randomMessageEvent, streamPositionAfter) + n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter) wg.Wait() @@ -217,7 +217,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { }() bobStream := n.fetchUserStream(bob, true) waitForBlocking(bobStream, 1) - n.OnNewEvent(&bobLeaveEvent, streamPositionAfter) + n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter) leaveWG.Wait() // send an event into the room. Make sure alice gets it. Bob should not. @@ -246,7 +246,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { waitForBlocking(aliceStream, 1) waitForBlocking(bobStream, 1) - n.OnNewEvent(&randomMessageEvent, streamPositionAfter2) + n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2) aliceWG.Wait() // it's possible that at this point alice has been informed and bob is about to be informed, so wait diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 08bad334a..a207b8152 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -20,22 +20,25 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db *storage.SyncServerDatabase - notifier *Notifier + db *storage.SyncServerDatabase + accountDB *accounts.Database + notifier *Notifier } // NewRequestPool makes a new RequestPool -func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier) *RequestPool { - return &RequestPool{db, n} +func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.Database) *RequestPool { + return &RequestPool{db, adb, n} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -77,9 +80,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype if err != nil { res = httputil.LogThenError(req, err) } else { - res = util.JSONResponse{ - Code: 200, - JSON: syncData, + syncData, err = rp.appendAccountData(syncData, device.UserID, *syncReq, currentPos) + if err != nil { + res = httputil.LogThenError(req, err) + } else { + res = util.JSONResponse{ + Code: 200, + JSON: syncData, + } } } done <- res @@ -104,3 +112,69 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.Stre } return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) } + +func (rp *RequestPool) appendAccountData( + data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, +) (*types.Response, error) { + // TODO: We currently send all account data on every sync response, we should instead send data + // that has changed on incremental sync responses + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return nil, err + } + + if req.since == types.StreamPosition(0) { + // If this is the initial sync, we don't need to check if a data has + // already been sent. Instead, we send the whole batch. + var global []gomatrixserverlib.ClientEvent + var rooms map[string][]gomatrixserverlib.ClientEvent + global, rooms, err = rp.accountDB.GetAccountData(localpart) + if err != nil { + return nil, err + } + data.AccountData.Events = global + + for r, j := range data.Rooms.Join { + if len(rooms[r]) > 0 { + j.AccountData.Events = rooms[r] + data.Rooms.Join[r] = j + } + } + + return data, nil + } + + // Sync is not initial, get all account data since the latest sync + dataTypes, err := rp.db.GetAccountDataInRange(userID, req.since, currentPos) + if err != nil { + return nil, err + } + + if len(dataTypes) == 0 { + return data, nil + } + + // Iterate over the rooms + for roomID, dataTypes := range dataTypes { + events := []gomatrixserverlib.ClientEvent{} + // Request the missing data from the database + for _, dataType := range dataTypes { + evs, err := rp.accountDB.GetAccountDataByType(localpart, roomID, dataType) + if err != nil { + return nil, err + } + events = append(events, evs...) + } + + // Append the data to the response + if len(roomID) > 0 { + jr := data.Rooms.Join[roomID] + jr.AccountData.Events = events + data.Rooms.Join[roomID] = jr + } else { + data.AccountData.Events = events + } + } + + return data, nil +}