From d9b8e5de4514fefd2095b41e9b01607cbae9fb18 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 17 Jul 2017 18:10:56 +0100 Subject: [PATCH 1/4] Keep track of membership in Client API (#159) * Saving memberships * Removed unused index * Removed useless log * Fixed membership not being saved on the right conditions + added membership removal * Updated outdated comment * Use server lib method + check server name + use new roomserver API * Better handling of events from the room server * Fixed membership removal * Corrected indentation * Fix tests (hopefully) * Replace broken kafka mirror * Apply requested changes on database management * Remove useless check and function * Moved memberships update to the database package * Use new common function * Remove useless function --- .../auth/storage/accounts/membership_table.go | 85 +++++++++++ .../auth/storage/accounts/storage.go | 99 +++++++++++- .../clientapi/consumers/roomserver.go | 141 ++++++++++++++++++ .../cmd/dendrite-client-api-server/main.go | 9 ++ .../cmd/mediaapi-integration-tests/main.go | 9 +- .../cmd/syncserver-integration-tests/main.go | 9 +- .../matrix-org/dendrite/common/test/server.go | 13 +- travis-install-kafka.sh | 2 +- 8 files changed, 345 insertions(+), 22 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/membership_table.go create mode 100644 src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/membership_table.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/membership_table.go new file mode 100644 index 000000000..8eca4a574 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/membership_table.go @@ -0,0 +1,85 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package accounts + +import ( + "database/sql" + + "github.com/lib/pq" +) + +const membershipSchema = ` +-- Stores data about users memberships to rooms. +CREATE TABLE IF NOT EXISTS memberships ( + -- The Matrix user ID localpart for the member + localpart TEXT NOT NULL, + -- The room this user is a member of + room_id TEXT NOT NULL, + -- The ID of the join membership event + event_id TEXT NOT NULL, + + -- A user can only be member of a room once + PRIMARY KEY (localpart, room_id) +); + +-- Use index to process deletion by ID more efficiently +CREATE UNIQUE INDEX IF NOT EXISTS membership_event_id ON memberships(event_id); +` + +const insertMembershipSQL = "" + + "INSERT INTO memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)" + +const selectMembershipSQL = "" + + "SELECT * from memberships WHERE localpart = $1 AND room_id = $2" + +const selectMembershipsByLocalpartSQL = "" + + "SELECT room_id FROM memberships WHERE localpart = $1" + +const deleteMembershipsByEventIDsSQL = "" + + "DELETE FROM memberships WHERE event_id = ANY($1)" + +type membershipStatements struct { + deleteMembershipsByEventIDsStmt *sql.Stmt + insertMembershipStmt *sql.Stmt + selectMembershipByEventIDStmt *sql.Stmt + selectMembershipsByLocalpartStmt *sql.Stmt +} + +func (s *membershipStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(membershipSchema) + if err != nil { + return + } + if s.deleteMembershipsByEventIDsStmt, err = db.Prepare(deleteMembershipsByEventIDsSQL); err != nil { + return + } + if s.insertMembershipStmt, err = db.Prepare(insertMembershipSQL); err != nil { + return + } + if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil { + return + } + return +} + +func (s *membershipStatements) insertMembership(localpart string, roomID string, eventID string, txn *sql.Tx) (err error) { + _, err = txn.Stmt(s.insertMembershipStmt).Exec(localpart, roomID, eventID) + return +} + +func (s *membershipStatements) deleteMembershipsByEventIDs(eventIDs []string, txn *sql.Tx) (err error) { + _, err = txn.Stmt(s.deleteMembershipsByEventIDsStmt).Exec(pq.StringArray(eventIDs)) + return +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go index cd6abc09a..65d87d5a7 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go @@ -18,6 +18,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" "golang.org/x/crypto/bcrypt" // Import the postgres database driver. @@ -26,9 +27,12 @@ import ( // Database represents an account database type Database struct { - db *sql.DB - accounts accountsStatements - profiles profilesStatements + db *sql.DB + partitions common.PartitionOffsetStatements + accounts accountsStatements + profiles profilesStatements + memberships membershipStatements + serverName gomatrixserverlib.ServerName } // NewDatabase creates a new accounts and profiles database @@ -38,6 +42,10 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName) if db, err = sql.Open("postgres", dataSourceName); err != nil { return nil, err } + partitions := common.PartitionOffsetStatements{} + if err = partitions.Prepare(db); err != nil { + return nil, err + } a := accountsStatements{} if err = a.prepare(db, serverName); err != nil { return nil, err @@ -46,7 +54,11 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName) if err = p.prepare(db); err != nil { return nil, err } - return &Database{db, a, p}, nil + m := membershipStatements{} + if err = m.prepare(db); err != nil { + return nil, err + } + return &Database{db, partitions, a, p, m, serverName}, nil } // GetAccountByPassword returns the account associated with the given localpart and password. @@ -93,6 +105,85 @@ func (d *Database) CreateAccount(localpart, plaintextPassword string) (*authtype return d.accounts.insertAccount(localpart, hash) } +// PartitionOffsets implements common.PartitionStorer +func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { + return d.partitions.SelectPartitionOffsets(topic) +} + +// SetPartitionOffset implements common.PartitionStorer +func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { + return d.partitions.UpsertPartitionOffset(topic, partition, offset) +} + +// SaveMembership saves the user matching a given localpart as a member of a given +// room. It also stores the ID of the `join` membership event. +// If a membership already exists between the user and the room, or of the +// insert fails, returns the SQL error +func (d *Database) SaveMembership(localpart string, roomID string, eventID string, txn *sql.Tx) error { + return d.memberships.insertMembership(localpart, roomID, eventID, txn) +} + +// removeMembershipsByEventIDs removes the memberships of which the `join` membership +// event ID is included in a given array of events IDs +// If the removal fails, or if there is no membership to remove, returns an error +func (d *Database) removeMembershipsByEventIDs(eventIDs []string, txn *sql.Tx) error { + return d.memberships.deleteMembershipsByEventIDs(eventIDs, txn) +} + +// UpdateMemberships adds the "join" membership events included in a given state +// events array, and removes those which ID is included in a given array of events +// IDs. All of the process is run in a transaction, which commits only once/if every +// insertion and deletion has been successfully processed. +// Returns a SQL error if there was an issue with any part of the process +func (d *Database) UpdateMemberships(eventsToAdd []gomatrixserverlib.Event, idsToRemove []string) error { + return common.WithTransaction(d.db, func(txn *sql.Tx) error { + if err := d.removeMembershipsByEventIDs(idsToRemove, txn); err != nil { + return err + } + + for _, event := range eventsToAdd { + if err := d.newMembership(event, txn); err != nil { + return err + } + } + + return nil + }) +} + +// newMembership will save a new membership in the database if the given state +// event is a "join" membership event +// If the event isn't a "join" membership event, does nothing +// If an error occurred, returns it +func (d *Database) newMembership(ev gomatrixserverlib.Event, txn *sql.Tx) error { + if ev.Type() == "m.room.member" && ev.StateKey() != nil { + localpart, serverName, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) + if err != nil { + return err + } + + // We only want state events from local users + if string(serverName) != string(d.serverName) { + return nil + } + + eventID := ev.EventID() + roomID := ev.RoomID() + membership, err := ev.Membership() + if err != nil { + return err + } + + // Only "join" membership events can be considered as new memberships + if membership == "join" { + if err := d.SaveMembership(localpart, roomID, eventID, txn); err != nil { + return err + } + } + } + return nil +} + func hashPassword(plaintext string) (hash string, err error) { hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), bcrypt.DefaultCost) return string(hashBytes), err diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go new file mode 100644 index 000000000..98dcd5b65 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -0,0 +1,141 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "encoding/json" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputRoomEvent consumes events that originated in the room server. +type OutputRoomEvent struct { + roomServerConsumer *common.ContinualConsumer + db *accounts.Database + query api.RoomserverQueryAPI + serverName string +} + +// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*OutputRoomEvent, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + return nil, err + } + roomServerURL := cfg.RoomServerURL() + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEvent{ + roomServerConsumer: &consumer, + db: store, + query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + serverName: string(cfg.Matrix.ServerName), + } + consumer.ProcessMessage = s.onMessage + + return s, nil +} + +// Start consuming from room servers +func (s *OutputRoomEvent) Start() error { + return s.roomServerConsumer.Start() +} + +// onMessage is called when the sync server receives a new event from the room server output log. +// It is not safe for this function to be called from multiple goroutines, or else the +// sync stream position may race and be incorrectly calculated. +func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } + + ev := output.NewRoomEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "type": ev.Type(), + }).Info("received event from roomserver") + + events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + if err != nil { + return err + } + + if err := s.db.UpdateMemberships(events, output.NewRoomEvent.RemovesStateEventIDs); err != nil { + return err + } + + return nil +} + +// lookupStateEvents looks up the state events that are added by a new event. +func (s *OutputRoomEvent) lookupStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + return nil, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{event}, nil + } + + result := []gomatrixserverlib.Event{} + missing := []string{} + for _, id := range addsStateEventIDs { + // Append the current event in the results if its ID is in the events list + if id == event.EventID() { + result = append(result, event) + } else { + // If the event isn't the current one, add it to the list of events + // to retrieve from the roomserver + missing = append(missing, id) + } + } + + // Request the missing events from the roomserver + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + + return result, nil +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index a64cc9a07..6699897f4 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/routing" "github.com/matrix-org/dendrite/common" @@ -86,6 +87,14 @@ func main() { KeyDatabase: keyDB, } + consumer, err := consumers.NewOutputRoomEvent(cfg, accountDB) + if err != nil { + log.Panicf("startup: failed to create room server consumer: %s", err) + } + if err = consumer.Start(); err != nil { + log.Panicf("startup: failed to start room server consumer") + } + log.Info("Starting client API server on ", cfg.Listen.ClientAPI) routing.Setup( http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer, diff --git a/src/github.com/matrix-org/dendrite/cmd/mediaapi-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/mediaapi-integration-tests/main.go index 29a79d762..f0e3b0c03 100644 --- a/src/github.com/matrix-org/dendrite/cmd/mediaapi-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/mediaapi-integration-tests/main.go @@ -104,14 +104,17 @@ func startMediaAPI(suffix string, dynamicThumbnails bool) (*exec.Cmd, chan error proxyCmd, proxyCmdChan := test.StartProxy(proxyAddr, cfg) - cmd, cmdChan := test.StartServer( - serverType, - serverArgs, + test.InitDatabase( postgresDatabase, postgresContainerName, databases, ) + cmd, cmdChan := test.CreateBackgroundCommand( + filepath.Join(filepath.Dir(os.Args[0]), "dendrite-"+serverType+"-server"), + serverArgs, + ) + fmt.Printf("==TESTSERVER== STARTED %v -> %v : %v\n", proxyAddr, cfg.Listen.MediaAPI, dir) return cmd, cmdChan, string(cfg.Listen.MediaAPI), proxyCmd, proxyCmdChan, proxyAddr, dir } diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index 6bc59456b..b0e36c425 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -147,9 +147,7 @@ func startSyncServer() (*exec.Cmd, chan error) { testDatabaseName, } - cmd, cmdChan := test.StartServer( - "sync-api", - serverArgs, + test.InitDatabase( postgresDatabase, postgresContainerName, databases, @@ -165,6 +163,11 @@ func startSyncServer() (*exec.Cmd, chan error) { panic(err) } + cmd, cmdChan := test.CreateBackgroundCommand( + filepath.Join(filepath.Dir(os.Args[0]), "dendrite-sync-api-server"), + serverArgs, + ) + return cmd, cmdChan } diff --git a/src/github.com/matrix-org/dendrite/common/test/server.go b/src/github.com/matrix-org/dendrite/common/test/server.go index 2f089fe34..d990d4105 100644 --- a/src/github.com/matrix-org/dendrite/common/test/server.go +++ b/src/github.com/matrix-org/dendrite/common/test/server.go @@ -65,12 +65,8 @@ func CreateBackgroundCommand(command string, args []string) (*exec.Cmd, chan err return cmd, cmdChan } -// StartServer creates the database and config file needed for the server to run and -// then starts the server. The Cmd being executed is returned. A channel is also returned, -// which will have any termination errors sent down it, followed immediately by the channel being closed. -// If postgresContainerName is not an empty string, psql will be run from inside that container. If it is -// an empty string, psql will be assumed to be in PATH. -func StartServer(serverType string, serverArgs []string, postgresDatabase, postgresContainerName string, databases []string) (*exec.Cmd, chan error) { +// InitDatabase creates the database and config file needed for the server to run +func InitDatabase(postgresDatabase, postgresContainerName string, databases []string) { if len(databases) > 0 { var dbCmd string var dbArgs []string @@ -89,11 +85,6 @@ func StartServer(serverType string, serverArgs []string, postgresDatabase, postg } } } - - return CreateBackgroundCommand( - filepath.Join(filepath.Dir(os.Args[0]), "dendrite-"+serverType+"-server"), - serverArgs, - ) } // StartProxy creates a reverse proxy diff --git a/travis-install-kafka.sh b/travis-install-kafka.sh index 20855fcc1..32f952349 100755 --- a/travis-install-kafka.sh +++ b/travis-install-kafka.sh @@ -5,7 +5,7 @@ set -eu # The mirror to download kafka from is picked from the list of mirrors at # https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz # TODO: Check the signature since we are downloading over HTTP. -MIRROR=http://mirror.ox.ac.uk/sites/rsync.apache.org/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz +MIRROR=http://apache.mirror.anlx.net/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz # Only download the kafka if it isn't already downloaded. test -f kafka.tgz || wget $MIRROR -O kafka.tgz From e6d77d6bdef4a6320ccdf89a8ddff15ff2d0db79 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 18 Jul 2017 13:40:03 +0100 Subject: [PATCH 2/4] Use HTTP API for roomserver input. (#161) * Use HTTP API for roomserver input. * Use synchronous HTTP API for writing events to the roomserver * Remove unused config for kafka topic * Tweak comments --- .../clientapi/producers/roomserver.go | 61 ++------ .../cmd/dendrite-client-api-server/main.go | 7 +- .../dendrite-federation-api-server/main.go | 4 +- .../dendrite/cmd/dendrite-room-server/main.go | 47 ++----- .../cmd/roomserver-integration-tests/main.go | 77 ++++++++-- .../dendrite/common/config/config.go | 10 +- .../matrix-org/dendrite/common/test/config.go | 8 +- .../dendrite/roomserver/input/consumer.go | 133 ------------------ .../dendrite/roomserver/storage/sql.go | 7 - .../dendrite/roomserver/storage/storage.go | 11 -- 10 files changed, 94 insertions(+), 271 deletions(-) delete mode 100644 src/github.com/matrix-org/dendrite/roomserver/input/consumer.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go index 3b46487a2..34455ddbc 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -15,35 +15,24 @@ package producers import ( - "encoding/json" - "fmt" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" - sarama "gopkg.in/Shopify/sarama.v1" ) // RoomserverProducer produces events for the roomserver to consume. type RoomserverProducer struct { - Topic string - Producer sarama.SyncProducer + InputAPI api.RoomserverInputAPI } // NewRoomserverProducer creates a new RoomserverProducer -func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProducer, error) { - producer, err := sarama.NewSyncProducer(kafkaURIs, nil) - if err != nil { - return nil, err - } +func NewRoomserverProducer(roomserverURI string) *RoomserverProducer { return &RoomserverProducer{ - Topic: topic, - Producer: producer, - }, nil + InputAPI: api.NewRoomserverInputAPIHTTP(roomserverURI, nil), + } } // SendEvents writes the given events to the roomserver input log. The events are written with KindNew. func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error { - eventIDs := make([]string, len(events)) ires := make([]api.InputRoomEvent, len(events)) for i, event := range events { ires[i] = api.InputRoomEvent{ @@ -52,9 +41,8 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), } - eventIDs[i] = event.EventID() } - return c.SendInputRoomEvents(ires, eventIDs) + return c.SendInputRoomEvents(ires) } // SendEventWithState writes an event with KindNew to the roomserver input log @@ -65,7 +53,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat return err } - eventIDs := make([]string, len(outliers)+1) ires := make([]api.InputRoomEvent, len(outliers)+1) for i, outlier := range outliers { ires[i] = api.InputRoomEvent{ @@ -73,7 +60,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat Event: outlier, AuthEventIDs: outlier.AuthEventIDs(), } - eventIDs[i] = outlier.EventID() } stateEventIDs := make([]string, len(state.StateEvents)) @@ -88,41 +74,14 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat HasState: true, StateEventIDs: stateEventIDs, } - eventIDs[len(outliers)] = event.EventID() - return c.SendInputRoomEvents(ires, eventIDs) + return c.SendInputRoomEvents(ires) } // SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both // arrays must match, and each element must correspond to the same event. -func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error { - // TODO: Nicer way of doing this. Options are: - // A) Like this - // B) Add EventID field to InputRoomEvent - // C) Add wrapper struct with the EventID and the InputRoomEvent - if len(eventIDs) != len(ires) { - return fmt.Errorf("WriteInputRoomEvents: length mismatch %d != %d", len(eventIDs), len(ires)) - } - - msgs := make([]*sarama.ProducerMessage, len(ires)) - for i := range ires { - msg, err := c.toProducerMessage(ires[i], eventIDs[i]) - if err != nil { - return err - } - msgs[i] = msg - } - return c.Producer.SendMessages(msgs) -} - -func (c *RoomserverProducer) toProducerMessage(ire api.InputRoomEvent, eventID string) (*sarama.ProducerMessage, error) { - value, err := json.Marshal(ire) - if err != nil { - return nil, err - } - var m sarama.ProducerMessage - m.Topic = c.Topic - m.Key = sarama.StringEncoder(eventID) - m.Value = sarama.ByteEncoder(value) - return &m, nil +func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent) error { + request := api.InputRoomEventsRequest{InputRoomEvents: ires} + var response api.InputRoomEventsResponse + return c.InputAPI.InputRoomEvents(&request, &response) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 6699897f4..eb1218e7c 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -51,9 +51,9 @@ func main() { log.Info("config: ", cfg) - roomserverProducer, err := producers.NewRoomserverProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), - ) + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) + + roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL()) userUpdateProducer, err := producers.NewUserUpdateProducer( cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), ) @@ -65,7 +65,6 @@ func main() { cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, ) - queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) accountDB, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName) if err != nil { log.Panicf("Failed to setup account database(%q): %s", cfg.Database.Account, err.Error()) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go index f4f19cdcd..a479ad554 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go @@ -67,9 +67,7 @@ func main() { queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) - roomserverProducer, err := producers.NewRoomserverProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), - ) + roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL()) if err != nil { log.Panicf("Failed to setup kafka producers(%s): %s", cfg.Kafka.Addresses, err) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go index 0a1686791..715a40740 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go @@ -16,11 +16,9 @@ package main import ( "flag" - "fmt" "net/http" _ "net/http/pprof" "os" - "strconv" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/common" @@ -33,9 +31,8 @@ import ( ) var ( - logDir = os.Getenv("LOG_DIR") - configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.") - stopProcessingAfter = os.Getenv("STOP_AFTER") + logDir = os.Getenv("LOG_DIR") + configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.") ) func main() { @@ -56,49 +53,25 @@ func main() { panic(err) } - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - panic(err) - } - kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) if err != nil { panic(err) } - consumer := input.Consumer{ - ContinualConsumer: common.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.InputRoomEvent), - Consumer: kafkaConsumer, - PartitionStore: db, - }, - DB: db, - Producer: kafkaProducer, - OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent), - } - - if stopProcessingAfter != "" { - count, err := strconv.ParseInt(stopProcessingAfter, 10, 64) - if err != nil { - panic(err) - } - consumer.StopProcessingAfter = &count - consumer.ShutdownCallback = func(message string) { - fmt.Println("Stopping roomserver", message) - os.Exit(0) - } - } - - if err = consumer.Start(); err != nil { - panic(err) - } - queryAPI := query.RoomserverQueryAPI{ DB: db, } queryAPI.SetupHTTP(http.DefaultServeMux) + inputAPI := input.RoomserverInputAPI{ + DB: db, + Producer: kafkaProducer, + OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent), + } + + inputAPI.SetupHTTP(http.DefaultServeMux) + http.DefaultServeMux.Handle("/metrics", prometheus.Handler()) log.Info("Started room server on ", cfg.Listen.RoomServer) diff --git a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go index c4bea7f3e..43305c2fe 100644 --- a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go @@ -23,6 +23,10 @@ import ( "strings" "time" + "encoding/json" + + "net/http" + "github.com/matrix-org/dendrite/common/test" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -90,7 +94,7 @@ func createDatabase(database string) error { // messages is reached or after a timeout. It kills the command before it returns. // It returns a list of the messages read from the command on success or an error // on failure. -func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAPI func()) ([]string, error) { +func runAndReadFromTopic(runCmd *exec.Cmd, readyURL string, doInput func(), topic string, count int, checkQueryAPI func()) ([]string, error) { type result struct { // data holds all of stdout on success. data []byte @@ -107,6 +111,11 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP ) // Send stderr to our stderr so the user can see any error messages. readCmd.Stderr = os.Stderr + + // Kill both processes before we exit. + defer func() { runCmd.Process.Kill() }() + defer func() { readCmd.Process.Kill() }() + // Run the command, read the messages and wait for a timeout in parallel. go func() { // Read all of stdout. @@ -131,14 +140,40 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP time.Sleep(timeout) done <- result{nil, fmt.Errorf("Timeout reading %d messages from topic %q", count, topic)} }() + + // Poll the HTTP listener of the process waiting for it to be ready to receive requests. + ready := make(chan struct{}) + go func() { + delay := 10 * time.Millisecond + for { + time.Sleep(delay) + if delay < 100*time.Millisecond { + delay *= 2 + } + resp, err := http.Get(readyURL) + if err != nil { + continue + } + if resp.StatusCode == 200 { + break + } + } + ready <- struct{}{} + }() + + // Wait for the roomserver to be ready to receive input or for it to crash. + select { + case <-ready: + case r := <-done: + return nil, r.err + } + + // Write the input now that the server is running. + doInput() + // Wait for one of the tasks to finsh. r := <-done - // Kill both processes. We don't check if the processes are running and - // we ignore failures since we are just trying to clean up before returning. - runCmd.Process.Kill() - readCmd.Process.Kill() - if r.err != nil { return nil, r.err } @@ -153,6 +188,20 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP return lines, nil } +func writeToRoomServer(input []string, roomserverURL string) error { + var request api.InputRoomEventsRequest + var response api.InputRoomEventsResponse + var err error + request.InputRoomEvents = make([]api.InputRoomEvent, len(input)) + for i := range input { + if err = json.Unmarshal([]byte(input[i]), &request.InputRoomEvents[i]); err != nil { + return err + } + } + x := api.NewRoomserverInputAPIHTTP(roomserverURL, nil) + return x.InputRoomEvents(&request, &response) +} + // testRoomserver is used to run integration tests against a single roomserver. // It creates new kafka topics for the input and output of the roomserver. // It writes the input messages to the input kafka topic, formatting each message @@ -176,24 +225,22 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R panic(err) } - inputTopic := string(cfg.Kafka.Topics.InputRoomEvent) outputTopic := string(cfg.Kafka.Topics.OutputRoomEvent) - exe.DeleteTopic(inputTopic) - if err := exe.CreateTopic(inputTopic); err != nil { - panic(err) - } exe.DeleteTopic(outputTopic) if err := exe.CreateTopic(outputTopic); err != nil { panic(err) } - if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil { + if err = createDatabase(testDatabaseName); err != nil { panic(err) } - if err = createDatabase(testDatabaseName); err != nil { - panic(err) + doInput := func() { + fmt.Printf("Roomserver is ready to receive input, sending %d events\n", len(input)) + if err = writeToRoomServer(input, cfg.RoomServerURL()); err != nil { + panic(err) + } } cmd := exec.Command(filepath.Join(filepath.Dir(os.Args[0]), "dendrite-room-server")) @@ -205,7 +252,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R cmd.Stderr = os.Stderr cmd.Args = []string{"dendrite-room-server", "--config", filepath.Join(dir, test.ConfigFile)} - gotOutput, err := runAndReadFromTopic(cmd, outputTopic, len(wantOutput), func() { + gotOutput, err := runAndReadFromTopic(cmd, cfg.RoomServerURL()+"/metrics", doInput, outputTopic, len(wantOutput), func() { queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), nil) checkQueries(queryAPI) }) diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 311312a9d..4b362b5ff 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -19,13 +19,14 @@ import ( "crypto/sha256" "encoding/pem" "fmt" - "github.com/matrix-org/gomatrixserverlib" - "golang.org/x/crypto/ed25519" - "gopkg.in/yaml.v2" "io/ioutil" "path/filepath" "strings" "time" + + "github.com/matrix-org/gomatrixserverlib" + "golang.org/x/crypto/ed25519" + "gopkg.in/yaml.v2" ) // Version is the current version of the config format. @@ -95,8 +96,6 @@ type Dendrite struct { Addresses []string `yaml:"addresses"` // The names of the topics to use when reading and writing from kafka. Topics struct { - // Topic for roomserver/api.InputRoomEvent events. - InputRoomEvent Topic `yaml:"input_room_event"` // Topic for roomserver/api.OutputRoomEvent events. OutputRoomEvent Topic `yaml:"output_room_event"` // Topic for user updates (profile, presence) @@ -298,7 +297,6 @@ func (config *Dendrite) check() error { } checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) - checkNotEmpty("kafka.topics.input_room_event", string(config.Kafka.Topics.InputRoomEvent)) checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty("database.account", string(config.Database.Account)) checkNotEmpty("database.device", string(config.Database.Device)) diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go index 8c957e77a..e429d06b0 100644 --- a/src/github.com/matrix-org/dendrite/common/test/config.go +++ b/src/github.com/matrix-org/dendrite/common/test/config.go @@ -21,14 +21,15 @@ import ( "encoding/base64" "encoding/pem" "fmt" - "github.com/matrix-org/dendrite/common/config" - "github.com/matrix-org/gomatrixserverlib" - "gopkg.in/yaml.v2" "io/ioutil" "math/big" "os" "path/filepath" "time" + + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/gomatrixserverlib" + "gopkg.in/yaml.v2" ) const ( @@ -80,7 +81,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.Kafka.Addresses = []string{kafkaURI} // TODO: Different servers should be using different topics. // Make this configurable somehow? - cfg.Kafka.Topics.InputRoomEvent = "test.room.input" cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" // TODO: Use different databases for the different schemas. diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go deleted file mode 100644 index efe450381..000000000 --- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package input contains the code that writes -package input - -import ( - "encoding/json" - "fmt" - "sync/atomic" - - "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/roomserver/api" - sarama "gopkg.in/Shopify/sarama.v1" -) - -// A ConsumerDatabase has the storage APIs needed by the consumer. -type ConsumerDatabase interface { - RoomEventDatabase - common.PartitionStorer -} - -// An ErrorLogger handles the errors encountered by the consumer. -type ErrorLogger interface { - OnError(message *sarama.ConsumerMessage, err error) -} - -// A Consumer consumes a kafkaesque stream of room events. -// The room events are supplied as api.InputRoomEvent structs serialised as JSON. -// The events should be valid matrix events. -// The events needed to authenticate the event should already be stored on the roomserver. -// The events needed to construct the state at the event should already be stored on the roomserver. -// If the event is not valid then it will be discarded and an error will be logged. -type Consumer struct { - ContinualConsumer common.ContinualConsumer - // The database used to store the room events. - DB ConsumerDatabase - Producer sarama.SyncProducer - // The kafkaesque topic to output new room events to. - // This is the name used in kafka to identify the stream to write events to. - OutputRoomEventTopic string - // The ErrorLogger for this consumer. - // If left as nil then the consumer will panic when it encounters an error - ErrorLogger ErrorLogger - // If non-nil then the consumer will stop processing messages after this - // many messages and will shutdown. Malformed messages are included in the count. - StopProcessingAfter *int64 - // If not-nil then the consumer will call this to shutdown the server. - ShutdownCallback func(reason string) - // How many messages the consumer has processed. - processed int64 -} - -// WriteOutputRoomEvent implements OutputRoomEventWriter -func (c *Consumer) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error { - var m sarama.ProducerMessage - oe := api.OutputEvent{ - Type: api.OutputTypeNewRoomEvent, - NewRoomEvent: &output, - } - value, err := json.Marshal(oe) - if err != nil { - return err - } - m.Topic = c.OutputRoomEventTopic - m.Key = sarama.StringEncoder("") - m.Value = sarama.ByteEncoder(value) - _, _, err = c.Producer.SendMessage(&m) - return err -} - -// Start starts the consumer consuming. -// Starts up a goroutine for each partition in the kafka stream. -// Returns nil once all the goroutines are started. -// Returns an error if it can't start consuming for any of the partitions. -func (c *Consumer) Start() error { - c.ContinualConsumer.ProcessMessage = c.processMessage - c.ContinualConsumer.ShutdownCallback = c.shutdown - return c.ContinualConsumer.Start() -} - -func (c *Consumer) processMessage(message *sarama.ConsumerMessage) error { - var input api.InputRoomEvent - if err := json.Unmarshal(message.Value, &input); err != nil { - // If the message is invalid then log it and move onto the next message in the stream. - c.logError(message, err) - } else { - if err := processRoomEvent(c.DB, c, input); err != nil { - // If there was an error processing the message then log it and - // move onto the next message in the stream. - // TODO: If the error was due to a problem talking to the database - // then we shouldn't move onto the next message and we should either - // retry processing the message, or panic and kill ourselves. - c.logError(message, err) - } - } - // Update the number of processed messages using atomic addition because it is accessed from multiple goroutines. - processed := atomic.AddInt64(&c.processed, 1) - // Check if we should stop processing. - // Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages. - // If we try to stop processing after M message and we have N goroutines then we will process somewhere - // between M and (N + M) messages because the N goroutines could all try to process what they think will be the - // last message. We could be more careful here but this is good enough for getting rough benchmarks. - if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) { - return common.ErrShutdown - } - return nil -} - -func (c *Consumer) shutdown() { - if c.ShutdownCallback != nil { - c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed)) - } -} - -// logError is a convenience method for logging errors. -func (c *Consumer) logError(message *sarama.ConsumerMessage, err error) { - if c.ErrorLogger == nil { - panic(err) - } - c.ErrorLogger.OnError(message, err) -} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index fcc5eb882..fcb414f02 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -16,12 +16,9 @@ package storage import ( "database/sql" - - "github.com/matrix-org/dendrite/common" ) type statements struct { - common.PartitionOffsetStatements eventTypeStatements eventStateKeyStatements roomStatements @@ -35,10 +32,6 @@ type statements struct { func (s *statements) prepare(db *sql.DB) error { var err error - if err = s.PartitionOffsetStatements.Prepare(db); err != nil { - return err - } - if err = s.eventTypeStatements.prepare(db); err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 1dfc89d4c..50e2f44d6 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -18,7 +18,6 @@ import ( "database/sql" // Import the postgres database driver. _ "github.com/lib/pq" - "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -42,16 +41,6 @@ func Open(dataSourceName string) (*Database, error) { return &d, nil } -// PartitionOffsets implements input.ConsumerDatabase -func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { - return d.statements.SelectPartitionOffsets(topic) -} - -// SetPartitionOffset implements input.ConsumerDatabase -func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { - return d.statements.UpsertPartitionOffset(topic, partition, offset) -} - // StoreEvent implements input.EventDatabase func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) { var ( From ce311ce0fe33dae8f47d631428dd3b5149a25ce0 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 20 Jul 2017 13:06:14 +0100 Subject: [PATCH 3/4] Improve room creation (#164) --- .../dendrite/clientapi/events/eventcontent.go | 15 +++++++++++++++ .../dendrite/clientapi/writers/createroom.go | 10 +++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/events/eventcontent.go b/src/github.com/matrix-org/dendrite/clientapi/events/eventcontent.go index 2a97ee4ff..8fed23596 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/events/eventcontent.go +++ b/src/github.com/matrix-org/dendrite/clientapi/events/eventcontent.go @@ -28,6 +28,21 @@ type MemberContent struct { // TODO: ThirdPartyInvite string `json:"third_party_invite,omitempty"` } +// NameContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-name +type NameContent struct { + Name string `json:"name"` +} + +// TopicContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-topic +type TopicContent struct { + Topic string `json:"topic"` +} + +// GuestAccessContent is the event content for https://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-guest-access +type GuestAccessContent struct { + GuestAccess string `json:"guest_access"` +} + // JoinRulesContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-join-rules type JoinRulesContent struct { JoinRule string `json:"join_rule"` diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go index d078982f4..0a2b185e8 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go @@ -124,7 +124,7 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite // 4- m.room.canonical_alias (opt) TODO // 5- m.room.join_rules // 6- m.room.history_visibility - // 7- m.room.guest_access (opt) TODO + // 7- m.room.guest_access (opt) // 8- other initial state items TODO // 9- m.room.name (opt) // 10- m.room.topic (opt) @@ -142,13 +142,13 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite // TODO: m.room.canonical_alias {"m.room.join_rules", "", events.JoinRulesContent{"public"}}, // FIXME: Allow this to be changed {"m.room.history_visibility", "", events.HistoryVisibilityContent{"joined"}}, // FIXME: Allow this to be changed - // TODO: m.room.guest_access + {"m.room.guest_access", "", events.GuestAccessContent{"can_join"}}, // FIXME: Allow this to be changed // TODO: Other initial state items - // TODO: m.room.name - // TODO: m.room.topic + {"m.room.name", "", events.NameContent{r.Name}}, // FIXME: Only send the name event if a name is supplied, to avoid sending a false room name removal event + {"m.room.topic", "", events.TopicContent{r.Topic}}, // TODO: invite events // TODO: 3pid invite events - // TODO m.room.aliases + // TODO: m.room.aliases } authEvents := gomatrixserverlib.NewAuthEvents(nil) From a904380e1bb685af03d49ad00a3ab80e728dc7f0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 24 Jul 2017 13:42:55 +0100 Subject: [PATCH 4/4] gb vendor update github.com/matrix-org/gomatrixserverlib --- vendor/manifest | 2 +- .../matrix-org/gomatrixserverlib/event.go | 33 ++++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/vendor/manifest b/vendor/manifest index 95c4b6f8b..425cc8f1f 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -98,7 +98,7 @@ { "importpath": "github.com/matrix-org/gomatrixserverlib", "repository": "https://github.com/matrix-org/gomatrixserverlib", - "revision": "30652b26ec2e83b97c941eb1c293bf7d67340f74", + "revision": "768a8767051a4aca7f5e41f912954ae04d5f1efb", "branch": "master" }, { diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go index 94de35189..e8c2de0f1 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go @@ -252,10 +252,41 @@ func (e Event) Redact() Event { // This is unreachable for events created with EventBuilder.Build or NewEventFromUntrustedJSON panic(fmt.Errorf("gomatrixserverlib: invalid event %v", err)) } - return Event{ + result := Event{ redacted: true, eventJSON: eventJSON, } + if err = json.Unmarshal(eventJSON, &result.fields); err != nil { + // This is unreachable for events created with EventBuilder.Build or NewEventFromUntrustedJSON + panic(fmt.Errorf("gomatrixserverlib: invalid event %v", err)) + } + return result +} + +// SetUnsigned sets the unsigned key of the event. +// Returns a copy of the event with the "unsigned" key set. +func (e Event) SetUnsigned(unsigned interface{}) (Event, error) { + var eventAsMap map[string]rawJSON + var err error + if err = json.Unmarshal(e.eventJSON, &eventAsMap); err != nil { + return Event{}, err + } + unsignedJSON, err := json.Marshal(unsigned) + if err != nil { + return Event{}, err + } + eventAsMap["unsigned"] = unsignedJSON + eventJSON, err := json.Marshal(eventAsMap) + if err != nil { + return Event{}, err + } + if eventJSON, err = CanonicalJSON(eventJSON); err != nil { + return Event{}, err + } + result := e + result.eventJSON = eventJSON + result.fields.Unsigned = unsignedJSON + return result, nil } // EventReference returns an EventReference for the event.