mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-10 16:33:11 -06:00
Merge remote-tracking branch 'origin/master' into markjh/invites
This commit is contained in:
commit
8fafe14f47
|
|
@ -56,6 +56,7 @@ kafka:
|
|||
topics:
|
||||
input_room_event: roomserverInput
|
||||
output_room_event: roomserverOutput
|
||||
output_client_data: clientapiOutput
|
||||
user_updates: userUpdates
|
||||
|
||||
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
|
||||
|
|
|
|||
|
|
@ -0,0 +1,23 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package authtypes
|
||||
|
||||
// Membership represents the relationship between a user and a room they're a
|
||||
// member of
|
||||
type Membership struct {
|
||||
Localpart string
|
||||
RoomID string
|
||||
EventID string
|
||||
}
|
||||
|
|
@ -0,0 +1,144 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package accounts
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const accountDataSchema = `
|
||||
-- Stores data about accounts data.
|
||||
CREATE TABLE IF NOT EXISTS account_data (
|
||||
-- The Matrix user ID localpart for this account
|
||||
localpart TEXT NOT NULL,
|
||||
-- The room ID for this data (empty string if not specific to a room)
|
||||
room_id TEXT,
|
||||
-- The account data type
|
||||
type TEXT NOT NULL,
|
||||
-- The account data content
|
||||
content TEXT NOT NULL,
|
||||
|
||||
PRIMARY KEY(localpart, room_id, type)
|
||||
);
|
||||
`
|
||||
|
||||
const insertAccountDataSQL = `
|
||||
INSERT INTO account_data(localpart, room_id, type, content) VALUES($1, $2, $3, $4)
|
||||
ON CONFLICT (localpart, room_id, type) DO UPDATE SET content = EXCLUDED.content
|
||||
`
|
||||
|
||||
const selectAccountDataSQL = "" +
|
||||
"SELECT room_id, type, content FROM account_data WHERE localpart = $1"
|
||||
|
||||
const selectAccountDataByTypeSQL = "" +
|
||||
"SELECT content FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3"
|
||||
|
||||
const deleteAccountDataSQL = "" +
|
||||
"DELETE FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3"
|
||||
|
||||
type accountDataStatements struct {
|
||||
insertAccountDataStmt *sql.Stmt
|
||||
selectAccountDataStmt *sql.Stmt
|
||||
selectAccountDataByTypeStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(accountDataSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectAccountDataStmt, err = db.Prepare(selectAccountDataSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectAccountDataByTypeStmt, err = db.Prepare(selectAccountDataByTypeSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *accountDataStatements) insertAccountData(localpart string, roomID string, dataType string, content string) (err error) {
|
||||
_, err = s.insertAccountDataStmt.Exec(localpart, roomID, dataType, content)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *accountDataStatements) selectAccountData(localpart string) (
|
||||
global []gomatrixserverlib.ClientEvent,
|
||||
rooms map[string][]gomatrixserverlib.ClientEvent,
|
||||
err error,
|
||||
) {
|
||||
rows, err := s.selectAccountDataStmt.Query(localpart)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
global = []gomatrixserverlib.ClientEvent{}
|
||||
rooms = make(map[string][]gomatrixserverlib.ClientEvent)
|
||||
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
var dataType string
|
||||
var content []byte
|
||||
|
||||
if err = rows.Scan(&roomID, &dataType, &content); err != nil && err != sql.ErrNoRows {
|
||||
return
|
||||
}
|
||||
|
||||
ac := gomatrixserverlib.ClientEvent{
|
||||
Type: dataType,
|
||||
Content: content,
|
||||
}
|
||||
|
||||
if len(roomID) > 0 {
|
||||
rooms[roomID] = append(rooms[roomID], ac)
|
||||
} else {
|
||||
global = append(global, ac)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *accountDataStatements) selectAccountDataByType(
|
||||
localpart string, roomID string, dataType string,
|
||||
) (data []gomatrixserverlib.ClientEvent, err error) {
|
||||
data = []gomatrixserverlib.ClientEvent{}
|
||||
|
||||
rows, err := s.selectAccountDataByTypeStmt.Query(localpart, roomID, dataType)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var content []byte
|
||||
|
||||
if err = rows.Scan(&content); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ac := gomatrixserverlib.ClientEvent{
|
||||
Type: dataType,
|
||||
Content: content,
|
||||
}
|
||||
|
||||
data = append(data, ac)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
@ -18,6 +18,7 @@ import (
|
|||
"database/sql"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
)
|
||||
|
||||
const membershipSchema = `
|
||||
|
|
@ -38,23 +39,29 @@ CREATE TABLE IF NOT EXISTS memberships (
|
|||
CREATE UNIQUE INDEX IF NOT EXISTS membership_event_id ON memberships(event_id);
|
||||
`
|
||||
|
||||
const insertMembershipSQL = "" +
|
||||
"INSERT INTO memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)"
|
||||
const insertMembershipSQL = `
|
||||
INSERT INTO memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)
|
||||
ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id
|
||||
`
|
||||
|
||||
const selectMembershipSQL = "" +
|
||||
"SELECT * from memberships WHERE localpart = $1 AND room_id = $2"
|
||||
|
||||
const selectMembershipsByLocalpartSQL = "" +
|
||||
"SELECT room_id FROM memberships WHERE localpart = $1"
|
||||
"SELECT room_id, event_id FROM memberships WHERE localpart = $1"
|
||||
|
||||
const deleteMembershipsByEventIDsSQL = "" +
|
||||
"DELETE FROM memberships WHERE event_id = ANY($1)"
|
||||
|
||||
const updateMembershipByEventIDSQL = "" +
|
||||
"UPDATE memberships SET event_id = $2 WHERE event_id = $1"
|
||||
|
||||
type membershipStatements struct {
|
||||
deleteMembershipsByEventIDsStmt *sql.Stmt
|
||||
insertMembershipStmt *sql.Stmt
|
||||
selectMembershipByEventIDStmt *sql.Stmt
|
||||
selectMembershipsByLocalpartStmt *sql.Stmt
|
||||
updateMembershipByEventIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
|
||||
|
|
@ -71,6 +78,9 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.updateMembershipByEventIDStmt, err = db.Prepare(updateMembershipByEventIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -83,3 +93,29 @@ func (s *membershipStatements) deleteMembershipsByEventIDs(eventIDs []string, tx
|
|||
_, err = txn.Stmt(s.deleteMembershipsByEventIDsStmt).Exec(pq.StringArray(eventIDs))
|
||||
return
|
||||
}
|
||||
|
||||
func (s *membershipStatements) selectMembershipsByLocalpart(localpart string) (memberships []authtypes.Membership, err error) {
|
||||
rows, err := s.selectMembershipsByLocalpartStmt.Query(localpart)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
memberships = []authtypes.Membership{}
|
||||
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var m authtypes.Membership
|
||||
m.Localpart = localpart
|
||||
if err := rows.Scan(&m.RoomID, &m.EventID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
memberships = append(memberships, m)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *membershipStatements) updateMembershipByEventID(oldEventID string, newEventID string) (err error) {
|
||||
_, err = s.updateMembershipByEventIDStmt.Exec(oldEventID, newEventID)
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,12 +27,13 @@ import (
|
|||
|
||||
// Database represents an account database
|
||||
type Database struct {
|
||||
db *sql.DB
|
||||
partitions common.PartitionOffsetStatements
|
||||
accounts accountsStatements
|
||||
profiles profilesStatements
|
||||
memberships membershipStatements
|
||||
serverName gomatrixserverlib.ServerName
|
||||
db *sql.DB
|
||||
partitions common.PartitionOffsetStatements
|
||||
accounts accountsStatements
|
||||
profiles profilesStatements
|
||||
memberships membershipStatements
|
||||
accountDatas accountDataStatements
|
||||
serverName gomatrixserverlib.ServerName
|
||||
}
|
||||
|
||||
// NewDatabase creates a new accounts and profiles database
|
||||
|
|
@ -58,7 +59,11 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName)
|
|||
if err = m.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Database{db, partitions, a, p, m, serverName}, nil
|
||||
ac := accountDataStatements{}
|
||||
if err = ac.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Database{db, partitions, a, p, m, ac, serverName}, nil
|
||||
}
|
||||
|
||||
// GetAccountByPassword returns the account associated with the given localpart and password.
|
||||
|
|
@ -151,6 +156,21 @@ func (d *Database) UpdateMemberships(eventsToAdd []gomatrixserverlib.Event, idsT
|
|||
})
|
||||
}
|
||||
|
||||
// GetMembershipsByLocalpart returns an array containing the IDs of all the rooms
|
||||
// a user matching a given localpart is a member of
|
||||
// If no membership match the given localpart, returns an empty array
|
||||
// If there was an issue during the retrieval, returns the SQL error
|
||||
func (d *Database) GetMembershipsByLocalpart(localpart string) (memberships []authtypes.Membership, err error) {
|
||||
return d.memberships.selectMembershipsByLocalpart(localpart)
|
||||
}
|
||||
|
||||
// UpdateMembership update the "join" membership event ID of a membership.
|
||||
// This is useful in case of membership upgrade (e.g. profile update)
|
||||
// If there was an issue during the update, returns the SQL error
|
||||
func (d *Database) UpdateMembership(oldEventID string, newEventID string) error {
|
||||
return d.memberships.updateMembershipByEventID(oldEventID, newEventID)
|
||||
}
|
||||
|
||||
// newMembership will save a new membership in the database if the given state
|
||||
// event is a "join" membership event
|
||||
// If the event isn't a "join" membership event, does nothing
|
||||
|
|
@ -184,6 +204,34 @@ func (d *Database) newMembership(ev gomatrixserverlib.Event, txn *sql.Tx) error
|
|||
return nil
|
||||
}
|
||||
|
||||
// SaveAccountData saves new account data for a given user and a given room.
|
||||
// If the account data is not specific to a room, the room ID should be an empty string
|
||||
// If an account data already exists for a given set (user, room, data type), it will
|
||||
// update the corresponding row with the new content
|
||||
// Returns a SQL error if there was an issue with the insertion/update
|
||||
func (d *Database) SaveAccountData(localpart string, roomID string, dataType string, content string) error {
|
||||
return d.accountDatas.insertAccountData(localpart, roomID, dataType, content)
|
||||
}
|
||||
|
||||
// GetAccountData returns account data related to a given localpart
|
||||
// If no account data could be found, returns an empty arrays
|
||||
// Returns an error if there was an issue with the retrieval
|
||||
func (d *Database) GetAccountData(localpart string) (
|
||||
global []gomatrixserverlib.ClientEvent,
|
||||
rooms map[string][]gomatrixserverlib.ClientEvent,
|
||||
err error,
|
||||
) {
|
||||
return d.accountDatas.selectAccountData(localpart)
|
||||
}
|
||||
|
||||
// GetAccountDataByType returns account data matching a given
|
||||
// localpart, room ID and type.
|
||||
// If no account data could be found, returns an empty array
|
||||
// Returns an error if there was an issue with the retrieval
|
||||
func (d *Database) GetAccountDataByType(localpart string, roomID string, dataType string) (data []gomatrixserverlib.ClientEvent, err error) {
|
||||
return d.accountDatas.selectAccountDataByType(localpart, roomID, dataType)
|
||||
}
|
||||
|
||||
func hashPassword(plaintext string) (hash string, err error) {
|
||||
hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), bcrypt.DefaultCost)
|
||||
return string(hashBytes), err
|
||||
|
|
|
|||
|
|
@ -107,6 +107,11 @@ func (s *OutputRoomEvent) lookupStateEvents(
|
|||
) ([]gomatrixserverlib.Event, error) {
|
||||
// Fast path if there aren't any new state events.
|
||||
if len(addsStateEventIDs) == 0 {
|
||||
// If the event is a membership update (e.g. for a profile update), it won't
|
||||
// show up in AddsStateEventIDs, so we need to add it manually
|
||||
if event.Type() == "m.room.member" {
|
||||
return []gomatrixserverlib.Event{event}, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,65 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package producers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
// SyncAPIProducer produces events for the sync API server to consume
|
||||
type SyncAPIProducer struct {
|
||||
Topic string
|
||||
Producer sarama.SyncProducer
|
||||
}
|
||||
|
||||
// NewSyncAPIProducer creates a new SyncAPIProducer
|
||||
func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) {
|
||||
producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &SyncAPIProducer{
|
||||
Topic: topic,
|
||||
Producer: producer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SendData sends account data to the sync API server
|
||||
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
|
||||
var m sarama.ProducerMessage
|
||||
|
||||
data := common.AccountData{
|
||||
RoomID: roomID,
|
||||
Type: dataType,
|
||||
}
|
||||
value, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Topic = string(p.Topic)
|
||||
m.Key = sarama.StringEncoder(userID)
|
||||
m.Value = sarama.ByteEncoder(value)
|
||||
|
||||
if _, _, err := p.Producer.SendMessage(&m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package readers
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
// SaveAccountData implements PUT /user/{userId}/[rooms/{roomId}/]account_data/{type}
|
||||
func SaveAccountData(
|
||||
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
|
||||
userID string, roomID string, dataType string, syncProducer *producers.SyncAPIProducer,
|
||||
) util.JSONResponse {
|
||||
if req.Method != "PUT" {
|
||||
return util.JSONResponse{
|
||||
Code: 405,
|
||||
JSON: jsonerror.NotFound("Bad method"),
|
||||
}
|
||||
}
|
||||
|
||||
if userID != device.UserID {
|
||||
return util.JSONResponse{
|
||||
Code: 403,
|
||||
JSON: jsonerror.Forbidden("userID does not match the current user"),
|
||||
}
|
||||
}
|
||||
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
defer req.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := accountDB.SaveAccountData(localpart, roomID, dataType, string(body)); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := syncProducer.SendData(userID, roomID, dataType); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
|
@ -15,13 +15,13 @@
|
|||
package readers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
|
|
@ -34,6 +34,7 @@ func DirectoryRoom(
|
|||
roomAlias string,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
cfg *config.Dendrite,
|
||||
aliasAPI api.RoomserverAliasAPI,
|
||||
) util.JSONResponse {
|
||||
_, domain, err := gomatrixserverlib.SplitID('#', roomAlias)
|
||||
if err != nil {
|
||||
|
|
@ -43,11 +44,30 @@ func DirectoryRoom(
|
|||
}
|
||||
}
|
||||
|
||||
var resp gomatrixserverlib.RespDirectory
|
||||
|
||||
if domain == cfg.Matrix.ServerName {
|
||||
// TODO: Implement lookup up local room aliases.
|
||||
panic(fmt.Errorf("Looking up local room aliases is not implemented"))
|
||||
queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias}
|
||||
var queryRes api.GetAliasRoomIDResponse
|
||||
if err = aliasAPI.GetAliasRoomID(&queryReq, &queryRes); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if len(queryRes.RoomID) > 0 {
|
||||
// TODO: List servers that are aware of this room alias
|
||||
resp = gomatrixserverlib.RespDirectory{
|
||||
RoomID: queryRes.RoomID,
|
||||
Servers: []gomatrixserverlib.ServerName{},
|
||||
}
|
||||
} else {
|
||||
// If the response doesn't contain a non-empty string, return an error
|
||||
return util.JSONResponse{
|
||||
Code: 404,
|
||||
JSON: jsonerror.NotFound("Room alias " + roomAlias + " not found."),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
resp, err := federation.LookupRoomAlias(domain, roomAlias)
|
||||
resp, err = federation.LookupRoomAlias(domain, roomAlias)
|
||||
if err != nil {
|
||||
switch x := err.(type) {
|
||||
case gomatrix.HTTPError:
|
||||
|
|
@ -62,10 +82,88 @@ func DirectoryRoom(
|
|||
// TODO: Return 504 if the remote server timed out.
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: resp,
|
||||
}
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: resp,
|
||||
}
|
||||
}
|
||||
|
||||
// SetLocalAlias implements PUT /directory/room/{roomAlias}
|
||||
// TODO: Check if the user has the power level to set an alias
|
||||
func SetLocalAlias(
|
||||
req *http.Request,
|
||||
device *authtypes.Device,
|
||||
alias string,
|
||||
cfg *config.Dendrite,
|
||||
aliasAPI api.RoomserverAliasAPI,
|
||||
) util.JSONResponse {
|
||||
_, domain, err := gomatrixserverlib.SplitID('#', alias)
|
||||
if err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: 400,
|
||||
JSON: jsonerror.BadJSON("Room alias must be in the form '#localpart:domain'"),
|
||||
}
|
||||
}
|
||||
|
||||
if domain != cfg.Matrix.ServerName {
|
||||
return util.JSONResponse{
|
||||
Code: 403,
|
||||
JSON: jsonerror.Forbidden("Alias must be on local homeserver"),
|
||||
}
|
||||
}
|
||||
|
||||
var r struct {
|
||||
RoomID string `json:"room_id"`
|
||||
}
|
||||
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
||||
return *resErr
|
||||
}
|
||||
|
||||
queryReq := api.SetRoomAliasRequest{
|
||||
UserID: device.UserID,
|
||||
RoomID: r.RoomID,
|
||||
Alias: alias,
|
||||
}
|
||||
var queryRes api.SetRoomAliasResponse
|
||||
if err := aliasAPI.SetRoomAlias(&queryReq, &queryRes); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if queryRes.AliasExists {
|
||||
return util.JSONResponse{
|
||||
Code: 409,
|
||||
JSON: jsonerror.Unknown("The alias " + alias + " already exists."),
|
||||
}
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveLocalAlias implements DELETE /directory/room/{roomAlias}
|
||||
// TODO: Check if the user has the power level to remove an alias
|
||||
func RemoveLocalAlias(
|
||||
req *http.Request,
|
||||
device *authtypes.Device,
|
||||
alias string,
|
||||
cfg *config.Dendrite,
|
||||
aliasAPI api.RoomserverAliasAPI,
|
||||
) util.JSONResponse {
|
||||
queryReq := api.RemoveRoomAliasRequest{
|
||||
Alias: alias,
|
||||
UserID: device.UserID,
|
||||
}
|
||||
var queryRes api.RemoveRoomAliasResponse
|
||||
if err := aliasAPI.RemoveRoomAlias(&queryReq, &queryRes); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
|
|
@ -35,7 +36,11 @@ func Logout(
|
|||
}
|
||||
}
|
||||
|
||||
localpart := getLocalPart(device.UserID)
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := deviceDB.RemoveDevice(device.ID, localpart); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,12 +17,17 @@ package readers
|
|||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/events"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
|
@ -50,7 +55,11 @@ func GetProfile(
|
|||
JSON: jsonerror.NotFound("Bad method"),
|
||||
}
|
||||
}
|
||||
localpart := getLocalPart(userID)
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
profile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
|
|
@ -69,7 +78,11 @@ func GetProfile(
|
|||
func GetAvatarURL(
|
||||
req *http.Request, accountDB *accounts.Database, userID string,
|
||||
) util.JSONResponse {
|
||||
localpart := getLocalPart(userID)
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
profile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
|
|
@ -85,9 +98,19 @@ func GetAvatarURL(
|
|||
|
||||
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
|
||||
func SetAvatarURL(
|
||||
req *http.Request, accountDB *accounts.Database, userID string,
|
||||
producer *producers.UserUpdateProducer,
|
||||
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
|
||||
userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite,
|
||||
rsProducer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
|
||||
) util.JSONResponse {
|
||||
if userID != device.UserID {
|
||||
return util.JSONResponse{
|
||||
Code: 403,
|
||||
JSON: jsonerror.Forbidden("userID does not match the current user"),
|
||||
}
|
||||
}
|
||||
|
||||
changedKey := "avatar_url"
|
||||
|
||||
var r avatarURL
|
||||
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
||||
return *resErr
|
||||
|
|
@ -99,18 +122,41 @@ func SetAvatarURL(
|
|||
}
|
||||
}
|
||||
|
||||
localpart := getLocalPart(userID)
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
oldProfile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil {
|
||||
if err = accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := producer.SendUpdate(userID, "avatar_url", oldProfile.AvatarURL, r.AvatarURL); err != nil {
|
||||
memberships, err := accountDB.GetMembershipsByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
newProfile := authtypes.Profile{
|
||||
Localpart: localpart,
|
||||
DisplayName: oldProfile.DisplayName,
|
||||
AvatarURL: r.AvatarURL,
|
||||
}
|
||||
|
||||
events, err := buildMembershipEvents(memberships, accountDB, newProfile, userID, cfg, queryAPI)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := rsProducer.SendEvents(events, cfg.Matrix.ServerName); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := producer.SendUpdate(userID, changedKey, oldProfile.AvatarURL, r.AvatarURL); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
|
|
@ -124,7 +170,11 @@ func SetAvatarURL(
|
|||
func GetDisplayName(
|
||||
req *http.Request, accountDB *accounts.Database, userID string,
|
||||
) util.JSONResponse {
|
||||
localpart := getLocalPart(userID)
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
profile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
|
|
@ -140,9 +190,19 @@ func GetDisplayName(
|
|||
|
||||
// SetDisplayName implements PUT /profile/{userID}/displayname
|
||||
func SetDisplayName(
|
||||
req *http.Request, accountDB *accounts.Database, userID string,
|
||||
producer *producers.UserUpdateProducer,
|
||||
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
|
||||
userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite,
|
||||
rsProducer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
|
||||
) util.JSONResponse {
|
||||
if userID != device.UserID {
|
||||
return util.JSONResponse{
|
||||
Code: 403,
|
||||
JSON: jsonerror.Forbidden("userID does not match the current user"),
|
||||
}
|
||||
}
|
||||
|
||||
changedKey := "displayname"
|
||||
|
||||
var r displayName
|
||||
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
||||
return *resErr
|
||||
|
|
@ -154,18 +214,41 @@ func SetDisplayName(
|
|||
}
|
||||
}
|
||||
|
||||
localpart := getLocalPart(userID)
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
oldProfile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil {
|
||||
if err = accountDB.SetDisplayName(localpart, r.DisplayName); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := producer.SendUpdate(userID, "displayname", oldProfile.DisplayName, r.DisplayName); err != nil {
|
||||
memberships, err := accountDB.GetMembershipsByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
newProfile := authtypes.Profile{
|
||||
Localpart: localpart,
|
||||
DisplayName: r.DisplayName,
|
||||
AvatarURL: oldProfile.AvatarURL,
|
||||
}
|
||||
|
||||
events, err := buildMembershipEvents(memberships, accountDB, newProfile, userID, cfg, queryAPI)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := rsProducer.SendEvents(events, cfg.Matrix.ServerName); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := producer.SendUpdate(userID, changedKey, oldProfile.DisplayName, r.DisplayName); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
|
|
@ -175,13 +258,71 @@ func SetDisplayName(
|
|||
}
|
||||
}
|
||||
|
||||
func getLocalPart(userID string) string {
|
||||
if !strings.HasPrefix(userID, "@") {
|
||||
panic(fmt.Errorf("Invalid user ID"))
|
||||
func buildMembershipEvents(
|
||||
memberships []authtypes.Membership, db *accounts.Database,
|
||||
newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
) ([]gomatrixserverlib.Event, error) {
|
||||
evs := []gomatrixserverlib.Event{}
|
||||
|
||||
for _, membership := range memberships {
|
||||
builder := gomatrixserverlib.EventBuilder{
|
||||
Sender: userID,
|
||||
RoomID: membership.RoomID,
|
||||
Type: "m.room.member",
|
||||
StateKey: &userID,
|
||||
}
|
||||
|
||||
content := events.MemberContent{
|
||||
Membership: "join",
|
||||
}
|
||||
|
||||
content.DisplayName = newProfile.DisplayName
|
||||
content.AvatarURL = newProfile.AvatarURL
|
||||
|
||||
if err := builder.SetContent(content); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(&builder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Ask the roomserver for information about this room
|
||||
queryReq := api.QueryLatestEventsAndStateRequest{
|
||||
RoomID: membership.RoomID,
|
||||
StateToFetch: eventsNeeded.Tuples(),
|
||||
}
|
||||
var queryRes api.QueryLatestEventsAndStateResponse
|
||||
if queryErr := queryAPI.QueryLatestEventsAndState(&queryReq, &queryRes); queryErr != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
builder.Depth = queryRes.Depth
|
||||
builder.PrevEvents = queryRes.LatestEvents
|
||||
|
||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||
|
||||
for i := range queryRes.StateEvents {
|
||||
authEvents.AddEvent(&queryRes.StateEvents[i])
|
||||
}
|
||||
|
||||
refs, err := eventsNeeded.AuthEventReferences(&authEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
builder.AuthEvents = refs
|
||||
|
||||
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
|
||||
now := time.Now()
|
||||
event, err := builder.Build(eventID, now, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
evs = append(evs, event)
|
||||
}
|
||||
|
||||
// Get the part before ":"
|
||||
username := strings.Split(userID, ":")[0]
|
||||
// Return the part after the "@"
|
||||
return strings.Split(username, "@")[1]
|
||||
return evs, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,11 +42,13 @@ const pathPrefixUnstable = "/_matrix/client/unstable"
|
|||
func Setup(
|
||||
servMux *http.ServeMux, httpClient *http.Client, cfg config.Dendrite,
|
||||
producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
|
||||
aliasAPI api.RoomserverAliasAPI,
|
||||
accountDB *accounts.Database,
|
||||
deviceDB *devices.Database,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
keyRing gomatrixserverlib.KeyRing,
|
||||
userUpdateProducer *producers.UserUpdateProducer,
|
||||
syncProducer *producers.SyncAPIProducer,
|
||||
) {
|
||||
apiMux := mux.NewRouter()
|
||||
|
||||
|
|
@ -70,14 +72,14 @@ func Setup(
|
|||
|
||||
r0mux.Handle("/createRoom",
|
||||
common.MakeAuthAPI("createRoom", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
return writers.CreateRoom(req, device, cfg, producer)
|
||||
return writers.CreateRoom(req, device, cfg, producer, accountDB)
|
||||
}),
|
||||
)
|
||||
r0mux.Handle("/join/{roomIDOrAlias}",
|
||||
common.MakeAuthAPI("join", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return writers.JoinRoomByIDOrAlias(
|
||||
req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, keyRing,
|
||||
req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, aliasAPI, keyRing, accountDB,
|
||||
)
|
||||
}),
|
||||
)
|
||||
|
|
@ -109,9 +111,23 @@ func Setup(
|
|||
r0mux.Handle("/directory/room/{roomAlias}",
|
||||
common.MakeAuthAPI("directory_room", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return readers.DirectoryRoom(req, device, vars["roomAlias"], federation, &cfg)
|
||||
return readers.DirectoryRoom(req, device, vars["roomAlias"], federation, &cfg, aliasAPI)
|
||||
}),
|
||||
)
|
||||
).Methods("GET")
|
||||
|
||||
r0mux.Handle("/directory/room/{roomAlias}",
|
||||
common.MakeAuthAPI("directory_room", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return readers.SetLocalAlias(req, device, vars["roomAlias"], &cfg, aliasAPI)
|
||||
}),
|
||||
).Methods("PUT", "OPTIONS")
|
||||
|
||||
r0mux.Handle("/directory/room/{roomAlias}",
|
||||
common.MakeAuthAPI("directory_room", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return readers.RemoveLocalAlias(req, device, vars["roomAlias"], &cfg, aliasAPI)
|
||||
}),
|
||||
).Methods("DELETE")
|
||||
|
||||
r0mux.Handle("/logout",
|
||||
common.MakeAuthAPI("logout", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
|
|
@ -185,7 +201,7 @@ func Setup(
|
|||
r0mux.Handle("/profile/{userID}/avatar_url",
|
||||
common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return readers.SetAvatarURL(req, accountDB, vars["userID"], userUpdateProducer)
|
||||
return readers.SetAvatarURL(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI)
|
||||
}),
|
||||
).Methods("PUT", "OPTIONS")
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
|
|
@ -201,7 +217,7 @@ func Setup(
|
|||
r0mux.Handle("/profile/{userID}/displayname",
|
||||
common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return readers.SetDisplayName(req, accountDB, vars["userID"], userUpdateProducer)
|
||||
return readers.SetDisplayName(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI)
|
||||
}),
|
||||
).Methods("PUT", "OPTIONS")
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
|
|
@ -274,9 +290,16 @@ func Setup(
|
|||
)
|
||||
|
||||
r0mux.Handle("/user/{userID}/account_data/{type}",
|
||||
common.MakeAPI("user_account_data", func(req *http.Request) util.JSONResponse {
|
||||
// TODO: Set and get the account_data
|
||||
return util.JSONResponse{Code: 200, JSON: struct{}{}}
|
||||
common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return readers.SaveAccountData(req, accountDB, device, vars["userID"], "", vars["type"], syncProducer)
|
||||
}),
|
||||
)
|
||||
|
||||
r0mux.Handle("/user/{userID}/rooms/{roomID}/account_data/{type}",
|
||||
common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return readers.SaveAccountData(req, accountDB, device, vars["userID"], vars["roomID"], vars["type"], syncProducer)
|
||||
}),
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/events"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
|
|
@ -84,15 +85,21 @@ type fledglingEvent struct {
|
|||
}
|
||||
|
||||
// CreateRoom implements /createRoom
|
||||
func CreateRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite, producer *producers.RoomserverProducer) util.JSONResponse {
|
||||
func CreateRoom(req *http.Request, device *authtypes.Device,
|
||||
cfg config.Dendrite, producer *producers.RoomserverProducer,
|
||||
accountDB *accounts.Database,
|
||||
) util.JSONResponse {
|
||||
// TODO: Check room ID doesn't clash with an existing one, and we
|
||||
// probably shouldn't be using pseudo-random strings, maybe GUIDs?
|
||||
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
|
||||
return createRoom(req, device, cfg, roomID, producer)
|
||||
return createRoom(req, device, cfg, roomID, producer, accountDB)
|
||||
}
|
||||
|
||||
// createRoom implements /createRoom
|
||||
func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer) util.JSONResponse {
|
||||
func createRoom(req *http.Request, device *authtypes.Device,
|
||||
cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer,
|
||||
accountDB *accounts.Database,
|
||||
) util.JSONResponse {
|
||||
logger := util.GetLogger(req.Context())
|
||||
userID := device.UserID
|
||||
var r createRoomRequest
|
||||
|
|
@ -115,6 +122,22 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite
|
|||
"roomID": roomID,
|
||||
}).Info("Creating new room")
|
||||
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
profile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
membershipContent := events.MemberContent{
|
||||
Membership: "join",
|
||||
DisplayName: profile.DisplayName,
|
||||
AvatarURL: profile.AvatarURL,
|
||||
}
|
||||
|
||||
var builtEvents []gomatrixserverlib.Event
|
||||
|
||||
// send events into the room in order of:
|
||||
|
|
@ -137,7 +160,7 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite
|
|||
// TODO: Synapse has txn/token ID on each event. Do we need to do this here?
|
||||
eventsToMake := []fledglingEvent{
|
||||
{"m.room.create", "", events.CreateContent{Creator: userID}},
|
||||
{"m.room.member", userID, events.MemberContent{Membership: "join"}}, // TODO: Set avatar_url / displayname
|
||||
{"m.room.member", userID, membershipContent},
|
||||
{"m.room.power_levels", "", events.InitialPowerLevelsContent(userID)},
|
||||
// TODO: m.room.canonical_alias
|
||||
{"m.room.join_rules", "", events.JoinRulesContent{"public"}}, // FIXME: Allow this to be changed
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
|
|
@ -41,16 +42,30 @@ func JoinRoomByIDOrAlias(
|
|||
federation *gomatrixserverlib.FederationClient,
|
||||
producer *producers.RoomserverProducer,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
aliasAPI api.RoomserverAliasAPI,
|
||||
keyRing gomatrixserverlib.KeyRing,
|
||||
accountDB *accounts.Database,
|
||||
) util.JSONResponse {
|
||||
var content map[string]interface{} // must be a JSON object
|
||||
if resErr := httputil.UnmarshalJSONRequest(req, &content); resErr != nil {
|
||||
return *resErr
|
||||
}
|
||||
|
||||
content["membership"] = "join"
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
r := joinRoomReq{req, content, device.UserID, cfg, federation, producer, queryAPI, keyRing}
|
||||
profile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
content["membership"] = "join"
|
||||
content["displayname"] = profile.DisplayName
|
||||
content["avatar_url"] = profile.AvatarURL
|
||||
|
||||
r := joinRoomReq{req, content, device.UserID, cfg, federation, producer, queryAPI, aliasAPI, keyRing}
|
||||
|
||||
if strings.HasPrefix(roomIDOrAlias, "!") {
|
||||
return r.joinRoomByID()
|
||||
|
|
@ -72,6 +87,7 @@ type joinRoomReq struct {
|
|||
federation *gomatrixserverlib.FederationClient
|
||||
producer *producers.RoomserverProducer
|
||||
queryAPI api.RoomserverQueryAPI
|
||||
aliasAPI api.RoomserverAliasAPI
|
||||
keyRing gomatrixserverlib.KeyRing
|
||||
}
|
||||
|
||||
|
|
@ -97,11 +113,23 @@ func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse {
|
|||
}
|
||||
}
|
||||
if domain == r.cfg.Matrix.ServerName {
|
||||
// TODO: Implement joining local room aliases.
|
||||
panic(fmt.Errorf("Joining local room aliases is not implemented"))
|
||||
} else {
|
||||
return r.joinRoomByRemoteAlias(domain, roomAlias)
|
||||
queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias}
|
||||
var queryRes api.GetAliasRoomIDResponse
|
||||
if err = r.aliasAPI.GetAliasRoomID(&queryReq, &queryRes); err != nil {
|
||||
return httputil.LogThenError(r.req, err)
|
||||
}
|
||||
|
||||
if len(queryRes.RoomID) > 0 {
|
||||
return r.joinRoomUsingServers(queryRes.RoomID, []gomatrixserverlib.ServerName{r.cfg.Matrix.ServerName})
|
||||
}
|
||||
// If the response doesn't contain a non-empty string, return an error
|
||||
return util.JSONResponse{
|
||||
Code: 404,
|
||||
JSON: jsonerror.NotFound("Room alias " + roomAlias + " not found."),
|
||||
}
|
||||
}
|
||||
// If the room isn't local, use federation to join
|
||||
return r.joinRoomByRemoteAlias(domain, roomAlias)
|
||||
}
|
||||
|
||||
func (r joinRoomReq) joinRoomByRemoteAlias(
|
||||
|
|
@ -126,7 +154,7 @@ func (r joinRoomReq) joinRoomByRemoteAlias(
|
|||
|
||||
func (r joinRoomReq) writeToBuilder(eb *gomatrixserverlib.EventBuilder, roomID string) {
|
||||
eb.Type = "m.room.member"
|
||||
eb.SetContent(r.content) // TODO: Set avatar_url / displayname
|
||||
eb.SetContent(r.content)
|
||||
eb.SetUnsigned(struct{}{})
|
||||
eb.Sender = r.userID
|
||||
eb.StateKey = &r.userID
|
||||
|
|
@ -156,9 +184,44 @@ func (r joinRoomReq) joinRoomUsingServers(
|
|||
}
|
||||
|
||||
if queryRes.RoomExists {
|
||||
// TODO: Implement joining rooms that already the server is already in.
|
||||
// This should just fall through to the usual event sending code.
|
||||
panic(fmt.Errorf("Joining rooms that the server already in is not implemented"))
|
||||
// The room exists in the local database, so we just have to send a join
|
||||
// membership event and return the room ID
|
||||
// TODO: Check if the user is allowed in the room (has been invited if
|
||||
// the room is invite-only)
|
||||
eb.Depth = queryRes.Depth
|
||||
eb.PrevEvents = queryRes.LatestEvents
|
||||
|
||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||
|
||||
for i := range queryRes.StateEvents {
|
||||
authEvents.AddEvent(&queryRes.StateEvents[i])
|
||||
}
|
||||
|
||||
refs, err := needed.AuthEventReferences(&authEvents)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(r.req, err)
|
||||
}
|
||||
eb.AuthEvents = refs
|
||||
|
||||
now := time.Now()
|
||||
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.cfg.Matrix.ServerName)
|
||||
event, err := eb.Build(
|
||||
eventID, now, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID, r.cfg.Matrix.PrivateKey,
|
||||
)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(r.req, err)
|
||||
}
|
||||
|
||||
if err := r.producer.SendEvents([]gomatrixserverlib.Event{event}, r.cfg.Matrix.ServerName); err != nil {
|
||||
return httputil.LogThenError(r.req, err)
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: struct {
|
||||
RoomID string `json:"room_id"`
|
||||
}{roomID},
|
||||
}
|
||||
}
|
||||
|
||||
if len(servers) == 0 {
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ func main() {
|
|||
log.Info("config: ", cfg)
|
||||
|
||||
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||
aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil)
|
||||
|
||||
roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL())
|
||||
userUpdateProducer, err := producers.NewUserUpdateProducer(
|
||||
|
|
@ -60,6 +61,12 @@ func main() {
|
|||
if err != nil {
|
||||
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
|
||||
}
|
||||
syncProducer, err := producers.NewSyncAPIProducer(
|
||||
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData),
|
||||
)
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
|
||||
}
|
||||
|
||||
federation := gomatrixserverlib.NewFederationClient(
|
||||
cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
|
||||
|
|
@ -97,7 +104,8 @@ func main() {
|
|||
log.Info("Starting client API server on ", cfg.Listen.ClientAPI)
|
||||
routing.Setup(
|
||||
http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
|
||||
queryAPI, accountDB, deviceDB, federation, keyRing, userUpdateProducer,
|
||||
queryAPI, aliasAPI, accountDB, deviceDB, federation, keyRing,
|
||||
userUpdateProducer, syncProducer,
|
||||
)
|
||||
log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ import (
|
|||
|
||||
var (
|
||||
logDir = os.Getenv("LOG_DIR")
|
||||
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
|
||||
configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/alias"
|
||||
"github.com/matrix-org/dendrite/roomserver/input"
|
||||
"github.com/matrix-org/dendrite/roomserver/query"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
|
|
@ -32,7 +33,7 @@ import (
|
|||
|
||||
var (
|
||||
logDir = os.Getenv("LOG_DIR")
|
||||
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
|
||||
configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
@ -58,12 +59,6 @@ func main() {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
queryAPI := query.RoomserverQueryAPI{
|
||||
DB: db,
|
||||
}
|
||||
|
||||
queryAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
inputAPI := input.RoomserverInputAPI{
|
||||
DB: db,
|
||||
Producer: kafkaProducer,
|
||||
|
|
@ -72,6 +67,19 @@ func main() {
|
|||
|
||||
inputAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
queryAPI := query.RoomserverQueryAPI{db}
|
||||
|
||||
queryAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
aliasAPI := alias.RoomserverAliasAPI{
|
||||
DB: db,
|
||||
Cfg: cfg,
|
||||
InputAPI: inputAPI,
|
||||
QueryAPI: queryAPI,
|
||||
}
|
||||
|
||||
aliasAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
|
||||
|
||||
log.Info("Started room server on ", cfg.Listen.RoomServer)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
|
|
@ -58,6 +59,11 @@ func main() {
|
|||
log.Panicf("startup: failed to create device database with data source %s : %s", cfg.Database.Device, err)
|
||||
}
|
||||
|
||||
adb, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName)
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create account database with data source %s : %s", cfg.Database.Account, err)
|
||||
}
|
||||
|
||||
pos, err := db.SyncStreamPosition()
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to get latest sync stream position : %s", err)
|
||||
|
|
@ -67,15 +73,22 @@ func main() {
|
|||
if err = n.Load(db); err != nil {
|
||||
log.Panicf("startup: failed to set up notifier: %s", err)
|
||||
}
|
||||
consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
|
||||
roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create room server consumer: %s", err)
|
||||
}
|
||||
if err = consumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start room server consumer")
|
||||
if err = roomConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||
}
|
||||
clientConsumer, err := consumers.NewOutputClientData(cfg, n, db)
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create client API server consumer: %s", err)
|
||||
}
|
||||
if err = clientConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
||||
}
|
||||
|
||||
log.Info("Starting sync server on ", cfg.Listen.SyncAPI)
|
||||
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, sync.NewRequestPool(db, n), deviceDB)
|
||||
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, sync.NewRequestPool(db, n, adb), deviceDB)
|
||||
log.Fatal(http.ListenAndServe(string(cfg.Listen.SyncAPI), nil))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -54,6 +54,7 @@ var (
|
|||
)
|
||||
|
||||
const inputTopic = "syncserverInput"
|
||||
const clientTopic = "clientapiserverOutput"
|
||||
|
||||
var exe = test.KafkaExecutor{
|
||||
ZookeeperURI: zookeeperURI,
|
||||
|
|
@ -134,6 +135,7 @@ func startSyncServer() (*exec.Cmd, chan error) {
|
|||
cfg.Matrix.ServerName = "localhost"
|
||||
cfg.Listen.SyncAPI = config.Address(syncserverAddr)
|
||||
cfg.Kafka.Topics.OutputRoomEvent = config.Topic(inputTopic)
|
||||
cfg.Kafka.Topics.OutputClientData = config.Topic(clientTopic)
|
||||
|
||||
if err := test.WriteConfig(cfg, dir); err != nil {
|
||||
panic(err)
|
||||
|
|
@ -177,6 +179,10 @@ func prepareKafka() {
|
|||
if err := exe.CreateTopic(inputTopic); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
exe.DeleteTopic(clientTopic)
|
||||
if err := exe.CreateTopic(clientTopic); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSyncServer(syncServerCmdChan chan error, userID, since, want string) {
|
||||
|
|
|
|||
|
|
@ -71,11 +71,11 @@ var outputRoomEventTestData = []string{
|
|||
// $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost"
|
||||
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"membership":"invite"},"depth":0,"event_id":"$zzLHVlHIWPrnE7DI:localhost","hashes":{"sha256":"LKk7tnYJAHsyffbi9CzfdP+TU4KQ5g6YTgYGKjJ7NxU"},"origin":"localhost","origin_server_ts":1494411709192,"prev_events":[["$4NBTdIwDxq5fDGpv:localhost",{"sha256":"EpqmxEoJP93Zb2Nt2fS95SJWTqqIutHm/Ne8OHqp6Ps"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"GdUzkC+7YKl1XDi7kYuD39yi2L/+nv+YrecIQHS+0BLDQqnEj+iRXfNBuZfTk6lUBCJCHXZlk7MnEIjvWDlZCg"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"adds_state_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"last_sent_event_id":"$4NBTdIwDxq5fDGpv:localhost"}}`,
|
||||
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost"
|
||||
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"content":{"membership":"join"},"depth":0,"event_id":"$uJVKyzZi8ZX0kOd9:localhost","hashes":{"sha256":"9ZZs/Cg0ewpBiCB6iFXXYlmW8koFiesCNGFrOLDTolE"},"origin":"localhost","origin_server_ts":1494411745015,"prev_events":[["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"+TM0gFPM/M3Ji2BjYuTUTgDyCOWlOq8aTMCxLg7EBvS62yPxJ558f13OWWTczUO5aRAt+PvXsMVM/bp8u6c8DQ"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"adds_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"removes_state_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"last_sent_event_id":"$zzLHVlHIWPrnE7DI:localhost"}}`,
|
||||
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"content":{"membership":"join"},"unsigned":{"prev_content":{"membership":"invite"},"prev_sender":"@bob:localhost","replaces_state":"$zzLHVlHIWPrnE7DI:localhost"},"depth":0,"event_id":"$uJVKyzZi8ZX0kOd9:localhost","hashes":{"sha256":"9ZZs/Cg0ewpBiCB6iFXXYlmW8koFiesCNGFrOLDTolE"},"origin":"localhost","origin_server_ts":1494411745015,"prev_events":[["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"+TM0gFPM/M3Ji2BjYuTUTgDyCOWlOq8aTMCxLg7EBvS62yPxJ558f13OWWTczUO5aRAt+PvXsMVM/bp8u6c8DQ"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"adds_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"removes_state_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"last_sent_event_id":"$zzLHVlHIWPrnE7DI:localhost"}}`,
|
||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
|
||||
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"not charlie...","msgtype":"m.text"},"depth":0,"event_id":"$Ixfn5WT9ocWTYxfy:localhost","hashes":{"sha256":"hRChdyMQ3AY4jvrPpI8PEX6Taux83Qo5hdSeHlhPxGo"},"origin":"localhost","origin_server_ts":1494411792737,"prev_events":[["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"LC/Zqwu/XdqjmLdTOp/NQaFaE0niSAGgEpa39gCxsnsqEX80P7P5WDn/Kzx6rjWTnhIszrLsnoycqkXQT0Z4DQ"}},"type":"m.room.message"},"latest_event_ids":["$Ixfn5WT9ocWTYxfy:localhost"],"last_sent_event_id":"$uJVKyzZi8ZX0kOd9:localhost"}}`,
|
||||
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost"
|
||||
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"content":{"membership":"leave"},"depth":0,"event_id":"$om1F4AI8tCYlHUSp:localhost","hashes":{"sha256":"7JVI0uCxSUyEqDJ+o36/zUIlIZkXVK/R6wkrZGvQXDE"},"origin":"localhost","origin_server_ts":1494411855278,"prev_events":[["$Ixfn5WT9ocWTYxfy:localhost",{"sha256":"hOoPIDQFvvNqQJzA5ggjoQi4v1BOELnhnmwU4UArDOY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"3sxoDLUPnKuDJgFgS3C647BbiXrozxhhxrZOlFP3KgJKzBYv/ht+Jd2V2iSZOvsv94wgRBf0A/lEcJRIqeLgDA"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"adds_state_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"removes_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"last_sent_event_id":"$Ixfn5WT9ocWTYxfy:localhost"}}`,
|
||||
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"content":{"membership":"leave"},"unsigned":{"prev_content":{"membership":"join"},"prev_sender":"@charlie:localhost","replaces_state":"$uJVKyzZi8ZX0kOd9:localhost"},"depth":0,"event_id":"$om1F4AI8tCYlHUSp:localhost","hashes":{"sha256":"7JVI0uCxSUyEqDJ+o36/zUIlIZkXVK/R6wkrZGvQXDE"},"origin":"localhost","origin_server_ts":1494411855278,"prev_events":[["$Ixfn5WT9ocWTYxfy:localhost",{"sha256":"hOoPIDQFvvNqQJzA5ggjoQi4v1BOELnhnmwU4UArDOY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"3sxoDLUPnKuDJgFgS3C647BbiXrozxhhxrZOlFP3KgJKzBYv/ht+Jd2V2iSZOvsv94wgRBf0A/lEcJRIqeLgDA"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"adds_state_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"removes_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"last_sent_event_id":"$Ixfn5WT9ocWTYxfy:localhost"}}`,
|
||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
|
||||
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"body":"why did you kick charlie","msgtype":"m.text"},"depth":0,"event_id":"$hgao5gTmr3r9TtK2:localhost","hashes":{"sha256":"Aa2ZCrvwjX5xhvkVqIOFUeEGqrnrQZjjNFiZRybjsPY"},"origin":"localhost","origin_server_ts":1494411912809,"prev_events":[["$om1F4AI8tCYlHUSp:localhost",{"sha256":"yVs+CW7AiJrJOYouL8xPIBrtIHAhnbxaegna8MxeCto"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"sGkpbEXGsvAuCvE3wb5E9H5fjCVKpRdWNt6csj1bCB9Fmg4Rg4mvj3TAJ+91DjO8IPsgSxDKdqqRYF0OtcynBA"}},"type":"m.room.message"},"latest_event_ids":["$hgao5gTmr3r9TtK2:localhost"],"last_sent_event_id":"$om1F4AI8tCYlHUSp:localhost"}}`,
|
||||
// $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
|
||||
|
|
|
|||
|
|
@ -98,6 +98,8 @@ type Dendrite struct {
|
|||
Topics struct {
|
||||
// Topic for roomserver/api.OutputRoomEvent events.
|
||||
OutputRoomEvent Topic `yaml:"output_room_event"`
|
||||
// Topic for sending account data from client API to sync API
|
||||
OutputClientData Topic `yaml:"output_client_data"`
|
||||
// Topic for user updates (profile, presence)
|
||||
UserUpdates Topic `yaml:"user_updates"`
|
||||
}
|
||||
|
|
@ -298,6 +300,7 @@ func (config *Dendrite) check() error {
|
|||
|
||||
checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
|
||||
checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
|
||||
checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
|
||||
checkNotEmpty("database.account", string(config.Database.Account))
|
||||
checkNotEmpty("database.device", string(config.Database.Device))
|
||||
checkNotEmpty("database.server_key", string(config.Database.ServerKey))
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ kafka:
|
|||
topics:
|
||||
input_room_event: input.room
|
||||
output_room_event: output.room
|
||||
output_client_data: output.client
|
||||
database:
|
||||
media_api: "postgresql:///media_api"
|
||||
account: "postgresql:///account"
|
||||
|
|
|
|||
|
|
@ -82,6 +82,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
|
|||
// TODO: Different servers should be using different topics.
|
||||
// Make this configurable somehow?
|
||||
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
|
||||
cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
|
||||
|
||||
// TODO: Use different databases for the different schemas.
|
||||
// Using the same database for every schema currently works because
|
||||
|
|
|
|||
22
src/github.com/matrix-org/dendrite/common/types.go
Normal file
22
src/github.com/matrix-org/dendrite/common/types.go
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package common
|
||||
|
||||
// AccountData represents account data sent from the client API server to the
|
||||
// sync API server
|
||||
type AccountData struct {
|
||||
RoomID string `json:"room_id"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
256
src/github.com/matrix-org/dendrite/roomserver/alias/alias.go
Normal file
256
src/github.com/matrix-org/dendrite/roomserver/alias/alias.go
Normal file
|
|
@ -0,0 +1,256 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package alias
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/input"
|
||||
"github.com/matrix-org/dendrite/roomserver/query"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
// RoomserverAliasAPIDatabase has the storage APIs needed to implement the alias API.
|
||||
type RoomserverAliasAPIDatabase interface {
|
||||
// Save a given room alias with the room ID it refers to.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
SetRoomAlias(alias string, roomID string) error
|
||||
// Lookup the room ID a given alias refers to.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
GetRoomIDFromAlias(alias string) (string, error)
|
||||
// Lookup all aliases referring to a given room ID.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
GetAliasesFromRoomID(roomID string) ([]string, error)
|
||||
// Remove a given room alias.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
RemoveRoomAlias(alias string) error
|
||||
}
|
||||
|
||||
// RoomserverAliasAPI is an implementation of api.RoomserverAliasAPI
|
||||
type RoomserverAliasAPI struct {
|
||||
DB RoomserverAliasAPIDatabase
|
||||
Cfg *config.Dendrite
|
||||
InputAPI input.RoomserverInputAPI
|
||||
QueryAPI query.RoomserverQueryAPI
|
||||
}
|
||||
|
||||
// SetRoomAlias implements api.RoomserverAliasAPI
|
||||
func (r *RoomserverAliasAPI) SetRoomAlias(
|
||||
request *api.SetRoomAliasRequest,
|
||||
response *api.SetRoomAliasResponse,
|
||||
) error {
|
||||
// Check if the alias isn't already referring to a room
|
||||
roomID, err := r.DB.GetRoomIDFromAlias(request.Alias)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(roomID) > 0 {
|
||||
// If the alias already exists, stop the process
|
||||
response.AliasExists = true
|
||||
return nil
|
||||
}
|
||||
response.AliasExists = false
|
||||
|
||||
// Save the new alias
|
||||
if err := r.DB.SetRoomAlias(request.Alias, request.RoomID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send a m.room.aliases event with the updated list of aliases for this room
|
||||
if err := r.sendUpdatedAliasesEvent(request.UserID, request.RoomID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAliasRoomID implements api.RoomserverAliasAPI
|
||||
func (r *RoomserverAliasAPI) GetAliasRoomID(
|
||||
request *api.GetAliasRoomIDRequest,
|
||||
response *api.GetAliasRoomIDResponse,
|
||||
) error {
|
||||
// Lookup the room ID in the database
|
||||
roomID, err := r.DB.GetRoomIDFromAlias(request.Alias)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
response.RoomID = roomID
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveRoomAlias implements api.RoomserverAliasAPI
|
||||
func (r *RoomserverAliasAPI) RemoveRoomAlias(
|
||||
request *api.RemoveRoomAliasRequest,
|
||||
response *api.RemoveRoomAliasResponse,
|
||||
) error {
|
||||
// Lookup the room ID in the database
|
||||
roomID, err := r.DB.GetRoomIDFromAlias(request.Alias)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the dalias from the database
|
||||
if err := r.DB.RemoveRoomAlias(request.Alias); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send an updated m.room.aliases event
|
||||
if err := r.sendUpdatedAliasesEvent(request.UserID, roomID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type roomAliasesContent struct {
|
||||
Aliases []string `json:"aliases"`
|
||||
}
|
||||
|
||||
// Build the updated m.room.aliases event to send to the room after addition or
|
||||
// removal of an alias
|
||||
func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(userID string, roomID string) error {
|
||||
serverName := string(r.Cfg.Matrix.ServerName)
|
||||
|
||||
builder := gomatrixserverlib.EventBuilder{
|
||||
Sender: userID,
|
||||
RoomID: roomID,
|
||||
Type: "m.room.aliases",
|
||||
StateKey: &serverName,
|
||||
}
|
||||
|
||||
// Retrieve the updated list of aliases, marhal it and set it as the
|
||||
// event's content
|
||||
aliases, err := r.DB.GetAliasesFromRoomID(roomID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
content := roomAliasesContent{Aliases: aliases}
|
||||
rawContent, err := json.Marshal(content)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = builder.SetContent(json.RawMessage(rawContent))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get needed state events and depth
|
||||
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(&builder)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req := api.QueryLatestEventsAndStateRequest{
|
||||
RoomID: roomID,
|
||||
StateToFetch: eventsNeeded.Tuples(),
|
||||
}
|
||||
var res api.QueryLatestEventsAndStateResponse
|
||||
if err = r.QueryAPI.QueryLatestEventsAndState(&req, &res); err != nil {
|
||||
return err
|
||||
}
|
||||
builder.Depth = res.Depth
|
||||
builder.PrevEvents = res.LatestEvents
|
||||
|
||||
// Add auth events
|
||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||
for i := range res.StateEvents {
|
||||
authEvents.AddEvent(&res.StateEvents[i])
|
||||
}
|
||||
refs, err := eventsNeeded.AuthEventReferences(&authEvents)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
builder.AuthEvents = refs
|
||||
|
||||
// Build the event
|
||||
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.Cfg.Matrix.ServerName)
|
||||
now := time.Now()
|
||||
event, err := builder.Build(eventID, now, r.Cfg.Matrix.ServerName, r.Cfg.Matrix.KeyID, r.Cfg.Matrix.PrivateKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the request
|
||||
ire := api.InputRoomEvent{
|
||||
Kind: api.KindNew,
|
||||
Event: event,
|
||||
AuthEventIDs: event.AuthEventIDs(),
|
||||
SendAsServer: serverName,
|
||||
}
|
||||
inputReq := api.InputRoomEventsRequest{
|
||||
InputRoomEvents: []api.InputRoomEvent{ire},
|
||||
}
|
||||
var inputRes api.InputRoomEventsResponse
|
||||
|
||||
// Send the request
|
||||
if err := r.InputAPI.InputRoomEvents(&inputReq, &inputRes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetupHTTP adds the RoomserverAliasAPI handlers to the http.ServeMux.
|
||||
func (r *RoomserverAliasAPI) SetupHTTP(servMux *http.ServeMux) {
|
||||
servMux.Handle(
|
||||
api.RoomserverSetRoomAliasPath,
|
||||
common.MakeAPI("setRoomAlias", func(req *http.Request) util.JSONResponse {
|
||||
var request api.SetRoomAliasRequest
|
||||
var response api.SetRoomAliasResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
if err := r.SetRoomAlias(&request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: 200, JSON: &response}
|
||||
}),
|
||||
)
|
||||
servMux.Handle(
|
||||
api.RoomserverGetAliasRoomIDPath,
|
||||
common.MakeAPI("getAliasRoomID", func(req *http.Request) util.JSONResponse {
|
||||
var request api.GetAliasRoomIDRequest
|
||||
var response api.GetAliasRoomIDResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
if err := r.GetAliasRoomID(&request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: 200, JSON: &response}
|
||||
}),
|
||||
)
|
||||
servMux.Handle(
|
||||
api.RoomserverRemoveRoomAliasPath,
|
||||
common.MakeAPI("removeRoomAlias", func(req *http.Request) util.JSONResponse {
|
||||
var request api.RemoveRoomAliasRequest
|
||||
var response api.RemoveRoomAliasResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
if err := r.RemoveRoomAlias(&request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: 200, JSON: &response}
|
||||
}),
|
||||
)
|
||||
}
|
||||
129
src/github.com/matrix-org/dendrite/roomserver/api/alias.go
Normal file
129
src/github.com/matrix-org/dendrite/roomserver/api/alias.go
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// SetRoomAliasRequest is a request to SetRoomAlias
|
||||
type SetRoomAliasRequest struct {
|
||||
// ID of the user setting the alias
|
||||
UserID string `json:"user_id"`
|
||||
// New alias for the room
|
||||
Alias string `json:"alias"`
|
||||
// The room ID the alias is referring to
|
||||
RoomID string `json:"room_id"`
|
||||
}
|
||||
|
||||
// SetRoomAliasResponse is a response to SetRoomAlias
|
||||
type SetRoomAliasResponse struct {
|
||||
// Does the alias already refer to a room?
|
||||
AliasExists bool `json:"alias_exists"`
|
||||
}
|
||||
|
||||
// GetAliasRoomIDRequest is a request to GetAliasRoomID
|
||||
type GetAliasRoomIDRequest struct {
|
||||
// Alias we want to lookup
|
||||
Alias string `json:"alias"`
|
||||
}
|
||||
|
||||
// GetAliasRoomIDResponse is a response to GetAliasRoomID
|
||||
type GetAliasRoomIDResponse struct {
|
||||
// The room ID the alias refers to
|
||||
RoomID string `json:"room_id"`
|
||||
}
|
||||
|
||||
// RemoveRoomAliasRequest is a request to RemoveRoomAlias
|
||||
type RemoveRoomAliasRequest struct {
|
||||
// ID of the user removing the alias
|
||||
UserID string `json:"user_id"`
|
||||
// The room alias to remove
|
||||
Alias string `json:"alias"`
|
||||
}
|
||||
|
||||
// RemoveRoomAliasResponse is a response to RemoveRoomAlias
|
||||
type RemoveRoomAliasResponse struct{}
|
||||
|
||||
// RoomserverAliasAPI is used to save, lookup or remove a room alias
|
||||
type RoomserverAliasAPI interface {
|
||||
// Set a room alias
|
||||
SetRoomAlias(
|
||||
req *SetRoomAliasRequest,
|
||||
response *SetRoomAliasResponse,
|
||||
) error
|
||||
|
||||
// Get the room ID for an alias
|
||||
GetAliasRoomID(
|
||||
req *GetAliasRoomIDRequest,
|
||||
response *GetAliasRoomIDResponse,
|
||||
) error
|
||||
|
||||
// Remove a room alias
|
||||
RemoveRoomAlias(
|
||||
req *RemoveRoomAliasRequest,
|
||||
response *RemoveRoomAliasResponse,
|
||||
) error
|
||||
}
|
||||
|
||||
// RoomserverSetRoomAliasPath is the HTTP path for the SetRoomAlias API.
|
||||
const RoomserverSetRoomAliasPath = "/api/roomserver/setRoomAlias"
|
||||
|
||||
// RoomserverGetAliasRoomIDPath is the HTTP path for the GetAliasRoomID API.
|
||||
const RoomserverGetAliasRoomIDPath = "/api/roomserver/getAliasRoomID"
|
||||
|
||||
// RoomserverRemoveRoomAliasPath is the HTTP path for the RemoveRoomAlias API.
|
||||
const RoomserverRemoveRoomAliasPath = "/api/roomserver/removeRoomAlias"
|
||||
|
||||
// NewRoomserverAliasAPIHTTP creates a RoomserverAliasAPI implemented by talking to a HTTP POST API.
|
||||
// If httpClient is nil then it uses the http.DefaultClient
|
||||
func NewRoomserverAliasAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverAliasAPI {
|
||||
if httpClient == nil {
|
||||
httpClient = http.DefaultClient
|
||||
}
|
||||
return &httpRoomserverAliasAPI{roomserverURL, httpClient}
|
||||
}
|
||||
|
||||
type httpRoomserverAliasAPI struct {
|
||||
roomserverURL string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// SetRoomAlias implements RoomserverAliasAPI
|
||||
func (h *httpRoomserverAliasAPI) SetRoomAlias(
|
||||
request *SetRoomAliasRequest,
|
||||
response *SetRoomAliasResponse,
|
||||
) error {
|
||||
apiURL := h.roomserverURL + RoomserverSetRoomAliasPath
|
||||
return postJSON(h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
// GetAliasRoomID implements RoomserverAliasAPI
|
||||
func (h *httpRoomserverAliasAPI) GetAliasRoomID(
|
||||
request *GetAliasRoomIDRequest,
|
||||
response *GetAliasRoomIDResponse,
|
||||
) error {
|
||||
// RemoveRoomAlias implements RoomserverAliasAPI
|
||||
apiURL := h.roomserverURL + RoomserverGetAliasRoomIDPath
|
||||
return postJSON(h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
func (h *httpRoomserverAliasAPI) RemoveRoomAlias(
|
||||
request *RemoveRoomAliasRequest,
|
||||
response *RemoveRoomAliasResponse,
|
||||
) error {
|
||||
apiURL := h.roomserverURL + RoomserverRemoveRoomAliasPath
|
||||
return postJSON(h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
|
@ -40,9 +40,21 @@ type RoomserverQueryAPIDatabase interface {
|
|||
// Lookup the numeric IDs for a list of events.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
EventNIDs(eventIDs []string) (map[string]types.EventNID, error)
|
||||
// Save a given room alias with the room ID it refers to.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
SetRoomAlias(alias string, roomID string) error
|
||||
// Lookup the room ID a given alias refers to.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
GetRoomIDFromAlias(alias string) (string, error)
|
||||
// Lookup all aliases referring to a given room ID.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
GetAliasesFromRoomID(roomID string) ([]string, error)
|
||||
// Remove a given room alias.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
RemoveRoomAlias(alias string) error
|
||||
}
|
||||
|
||||
// RoomserverQueryAPI is an implementation of RoomserverQueryAPI
|
||||
// RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI
|
||||
type RoomserverQueryAPI struct {
|
||||
DB RoomserverQueryAPIDatabase
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,100 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
const roomAliasesSchema = `
|
||||
-- Stores room aliases and room IDs they refer to
|
||||
CREATE TABLE IF NOT EXISTS room_aliases (
|
||||
-- Alias of the room
|
||||
alias TEXT NOT NULL PRIMARY KEY,
|
||||
-- Room ID the alias refers to
|
||||
room_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS room_id_idx ON room_aliases(room_id);
|
||||
`
|
||||
|
||||
const insertRoomAliasSQL = "" +
|
||||
"INSERT INTO room_aliases (alias, room_id) VALUES ($1, $2)"
|
||||
|
||||
const selectRoomIDFromAliasSQL = "" +
|
||||
"SELECT room_id FROM room_aliases WHERE alias = $1"
|
||||
|
||||
const selectAliasesFromRoomIDSQL = "" +
|
||||
"SELECT alias FROM room_aliases WHERE room_id = $1"
|
||||
|
||||
const deleteRoomAliasSQL = "" +
|
||||
"DELETE FROM room_aliases WHERE alias = $1"
|
||||
|
||||
type roomAliasesStatements struct {
|
||||
insertRoomAliasStmt *sql.Stmt
|
||||
selectRoomIDFromAliasStmt *sql.Stmt
|
||||
selectAliasesFromRoomIDStmt *sql.Stmt
|
||||
deleteRoomAliasStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *roomAliasesStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(roomAliasesSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return statementList{
|
||||
{&s.insertRoomAliasStmt, insertRoomAliasSQL},
|
||||
{&s.selectRoomIDFromAliasStmt, selectRoomIDFromAliasSQL},
|
||||
{&s.selectAliasesFromRoomIDStmt, selectAliasesFromRoomIDSQL},
|
||||
{&s.deleteRoomAliasStmt, deleteRoomAliasSQL},
|
||||
}.prepare(db)
|
||||
}
|
||||
|
||||
func (s *roomAliasesStatements) insertRoomAlias(alias string, roomID string) (err error) {
|
||||
_, err = s.insertRoomAliasStmt.Exec(alias, roomID)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *roomAliasesStatements) selectRoomIDFromAlias(alias string) (roomID string, err error) {
|
||||
err = s.selectRoomIDFromAliasStmt.QueryRow(alias).Scan(&roomID)
|
||||
if err == sql.ErrNoRows {
|
||||
return "", nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *roomAliasesStatements) selectAliasesFromRoomID(roomID string) (aliases []string, err error) {
|
||||
aliases = []string{}
|
||||
rows, err := s.selectAliasesFromRoomIDStmt.Query(roomID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var alias string
|
||||
if err = rows.Scan(&alias); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
aliases = append(aliases, alias)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *roomAliasesStatements) deleteRoomAlias(alias string) (err error) {
|
||||
_, err = s.deleteRoomAliasStmt.Exec(alias)
|
||||
return
|
||||
}
|
||||
|
|
@ -29,6 +29,7 @@ type statements struct {
|
|||
previousEventStatements
|
||||
inviteStatements
|
||||
membershipStatements
|
||||
roomAliasesStatements
|
||||
}
|
||||
|
||||
func (s *statements) prepare(db *sql.DB) error {
|
||||
|
|
@ -51,5 +52,9 @@ func (s *statements) prepare(db *sql.DB) error {
|
|||
}
|
||||
}
|
||||
|
||||
if err = s.roomAliasesStatements.prepare(db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -353,6 +353,26 @@ func (d *Database) LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.Ev
|
|||
return references, currentStateSnapshotNID, depth, nil
|
||||
}
|
||||
|
||||
// SetRoomAlias implements alias.RoomserverAliasAPIDB
|
||||
func (d *Database) SetRoomAlias(alias string, roomID string) error {
|
||||
return d.statements.insertRoomAlias(alias, roomID)
|
||||
}
|
||||
|
||||
// GetRoomIDFromAlias implements alias.RoomserverAliasAPIDB
|
||||
func (d *Database) GetRoomIDFromAlias(alias string) (string, error) {
|
||||
return d.statements.selectRoomIDFromAlias(alias)
|
||||
}
|
||||
|
||||
// GetAliasesFromRoomID implements alias.RoomserverAliasAPIDB
|
||||
func (d *Database) GetAliasesFromRoomID(roomID string) ([]string, error) {
|
||||
return d.statements.selectAliasesFromRoomID(roomID)
|
||||
}
|
||||
|
||||
// RemoveRoomAlias implements alias.RoomserverAliasAPIDB
|
||||
func (d *Database) RemoveRoomAlias(alias string) error {
|
||||
return d.statements.deleteRoomAlias(alias)
|
||||
}
|
||||
|
||||
// StateEntriesForTuples implements state.RoomStateDatabase
|
||||
func (d *Database) StateEntriesForTuples(
|
||||
stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,91 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
// OutputClientData consumes events that originated in the client API server.
|
||||
type OutputClientData struct {
|
||||
clientAPIConsumer *common.ContinualConsumer
|
||||
db *storage.SyncServerDatabase
|
||||
notifier *sync.Notifier
|
||||
}
|
||||
|
||||
// NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
|
||||
func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) {
|
||||
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
consumer := common.ContinualConsumer{
|
||||
Topic: string(cfg.Kafka.Topics.OutputClientData),
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: store,
|
||||
}
|
||||
s := &OutputClientData{
|
||||
clientAPIConsumer: &consumer,
|
||||
db: store,
|
||||
notifier: n,
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Start consuming from room servers
|
||||
func (s *OutputClientData) Start() error {
|
||||
return s.clientAPIConsumer.Start()
|
||||
}
|
||||
|
||||
// onMessage is called when the sync server receives a new event from the client API server output log.
|
||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||
// sync stream position may race and be incorrectly calculated.
|
||||
func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
// Parse out the event JSON
|
||||
var output common.AccountData
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("client API server output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"type": output.Type,
|
||||
"room_id": output.RoomID,
|
||||
}).Info("received data from client API server")
|
||||
|
||||
syncStreamPos, err := s.db.UpsertAccountData(string(msg.Key), output.RoomID, output.Type)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"type": output.Type,
|
||||
"room_id": output.RoomID,
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("could not save account data")
|
||||
}
|
||||
|
||||
s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -37,6 +37,12 @@ type OutputRoomEvent struct {
|
|||
query api.RoomserverQueryAPI
|
||||
}
|
||||
|
||||
type prevEventRef struct {
|
||||
PrevContent json.RawMessage `json:"prev_content"`
|
||||
PrevID string `json:"replaces_state"`
|
||||
UserID string `json:"prev_sender"`
|
||||
}
|
||||
|
||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
||||
func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
|
||||
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
||||
|
|
@ -101,6 +107,18 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
}).Panicf("roomserver output log: state event lookup failure")
|
||||
}
|
||||
|
||||
ev, err = s.updateStateEvent(ev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range addsStateEvents {
|
||||
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
syncStreamPos, err := s.db.WriteEvent(
|
||||
&ev, addsStateEvents, output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.RemovesStateEventIDs,
|
||||
)
|
||||
|
|
@ -115,7 +133,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
}).Panicf("roomserver output log: write event failure")
|
||||
return nil
|
||||
}
|
||||
s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos))
|
||||
s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -177,6 +195,32 @@ func (s *OutputRoomEvent) lookupStateEvents(
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) {
|
||||
var stateKey string
|
||||
if event.StateKey() == nil {
|
||||
stateKey = ""
|
||||
} else {
|
||||
stateKey = *event.StateKey()
|
||||
}
|
||||
|
||||
prevEvent, err := s.db.GetStateEvent(event.Type(), event.RoomID(), stateKey)
|
||||
if err != nil {
|
||||
return event, err
|
||||
}
|
||||
|
||||
if prevEvent == nil {
|
||||
return event, nil
|
||||
}
|
||||
|
||||
prev := prevEventRef{
|
||||
PrevContent: prevEvent.Content(),
|
||||
PrevID: prevEvent.EventID(),
|
||||
UserID: prevEvent.Sender(),
|
||||
}
|
||||
|
||||
return event.SetUnsigned(prev)
|
||||
}
|
||||
|
||||
func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
|
||||
have := map[string]bool{}
|
||||
for _, event := range events {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,113 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
||||
const accountDataSchema = `
|
||||
-- Stores the users account data
|
||||
CREATE TABLE IF NOT EXISTS account_data_type (
|
||||
-- The highest numeric ID from the output_room_events at the time of saving the data
|
||||
id BIGINT,
|
||||
-- ID of the user the data belongs to
|
||||
user_id TEXT NOT NULL,
|
||||
-- ID of the room the data is related to (empty string if not related to a specific room)
|
||||
room_id TEXT NOT NULL,
|
||||
-- Type of the data
|
||||
type TEXT NOT NULL,
|
||||
|
||||
PRIMARY KEY(user_id, room_id, type),
|
||||
|
||||
-- We don't want two entries of the same type for the same user
|
||||
CONSTRAINT account_data_unique UNIQUE (user_id, room_id, type)
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS account_data_id_idx ON account_data_type(id);
|
||||
`
|
||||
|
||||
const insertAccountDataSQL = "" +
|
||||
"INSERT INTO account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" +
|
||||
" ON CONFLICT ON CONSTRAINT account_data_unique" +
|
||||
" DO UPDATE SET id = EXCLUDED.id"
|
||||
|
||||
const selectAccountDataInRangeSQL = "" +
|
||||
"SELECT room_id, type FROM account_data_type" +
|
||||
" WHERE user_id = $1 AND id > $2 AND id <= $3" +
|
||||
" ORDER BY id ASC"
|
||||
|
||||
type accountDataStatements struct {
|
||||
insertAccountDataStmt *sql.Stmt
|
||||
selectAccountDataInRangeStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(accountDataSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *accountDataStatements) insertAccountData(
|
||||
pos types.StreamPosition, userID string, roomID string, dataType string,
|
||||
) (err error) {
|
||||
_, err = s.insertAccountDataStmt.Exec(pos, userID, roomID, dataType)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *accountDataStatements) selectAccountDataInRange(
|
||||
userID string, oldPos types.StreamPosition, newPos types.StreamPosition,
|
||||
) (data map[string][]string, err error) {
|
||||
data = make(map[string][]string)
|
||||
|
||||
// If both positions are the same, it means that the data was saved after the
|
||||
// latest room event. In that case, we need to decrement the old position as
|
||||
// it would prevent the SQL request from returning anything.
|
||||
if oldPos == newPos {
|
||||
oldPos--
|
||||
}
|
||||
|
||||
rows, err := s.selectAccountDataInRangeStmt.Query(userID, oldPos, newPos)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var dataType string
|
||||
var roomID string
|
||||
|
||||
if err = rows.Scan(&roomID, &dataType); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(data[roomID]) > 0 {
|
||||
data[roomID] = append(data[roomID], dataType)
|
||||
} else {
|
||||
data[roomID] = []string{dataType}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
@ -66,6 +66,9 @@ const selectCurrentStateSQL = "" +
|
|||
const selectJoinedUsersSQL = "" +
|
||||
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||
|
||||
const selectStateEventSQL = "" +
|
||||
"SELECT event_json FROM current_room_state WHERE type = $1 AND room_id = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
"SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
|
|
@ -76,6 +79,7 @@ type currentRoomStateStatements struct {
|
|||
selectCurrentStateStmt *sql.Stmt
|
||||
selectJoinedUsersStmt *sql.Stmt
|
||||
selectEventsWithEventIDsStmt *sql.Stmt
|
||||
selectStateEventStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||
|
|
@ -101,6 +105,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -195,3 +202,12 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
|||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) selectStateEvent(evType string, roomID string, stateKey string) (*gomatrixserverlib.Event, error) {
|
||||
var res []byte
|
||||
if err := s.selectStateEventStmt.QueryRow(evType, roomID, stateKey).Scan(&res); err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false)
|
||||
return &ev, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,10 +41,11 @@ type streamEvent struct {
|
|||
|
||||
// SyncServerDatabase represents a sync server database
|
||||
type SyncServerDatabase struct {
|
||||
db *sql.DB
|
||||
partitions common.PartitionOffsetStatements
|
||||
events outputRoomEventsStatements
|
||||
roomstate currentRoomStateStatements
|
||||
db *sql.DB
|
||||
partitions common.PartitionOffsetStatements
|
||||
accountData accountDataStatements
|
||||
events outputRoomEventsStatements
|
||||
roomstate currentRoomStateStatements
|
||||
}
|
||||
|
||||
// NewSyncServerDatabase creates a new sync server database
|
||||
|
|
@ -58,6 +59,10 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
|
|||
if err = partitions.Prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
accountData := accountDataStatements{}
|
||||
if err = accountData.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events := outputRoomEventsStatements{}
|
||||
if err = events.prepare(db); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -66,7 +71,7 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
|
|||
if err := state.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &SyncServerDatabase{db, partitions, events, state}, nil
|
||||
return &SyncServerDatabase{db, partitions, accountData, events, state}, nil
|
||||
}
|
||||
|
||||
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
|
||||
|
|
@ -141,6 +146,13 @@ func (d *SyncServerDatabase) updateRoomState(
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
|
||||
// If no event could be found, returns nil
|
||||
// If there was an issue during the retrieval, returns an error
|
||||
func (d *SyncServerDatabase) GetStateEvent(evType string, roomID string, stateKey string) (*gomatrixserverlib.Event, error) {
|
||||
return d.roomstate.selectStateEvent(evType, roomID, stateKey)
|
||||
}
|
||||
|
||||
// PartitionOffsets implements common.PartitionStorer
|
||||
func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
|
||||
return d.partitions.SelectPartitionOffsets(topic)
|
||||
|
|
@ -267,6 +279,33 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
|
|||
return
|
||||
}
|
||||
|
||||
// GetAccountDataInRange returns all account data for a given user inserted or
|
||||
// updated between two given positions
|
||||
// Returns a map following the format data[roomID] = []dataTypes
|
||||
// If no data is retrieved, returns an empty map
|
||||
// If there was an issue with the retrieval, returns an error
|
||||
func (d *SyncServerDatabase) GetAccountDataInRange(
|
||||
userID string, oldPos types.StreamPosition, newPos types.StreamPosition,
|
||||
) (map[string][]string, error) {
|
||||
return d.accountData.selectAccountDataInRange(userID, oldPos, newPos)
|
||||
}
|
||||
|
||||
// UpsertAccountData keeps track of new or updated account data, by saving the type
|
||||
// of the new/updated data, and the user ID and room ID the data is related to (empty)
|
||||
// room ID means the data isn't specific to any room)
|
||||
// If no data with the given type, user ID and room ID exists in the database,
|
||||
// creates a new row, else update the existing one
|
||||
// Returns an error if there was an issue with the upsert
|
||||
func (d *SyncServerDatabase) UpsertAccountData(userID string, roomID string, dataType string) (types.StreamPosition, error) {
|
||||
pos, err := d.SyncStreamPosition()
|
||||
if err != nil {
|
||||
return pos, err
|
||||
}
|
||||
|
||||
err = d.accountData.insertAccountData(pos, userID, roomID, dataType)
|
||||
return pos, err
|
||||
}
|
||||
|
||||
func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error {
|
||||
// Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark.
|
||||
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(txn, userID, "invite")
|
||||
|
|
|
|||
|
|
@ -54,39 +54,44 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
|
|||
// OnNewEvent is called when a new event is received from the room server. Must only be
|
||||
// called from a single goroutine, to avoid races between updates which could set the
|
||||
// current position in the stream incorrectly.
|
||||
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
|
||||
// Can be called either with a *gomatrixserverlib.Event, or with an user ID
|
||||
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) {
|
||||
// update the current position then notify relevant /sync streams.
|
||||
// This needs to be done PRIOR to waking up users as they will read this value.
|
||||
n.streamLock.Lock()
|
||||
defer n.streamLock.Unlock()
|
||||
n.currPos = pos
|
||||
|
||||
// Map this event's room_id to a list of joined users, and wake them up.
|
||||
userIDs := n.joinedUsers(ev.RoomID())
|
||||
// If this is an invite, also add in the invitee to this list.
|
||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
||||
userID := *ev.StateKey()
|
||||
membership, err := ev.Membership()
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
|
||||
"Notifier.OnNewEvent: Failed to unmarshal member event",
|
||||
)
|
||||
} else {
|
||||
// Keep the joined user map up-to-date
|
||||
switch membership {
|
||||
case "invite":
|
||||
userIDs = append(userIDs, userID)
|
||||
case "join":
|
||||
n.addJoinedUser(ev.RoomID(), userID)
|
||||
case "leave":
|
||||
fallthrough
|
||||
case "ban":
|
||||
n.removeJoinedUser(ev.RoomID(), userID)
|
||||
if ev != nil {
|
||||
// Map this event's room_id to a list of joined users, and wake them up.
|
||||
userIDs := n.joinedUsers(ev.RoomID())
|
||||
// If this is an invite, also add in the invitee to this list.
|
||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
||||
userID := *ev.StateKey()
|
||||
membership, err := ev.Membership()
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
|
||||
"Notifier.OnNewEvent: Failed to unmarshal member event",
|
||||
)
|
||||
} else {
|
||||
// Keep the joined user map up-to-date
|
||||
switch membership {
|
||||
case "invite":
|
||||
userIDs = append(userIDs, userID)
|
||||
case "join":
|
||||
n.addJoinedUser(ev.RoomID(), userID)
|
||||
case "leave":
|
||||
fallthrough
|
||||
case "ban":
|
||||
n.removeJoinedUser(ev.RoomID(), userID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, userID := range userIDs {
|
||||
for _, userID := range userIDs {
|
||||
n.wakeupUser(userID, pos)
|
||||
}
|
||||
} else if len(userID) > 0 {
|
||||
n.wakeupUser(userID, pos)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -123,7 +123,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
|
|||
stream := n.fetchUserStream(bob, true)
|
||||
waitForBlocking(stream, 1)
|
||||
|
||||
n.OnNewEvent(&randomMessageEvent, streamPositionAfter)
|
||||
n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
@ -151,7 +151,7 @@ func TestNewInviteEventForUser(t *testing.T) {
|
|||
stream := n.fetchUserStream(bob, true)
|
||||
waitForBlocking(stream, 1)
|
||||
|
||||
n.OnNewEvent(&aliceInviteBobEvent, streamPositionAfter)
|
||||
n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
@ -182,7 +182,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
|
|||
stream := n.fetchUserStream(bob, true)
|
||||
waitForBlocking(stream, 3)
|
||||
|
||||
n.OnNewEvent(&randomMessageEvent, streamPositionAfter)
|
||||
n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
|
||||
|
||||
wg.Wait()
|
||||
|
||||
|
|
@ -217,7 +217,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
|||
}()
|
||||
bobStream := n.fetchUserStream(bob, true)
|
||||
waitForBlocking(bobStream, 1)
|
||||
n.OnNewEvent(&bobLeaveEvent, streamPositionAfter)
|
||||
n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter)
|
||||
leaveWG.Wait()
|
||||
|
||||
// send an event into the room. Make sure alice gets it. Bob should not.
|
||||
|
|
@ -246,7 +246,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
|||
waitForBlocking(aliceStream, 1)
|
||||
waitForBlocking(bobStream, 1)
|
||||
|
||||
n.OnNewEvent(&randomMessageEvent, streamPositionAfter2)
|
||||
n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2)
|
||||
aliceWG.Wait()
|
||||
|
||||
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
|
||||
|
|
|
|||
|
|
@ -20,22 +20,25 @@ import (
|
|||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
// RequestPool manages HTTP long-poll connections for /sync
|
||||
type RequestPool struct {
|
||||
db *storage.SyncServerDatabase
|
||||
notifier *Notifier
|
||||
db *storage.SyncServerDatabase
|
||||
accountDB *accounts.Database
|
||||
notifier *Notifier
|
||||
}
|
||||
|
||||
// NewRequestPool makes a new RequestPool
|
||||
func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier) *RequestPool {
|
||||
return &RequestPool{db, n}
|
||||
func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.Database) *RequestPool {
|
||||
return &RequestPool{db, adb, n}
|
||||
}
|
||||
|
||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||
|
|
@ -77,9 +80,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
|||
if err != nil {
|
||||
res = httputil.LogThenError(req, err)
|
||||
} else {
|
||||
res = util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: syncData,
|
||||
syncData, err = rp.appendAccountData(syncData, device.UserID, *syncReq, currentPos)
|
||||
if err != nil {
|
||||
res = httputil.LogThenError(req, err)
|
||||
} else {
|
||||
res = util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: syncData,
|
||||
}
|
||||
}
|
||||
}
|
||||
done <- res
|
||||
|
|
@ -104,3 +112,69 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.Stre
|
|||
}
|
||||
return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
|
||||
}
|
||||
|
||||
func (rp *RequestPool) appendAccountData(
|
||||
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
|
||||
) (*types.Response, error) {
|
||||
// TODO: We currently send all account data on every sync response, we should instead send data
|
||||
// that has changed on incremental sync responses
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if req.since == types.StreamPosition(0) {
|
||||
// If this is the initial sync, we don't need to check if a data has
|
||||
// already been sent. Instead, we send the whole batch.
|
||||
var global []gomatrixserverlib.ClientEvent
|
||||
var rooms map[string][]gomatrixserverlib.ClientEvent
|
||||
global, rooms, err = rp.accountDB.GetAccountData(localpart)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data.AccountData.Events = global
|
||||
|
||||
for r, j := range data.Rooms.Join {
|
||||
if len(rooms[r]) > 0 {
|
||||
j.AccountData.Events = rooms[r]
|
||||
data.Rooms.Join[r] = j
|
||||
}
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// Sync is not initial, get all account data since the latest sync
|
||||
dataTypes, err := rp.db.GetAccountDataInRange(userID, req.since, currentPos)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(dataTypes) == 0 {
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// Iterate over the rooms
|
||||
for roomID, dataTypes := range dataTypes {
|
||||
events := []gomatrixserverlib.ClientEvent{}
|
||||
// Request the missing data from the database
|
||||
for _, dataType := range dataTypes {
|
||||
evs, err := rp.accountDB.GetAccountDataByType(localpart, roomID, dataType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events = append(events, evs...)
|
||||
}
|
||||
|
||||
// Append the data to the response
|
||||
if len(roomID) > 0 {
|
||||
jr := data.Rooms.Join[roomID]
|
||||
jr.AccountData.Events = events
|
||||
data.Rooms.Join[roomID] = jr
|
||||
} else {
|
||||
data.AccountData.Events = events
|
||||
}
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue