From 57b7097368f0be9d89ec740d1f6528322784b0e6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 21 Aug 2017 16:37:11 +0100 Subject: [PATCH 1/6] Add input API for adding invites to the roomserver. (#187) * Add input API for adding invites to the roomserver. This API handles invites received over federation that occur outside of a room. * Add some docstring for withTransaction * Use a nicer pattern for wrapping transactions * Fix MembershipUpdater method to not commit the transaction before returning it * Use the Transaction interface from common --- .../matrix-org/dendrite/common/sql.go | 36 +++++++--- .../dendrite/roomserver/api/input.go | 10 ++- .../dendrite/roomserver/input/events.go | 72 ++++++++++++++++--- .../dendrite/roomserver/input/input.go | 5 ++ .../roomserver/input/latest_events.go | 23 +++--- .../roomserver/storage/rooms_table.go | 16 +++-- .../dendrite/roomserver/storage/storage.go | 44 ++++++++++-- .../dendrite/roomserver/types/types.go | 13 +--- 8 files changed, 166 insertions(+), 53 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/common/sql.go b/src/github.com/matrix-org/dendrite/common/sql.go index cabbe6662..4abe7410e 100644 --- a/src/github.com/matrix-org/dendrite/common/sql.go +++ b/src/github.com/matrix-org/dendrite/common/sql.go @@ -18,6 +18,24 @@ import ( "database/sql" ) +// A Transaction is something that can be committed or rolledback. +type Transaction interface { + // Commit the transaction + Commit() error + // Rollback the transaction. + Rollback() error +} + +// EndTransaction ends a transaction. +// If the transaction succeeded then it is committed, otherwise it is rolledback. +func EndTransaction(txn Transaction, succeeded *bool) { + if *succeeded { + txn.Commit() + } else { + txn.Rollback() + } +} + // WithTransaction runs a block of code passing in an SQL transaction // If the code returns an error or panics then the transactions is rolledback // Otherwise the transaction is committed. @@ -26,16 +44,14 @@ func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { if err != nil { return } - defer func() { - if r := recover(); r != nil { - txn.Rollback() - panic(r) - } else if err != nil { - txn.Rollback() - } else { - err = txn.Commit() - } - }() + succeeded := false + defer EndTransaction(txn, &succeeded) + err = fn(txn) + if err != nil { + return + } + + succeeded = true return } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go index 558eb28c4..cbe7399ba 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go @@ -68,9 +68,17 @@ type InputRoomEvent struct { SendAsServer string `json:"send_as_server"` } +// InputInviteEvent is a matrix invite event received over federation without +// the usual context a matrix room event would have. We usually do not have +// access to the events needed to check the event auth rules for the invite. +type InputInviteEvent struct { + Event gomatrixserverlib.Event `json:"event"` +} + // InputRoomEventsRequest is a request to InputRoomEvents type InputRoomEventsRequest struct { - InputRoomEvents []InputRoomEvent `json:"input_room_events"` + InputRoomEvents []InputRoomEvent `json:"input_room_events"` + InputInviteEvents []InputInviteEvent `json:"input_invite_events"` } // InputRoomEventsResponse is a response to InputRoomEvents diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index c1eee4c96..82b4652e6 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -15,6 +15,9 @@ package input import ( + "fmt" + + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" @@ -39,6 +42,8 @@ type RoomEventDatabase interface { GetLatestEventsForUpdate(roomNID types.RoomNID) (updater types.RoomRecentEventsUpdater, err error) // Lookup the string event IDs for a list of numeric event IDs EventIDs(eventNIDs []types.EventNID) (map[types.EventNID]string, error) + // Build a membership updater for the target user in a room. + MembershipUpdater(roomID, targerUserID string) (types.MembershipUpdater, error) } // OutputRoomEventWriter has the APIs needed to write an event to the output logs. @@ -103,13 +108,64 @@ func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api. return err } - // TODO: - // * Caculate the new current state for the room if the forward extremities have changed. - // * Work out the delta between the new current state and the previous current state. - // * Work out the visibility of the event. - // * Write a message to the output logs containing: - // - The event itself - // - The visiblity of the event, i.e. who is allowed to see the event. - // - The changes to the current state of the room. + return nil +} + +func processInviteEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputInviteEvent) (err error) { + if input.Event.StateKey() == nil { + return fmt.Errorf("invite must be a state event") + } + + roomID := input.Event.RoomID() + targetUserID := *input.Event.StateKey() + + updater, err := db.MembershipUpdater(roomID, targetUserID) + if err != nil { + return err + } + succeeded := false + defer common.EndTransaction(updater, &succeeded) + + if updater.IsJoin() { + // If the user is joined to the room then that takes precedence over this + // invite event. It makes little sense to move a user that is already + // joined to the room into the invite state. + // This could plausibly happen if an invite request raced with a join + // request for a user. For example if a user was invited to a public + // room and they joined the room at the same time as the invite was sent. + // The other way this could plausibly happen is if an invite raced with + // a kick. For example if a user was kicked from a room in error and in + // response someone else in the room re-invited them then it is possible + // for the invite request to race with the leave event so that the + // target receives invite before it learns that it has been kicked. + // There are a few ways this could be plausibly handled in the roomserver. + // 1) Store the invite, but mark it as retired. That will result in the + // permanent rejection of that invite event. So even if the target + // user leaves the room and the invite is retransmitted it will be + // ignored. However a new invite with a new event ID would still be + // accepted. + // 2) Silently discard the invite event. This means that if the event + // was retransmitted at a later date after the target user had left + // the room we would accept the invite. However since we hadn't told + // the sending server that the invite had been discarded it would + // have no reason to attempt to retry. + // 3) Signal the sending server that the user is already joined to the + // room. + // For now we will implement option 2. Since in the abesence of a retry + // mechanism it will be equivalent to option 1, and we don't have a + // signalling mechanism to implement option 3. + return nil + } + + outputUpdates, err := updateToInviteMembership(updater, &input.Event, nil) + if err != nil { + return err + } + + if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil { + return err + } + + succeeded = true return nil } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go index 210abfa29..17e94599e 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -61,6 +61,11 @@ func (r *RoomserverInputAPI) InputRoomEvents( return err } } + for i := range request.InputInviteEvents { + if err := processInviteEvent(r.DB, r, request.InputInviteEvents[i]); err != nil { + return err + } + } return nil } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 9328ecf3b..d9aa2b455 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -17,6 +17,7 @@ package input import ( "bytes" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" @@ -52,25 +53,19 @@ func updateLatestEvents( if err != nil { return } - defer func() { - if err == nil { - // Commit if there wasn't an error. - // Set the returned err value if we encounter an error committing. - // This only works because err is a named return. - err = updater.Commit() - } else { - // Ignore any error we get rolling back since we don't want to - // clobber the current error - // TODO: log the error here. - updater.Rollback() - } - }() + succeeded := false + defer common.EndTransaction(updater, &succeeded) u := latestEventsUpdater{ db: db, updater: updater, ow: ow, roomNID: roomNID, stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer, } - return u.doUpdateLatestEvents() + if err = u.doUpdateLatestEvents(); err != nil { + return err + } + + succeeded = true + return } // latestEventsUpdater tracks the state used to update the latest events in the diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go index 03cacd7db..24744fdff 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go @@ -80,15 +80,23 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) { }.prepare(db) } -func (s *roomStatements) insertRoomNID(roomID string) (types.RoomNID, error) { +func (s *roomStatements) insertRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) { var roomNID int64 - err := s.insertRoomNIDStmt.QueryRow(roomID).Scan(&roomNID) + stmt := s.insertRoomNIDStmt + if txn != nil { + stmt = txn.Stmt(stmt) + } + err := stmt.QueryRow(roomID).Scan(&roomNID) return types.RoomNID(roomNID), err } -func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) { +func (s *roomStatements) selectRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) { var roomNID int64 - err := s.selectRoomNIDStmt.QueryRow(roomID).Scan(&roomNID) + stmt := s.selectRoomNIDStmt + if txn != nil { + stmt = txn.Stmt(stmt) + } + err := stmt.QueryRow(roomID).Scan(&roomNID) return types.RoomNID(roomNID), err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 17b30860c..fbbc723ee 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -53,7 +53,7 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ err error ) - if roomNID, err = d.assignRoomNID(event.RoomID()); err != nil { + if roomNID, err = d.assignRoomNID(nil, event.RoomID()); err != nil { return 0, types.StateAtEvent{}, err } @@ -104,15 +104,15 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ }, nil } -func (d *Database) assignRoomNID(roomID string) (types.RoomNID, error) { +func (d *Database) assignRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) { // Check if we already have a numeric ID in the database. - roomNID, err := d.statements.selectRoomNID(roomID) + roomNID, err := d.statements.selectRoomNID(txn, roomID) if err == sql.ErrNoRows { // We don't have a numeric ID so insert one into the database. - roomNID, err = d.statements.insertRoomNID(roomID) + roomNID, err = d.statements.insertRoomNID(txn, roomID) if err == sql.ErrNoRows { // We raced with another insert so run the select again. - roomNID, err = d.statements.selectRoomNID(roomID) + roomNID, err = d.statements.selectRoomNID(txn, roomID) } } return roomNID, err @@ -329,7 +329,7 @@ func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventSta // RoomNID implements query.RoomserverQueryAPIDB func (d *Database) RoomNID(roomID string) (types.RoomNID, error) { - roomNID, err := d.statements.selectRoomNID(roomID) + roomNID, err := d.statements.selectRoomNID(nil, roomID) if err == sql.ErrNoRows { return 0, nil } @@ -380,6 +380,38 @@ func (d *Database) StateEntriesForTuples( return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples) } +// MembershipUpdater implements input.RoomEventDatabase +func (d *Database) MembershipUpdater(roomID, targetUserID string) (types.MembershipUpdater, error) { + txn, err := d.db.Begin() + if err != nil { + return nil, err + } + succeeded := false + defer func() { + if !succeeded { + txn.Rollback() + } + }() + + roomNID, err := d.assignRoomNID(txn, roomID) + if err != nil { + return nil, err + } + + targetUserNID, err := d.assignStateKeyNID(txn, targetUserID) + if err != nil { + return nil, err + } + + updater, err := d.membershipUpdaterTxn(txn, roomNID, targetUserNID) + if err != nil { + return nil, err + } + + succeeded = true + return updater, nil +} + type membershipUpdater struct { transaction d *Database diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index e8bc99fcf..d5fe32762 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -16,6 +16,7 @@ package types import ( + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" ) @@ -135,14 +136,6 @@ type StateEntryList struct { StateEntries []StateEntry } -// A Transaction is something that can be committed or rolledback. -type Transaction interface { - // Commit the transaction - Commit() error - // Rollback the transaction. - Rollback() error -} - // A RoomRecentEventsUpdater is used to update the recent events in a room. // (On postgresql this wraps a database transaction that holds a "FOR UPDATE" // lock on the row in the rooms table holding the latest events for the room.) @@ -175,7 +168,7 @@ type RoomRecentEventsUpdater interface { // It will share the same transaction as this updater. MembershipUpdater(targetUserNID EventStateKeyNID) (MembershipUpdater, error) // Implements Transaction so it can be committed or rolledback - Transaction + common.Transaction } // A MembershipUpdater is used to update the membership of a user in a room. @@ -200,7 +193,7 @@ type MembershipUpdater interface { // Returns a list of invite event IDs that this state change retired. SetToLeave(senderUserID string, eventID string) (inviteEventIDs []string, err error) // Implements Transaction so it can be committed or rolledback. - Transaction + common.Transaction } // A MissingEventError is an error that happened because the roomserver was From 808c2e09f6d1d93cd01bb36d75ba2a589bc06933 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 21 Aug 2017 17:20:23 +0100 Subject: [PATCH 2/6] Make txn *sql.Tx arguments optional everywhere using a utility function (#191) * Make txn *sql.Tx arguments optional everywhere using a utility function * Clarify that if the txn is nil the stmt will run outside a transaction --- .../matrix-org/dendrite/common/sql.go | 11 +++++++++++ .../storage/joined_hosts_table.go | 7 ++++--- .../federationsender/storage/room_table.go | 8 +++++--- .../storage/event_state_keys_table.go | 19 ++++--------------- .../roomserver/storage/events_table.go | 9 +++++---- .../roomserver/storage/invite_table.go | 5 +++-- .../roomserver/storage/membership_table.go | 7 ++++--- .../storage/previous_events_table.go | 5 +++-- .../roomserver/storage/rooms_table.go | 17 +++++------------ .../storage/current_room_state_table.go | 11 ++++++----- .../storage/output_room_events_table.go | 17 +++++------------ 11 files changed, 55 insertions(+), 61 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/common/sql.go b/src/github.com/matrix-org/dendrite/common/sql.go index 4abe7410e..c2fb753fc 100644 --- a/src/github.com/matrix-org/dendrite/common/sql.go +++ b/src/github.com/matrix-org/dendrite/common/sql.go @@ -55,3 +55,14 @@ func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { succeeded = true return } + +// TxStmt wraps an SQL stmt inside an optional transaction. +// If the transaction is nil then it returns the original statement that will +// run outside of a transaction. +// Otherwise returns a copy of the statement that will run inside the transaction. +func TxStmt(transaction *sql.Tx, statement *sql.Stmt) *sql.Stmt { + if transaction != nil { + statement = transaction.Stmt(statement) + } + return statement +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go index 7ba1b0b07..fffcc7f3f 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go @@ -18,6 +18,7 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -79,18 +80,18 @@ func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) { func (s *joinedHostsStatements) insertJoinedHosts( txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName, ) error { - _, err := txn.Stmt(s.insertJoinedHostsStmt).Exec(roomID, eventID, serverName) + _, err := common.TxStmt(txn, s.insertJoinedHostsStmt).Exec(roomID, eventID, serverName) return err } func (s *joinedHostsStatements) deleteJoinedHosts(txn *sql.Tx, eventIDs []string) error { - _, err := txn.Stmt(s.deleteJoinedHostsStmt).Exec(pq.StringArray(eventIDs)) + _, err := common.TxStmt(txn, s.deleteJoinedHostsStmt).Exec(pq.StringArray(eventIDs)) return err } func (s *joinedHostsStatements) selectJoinedHosts(txn *sql.Tx, roomID string, ) ([]types.JoinedHost, error) { - rows, err := txn.Stmt(s.selectJoinedHostsStmt).Query(roomID) + rows, err := common.TxStmt(txn, s.selectJoinedHostsStmt).Query(roomID) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go index daac7ddf4..bcc0bb1df 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go @@ -16,6 +16,8 @@ package storage import ( "database/sql" + + "github.com/matrix-org/dendrite/common" ) const roomSchema = ` @@ -65,7 +67,7 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) { // insertRoom inserts the room if it didn't already exist. // If the room didn't exist then last_event_id is set to the empty string. func (s *roomStatements) insertRoom(txn *sql.Tx, roomID string) error { - _, err := txn.Stmt(s.insertRoomStmt).Exec(roomID) + _, err := common.TxStmt(txn, s.insertRoomStmt).Exec(roomID) return err } @@ -74,7 +76,7 @@ func (s *roomStatements) insertRoom(txn *sql.Tx, roomID string) error { // exists by calling insertRoom first. func (s *roomStatements) selectRoomForUpdate(txn *sql.Tx, roomID string) (string, error) { var lastEventID string - err := txn.Stmt(s.selectRoomForUpdateStmt).QueryRow(roomID).Scan(&lastEventID) + err := common.TxStmt(txn, s.selectRoomForUpdateStmt).QueryRow(roomID).Scan(&lastEventID) if err != nil { return "", err } @@ -84,6 +86,6 @@ func (s *roomStatements) selectRoomForUpdate(txn *sql.Tx, roomID string) (string // updateRoom updates the last_event_id for the room. selectRoomForUpdate should // have already been called earlier within the transaction. func (s *roomStatements) updateRoom(txn *sql.Tx, roomID, lastEventID string) error { - _, err := txn.Stmt(s.updateRoomStmt).Exec(roomID, lastEventID) + _, err := common.TxStmt(txn, s.updateRoomStmt).Exec(roomID, lastEventID) return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go index b4dae8f25..b06f5b2a5 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go @@ -18,6 +18,7 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -92,21 +93,13 @@ func (s *eventStateKeyStatements) prepare(db *sql.DB) (err error) { func (s *eventStateKeyStatements) insertEventStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) { var eventStateKeyNID int64 - stmt := s.insertEventStateKeyNIDStmt - if txn != nil { - stmt = txn.Stmt(stmt) - } - err := stmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID) + err := common.TxStmt(txn, s.insertEventStateKeyNIDStmt).QueryRow(eventStateKey).Scan(&eventStateKeyNID) return types.EventStateKeyNID(eventStateKeyNID), err } func (s *eventStateKeyStatements) selectEventStateKeyNID(txn *sql.Tx, eventStateKey string) (types.EventStateKeyNID, error) { var eventStateKeyNID int64 - stmt := s.selectEventStateKeyNIDStmt - if txn != nil { - stmt = txn.Stmt(stmt) - } - err := stmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID) + err := common.TxStmt(txn, s.selectEventStateKeyNIDStmt).QueryRow(eventStateKey).Scan(&eventStateKeyNID) return types.EventStateKeyNID(eventStateKeyNID), err } @@ -131,11 +124,7 @@ func (s *eventStateKeyStatements) bulkSelectEventStateKeyNID(eventStateKeys []st func (s *eventStateKeyStatements) selectEventStateKey(txn *sql.Tx, eventStateKeyNID types.EventStateKeyNID) (string, error) { var eventStateKey string - stmt := s.selectEventStateKeyStmt - if txn != nil { - stmt = txn.Stmt(stmt) - } - err := stmt.QueryRow(eventStateKeyNID).Scan(&eventStateKey) + err := common.TxStmt(txn, s.selectEventStateKeyStmt).QueryRow(eventStateKeyNID).Scan(&eventStateKey) return eventStateKey, err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index b6db15c82..2d2b85625 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -253,22 +254,22 @@ func (s *eventStatements) updateEventState(eventNID types.EventNID, stateNID typ } func (s *eventStatements) selectEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) (sentToOutput bool, err error) { - err = txn.Stmt(s.selectEventSentToOutputStmt).QueryRow(int64(eventNID)).Scan(&sentToOutput) + err = common.TxStmt(txn, s.selectEventSentToOutputStmt).QueryRow(int64(eventNID)).Scan(&sentToOutput) return } func (s *eventStatements) updateEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) error { - _, err := txn.Stmt(s.updateEventSentToOutputStmt).Exec(int64(eventNID)) + _, err := common.TxStmt(txn, s.updateEventSentToOutputStmt).Exec(int64(eventNID)) return err } func (s *eventStatements) selectEventID(txn *sql.Tx, eventNID types.EventNID) (eventID string, err error) { - err = txn.Stmt(s.selectEventIDStmt).QueryRow(int64(eventNID)).Scan(&eventID) + err = common.TxStmt(txn, s.selectEventIDStmt).QueryRow(int64(eventNID)).Scan(&eventID) return } func (s *eventStatements) bulkSelectStateAtEventAndReference(txn *sql.Tx, eventNIDs []types.EventNID) ([]types.StateAtEventAndReference, error) { - rows, err := txn.Stmt(s.bulkSelectStateAtEventAndReferenceStmt).Query(eventNIDsAsArray(eventNIDs)) + rows, err := common.TxStmt(txn, s.bulkSelectStateAtEventAndReferenceStmt).Query(eventNIDsAsArray(eventNIDs)) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go index 9e0860b42..8bae2b781 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go @@ -17,6 +17,7 @@ package storage import ( "database/sql" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -94,7 +95,7 @@ func (s *inviteStatements) insertInviteEvent( targetUserNID, senderUserNID types.EventStateKeyNID, inviteEventJSON []byte, ) (bool, error) { - result, err := txn.Stmt(s.insertInviteEventStmt).Exec( + result, err := common.TxStmt(txn, s.insertInviteEventStmt).Exec( inviteEventID, roomNID, targetUserNID, senderUserNID, inviteEventJSON, ) if err != nil { @@ -110,7 +111,7 @@ func (s *inviteStatements) insertInviteEvent( func (s *inviteStatements) updateInviteRetired( txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, ) ([]string, error) { - rows, err := txn.Stmt(s.updateInviteRetiredStmt).Query(roomNID, targetUserNID) + rows, err := common.TxStmt(txn, s.updateInviteRetiredStmt).Query(roomNID, targetUserNID) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go index 52051af59..6edc7a528 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go @@ -17,6 +17,7 @@ package storage import ( "database/sql" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -115,14 +116,14 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) { func (s *membershipStatements) insertMembership( txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, ) error { - _, err := txn.Stmt(s.insertMembershipStmt).Exec(roomNID, targetUserNID) + _, err := common.TxStmt(txn, s.insertMembershipStmt).Exec(roomNID, targetUserNID) return err } func (s *membershipStatements) selectMembershipForUpdate( txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, ) (membership membershipState, err error) { - err = txn.Stmt(s.selectMembershipForUpdateStmt).QueryRow( + err = common.TxStmt(txn, s.selectMembershipForUpdateStmt).QueryRow( roomNID, targetUserNID, ).Scan(&membership) return @@ -179,7 +180,7 @@ func (s *membershipStatements) updateMembership( senderUserNID types.EventStateKeyNID, membership membershipState, eventNID types.EventNID, ) error { - _, err := txn.Stmt(s.updateMembershipStmt).Exec( + _, err := common.TxStmt(txn, s.updateMembershipStmt).Exec( roomNID, targetUserNID, senderUserNID, membership, eventNID, ) return err diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/previous_events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/previous_events_table.go index 71795d488..9fcf1cb5c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/previous_events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/previous_events_table.go @@ -17,6 +17,7 @@ package storage import ( "database/sql" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -73,7 +74,7 @@ func (s *previousEventStatements) prepare(db *sql.DB) (err error) { } func (s *previousEventStatements) insertPreviousEvent(txn *sql.Tx, previousEventID string, previousEventReferenceSHA256 []byte, eventNID types.EventNID) error { - _, err := txn.Stmt(s.insertPreviousEventStmt).Exec(previousEventID, previousEventReferenceSHA256, int64(eventNID)) + _, err := common.TxStmt(txn, s.insertPreviousEventStmt).Exec(previousEventID, previousEventReferenceSHA256, int64(eventNID)) return err } @@ -81,5 +82,5 @@ func (s *previousEventStatements) insertPreviousEvent(txn *sql.Tx, previousEvent // Returns sql.ErrNoRows if the event reference doesn't exist. func (s *previousEventStatements) selectPreviousEventExists(txn *sql.Tx, eventID string, eventReferenceSHA256 []byte) error { var ok int64 - return txn.Stmt(s.selectPreviousEventExistsStmt).QueryRow(eventID, eventReferenceSHA256).Scan(&ok) + return common.TxStmt(txn, s.selectPreviousEventExistsStmt).QueryRow(eventID, eventReferenceSHA256).Scan(&ok) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go index 24744fdff..4ba329f39 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go @@ -18,6 +18,7 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -82,21 +83,13 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) { func (s *roomStatements) insertRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) { var roomNID int64 - stmt := s.insertRoomNIDStmt - if txn != nil { - stmt = txn.Stmt(stmt) - } - err := stmt.QueryRow(roomID).Scan(&roomNID) + err := common.TxStmt(txn, s.insertRoomNIDStmt).QueryRow(roomID).Scan(&roomNID) return types.RoomNID(roomNID), err } func (s *roomStatements) selectRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) { var roomNID int64 - stmt := s.selectRoomNIDStmt - if txn != nil { - stmt = txn.Stmt(stmt) - } - err := stmt.QueryRow(roomID).Scan(&roomNID) + err := common.TxStmt(txn, s.selectRoomNIDStmt).QueryRow(roomID).Scan(&roomNID) return types.RoomNID(roomNID), err } @@ -120,7 +113,7 @@ func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID ty var nids pq.Int64Array var lastEventSentNID int64 var stateSnapshotNID int64 - err := txn.Stmt(s.selectLatestEventNIDsForUpdateStmt).QueryRow(int64(roomNID)).Scan(&nids, &lastEventSentNID, &stateSnapshotNID) + err := common.TxStmt(txn, s.selectLatestEventNIDsForUpdateStmt).QueryRow(int64(roomNID)).Scan(&nids, &lastEventSentNID, &stateSnapshotNID) if err != nil { return nil, 0, 0, err } @@ -135,7 +128,7 @@ func (s *roomStatements) updateLatestEventNIDs( txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID, ) error { - _, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec( + _, err := common.TxStmt(txn, s.updateLatestEventNIDsStmt).Exec( roomNID, eventNIDsAsArray(eventNIDs), int64(lastEventSentNID), int64(stateSnapshotNID), ) return err diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 9958e0d15..10933e965 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -18,6 +18,7 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" ) @@ -136,7 +137,7 @@ func (s *currentRoomStateStatements) selectJoinedUsers() (map[string][]string, e // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. func (s *currentRoomStateStatements) selectRoomIDsWithMembership(txn *sql.Tx, userID, membership string) ([]string, error) { - rows, err := txn.Stmt(s.selectRoomIDsWithMembershipStmt).Query(userID, membership) + rows, err := common.TxStmt(txn, s.selectRoomIDsWithMembershipStmt).Query(userID, membership) if err != nil { return nil, err } @@ -155,7 +156,7 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(txn *sql.Tx, us // CurrentState returns all the current state events for the given room. func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID string) ([]gomatrixserverlib.Event, error) { - rows, err := txn.Stmt(s.selectCurrentStateStmt).Query(roomID) + rows, err := common.TxStmt(txn, s.selectCurrentStateStmt).Query(roomID) if err != nil { return nil, err } @@ -165,21 +166,21 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri } func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error { - _, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID) + _, err := common.TxStmt(txn, s.deleteRoomStateByEventIDStmt).Exec(eventID) return err } func (s *currentRoomStateStatements) upsertRoomState( txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64, ) error { - _, err := txn.Stmt(s.upsertRoomStateStmt).Exec( + _, err := common.TxStmt(txn, s.upsertRoomStateStmt).Exec( event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, addedAt, ) return err } func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { - rows, err := txn.Stmt(s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs)) + rows, err := common.TxStmt(txn, s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs)) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index f3c46298a..93774d1f1 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -19,6 +19,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -105,7 +106,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { func (s *outputRoomEventsStatements) selectStateInRange( txn *sql.Tx, oldPos, newPos types.StreamPosition, ) (map[string]map[string]bool, map[string]streamEvent, error) { - rows, err := txn.Stmt(s.selectStateInRangeStmt).Query(oldPos, newPos) + rows, err := common.TxStmt(txn, s.selectStateInRangeStmt).Query(oldPos, newPos) if err != nil { return nil, nil, err } @@ -167,12 +168,8 @@ func (s *outputRoomEventsStatements) selectStateInRange( // then this function should only ever be used at startup, as it will race with inserting events if it is // done afterwards. If there are no inserted events, 0 is returned. func (s *outputRoomEventsStatements) selectMaxID(txn *sql.Tx) (id int64, err error) { - stmt := s.selectMaxIDStmt - if txn != nil { - stmt = txn.Stmt(stmt) - } var nullableID sql.NullInt64 - err = stmt.QueryRow().Scan(&nullableID) + err = common.TxStmt(txn, s.selectMaxIDStmt).QueryRow().Scan(&nullableID) if nullableID.Valid { id = nullableID.Int64 } @@ -182,7 +179,7 @@ func (s *outputRoomEventsStatements) selectMaxID(txn *sql.Tx) (id int64, err err // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position // of the inserted event. func (s *outputRoomEventsStatements) insertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) { - err = txn.Stmt(s.insertEventStmt).QueryRow( + err = common.TxStmt(txn, s.insertEventStmt).QueryRow( event.RoomID(), event.EventID(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), ).Scan(&streamPos) return @@ -209,11 +206,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing // from the database. func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { - stmt := s.selectEventsStmt - if txn != nil { - stmt = txn.Stmt(stmt) - } - rows, err := stmt.Query(pq.StringArray(eventIDs)) + rows, err := common.TxStmt(txn, s.selectEventsStmt).Query(pq.StringArray(eventIDs)) if err != nil { return nil, err } From 46877b6baa96e2e178b456f646210dfb7f8d1f1b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 21 Aug 2017 17:34:05 +0100 Subject: [PATCH 3/6] Add installation instructions to repo (#192) --- INSTALL.md | 177 +++++++++++++++++++++++++++++++++++++++++++ README.md | 5 ++ dendrite-config.yaml | 2 +- 3 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 INSTALL.md diff --git a/INSTALL.md b/INSTALL.md new file mode 100644 index 000000000..668c9c13c --- /dev/null +++ b/INSTALL.md @@ -0,0 +1,177 @@ +# Installing Dendrite + +Dendrite can be run in one of two configurations: + + * A cluster of individual components, dealing with different aspects of the + Matrix protocol (see [WIRING.md](./WIRING.md)). Components communicate with + one another via [Apache Kafka](https://kafka.apache.org). + + * A monolith server, in which all components run in the same process. In this + configuration, Kafka can be replaced with an in-process implementation + called [naffka](https://github.com/matrix-org/naffka). + +## Requirements + + - Go 1.8+ + - Postgres 9.5+ + - For Kafka (optional if using the monolith server): + - Unix-based system (https://kafka.apache.org/documentation/#os) + - JDK 1.8+ / OpenJDK 1.8+ + - Apache Kafka 0.10.2+ (see https://github.com/matrix-org/dendrite/blob/master/travis-install-kafka.sh for up-to-date version numbers) + + +## Setting up a development environment + +Assumes Go 1.8 and JDK 1.8 are already installed and are on PATH. + +```bash +# Get the code +git clone https://github.com/matrix-org/dendrite +cd dendrite + +# Build it +go get github.com/constabulary/gb/... +gb build +``` + +If using Kafka, install and start it: +```bash +MIRROR=http://apache.mirror.anlx.net/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz + +# Only download the kafka if it isn't already downloaded. +test -f kafka.tgz || wget $MIRROR -O kafka.tgz +# Unpack the kafka over the top of any existing installation +mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1 + +# Start the zookeeper running in the background. +# By default the zookeeper listens on localhost:2181 +kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties + +# Start the kafka server running in the background. +# By default the kafka listens on localhost:9092 +kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties +``` + +## Configuration + +### Postgres database setup + +Dendrite requires a postgres database engine, version 9.5 or later. + +* Create role: + ```bash + sudo -u postgres createuser -P dendrite # prompts for password + ``` +* Create databases: + ```bash + for i in account device mediaapi syncapi roomserver serverkey federationsender; do + sudo -u postgres createdb -O dendrite dendrite_$i + done + ``` + +### Crypto key generation + +Generate the keys (unlike synapse, dendrite doesn't autogen yet): + +```bash +# Generate a self-signed SSL cert for federation: +test -f server.key || openssl req -x509 -newkey rsa:4096 -keyout server.key -out server.crt -days 3650 -nodes -subj /CN=localhost + +# generate ed25519 signing key +test -f matrix_key.pem || python3 > matrix_key.pem <| dendrite-sync-api-server | || + | +--------------------------+ +----------------------+ +Matrix +------------------+ | :7773 query API | dendrite-room-server |--DB:roomserver +Clients --->| client-api-proxy |---+ +----------------->+----------------------+ + +------------------+ | | :7770 ^^ + :8008 | CS API +----------------------------+ || + +--------->| dendrite-client-api-server |===================|| + | +----------------------------+ roomserver_input_topic_dev + | :7771 + | + | /media +---------------------------+ + +--------->| dendrite-media-api-server | + +---------------------------+ + :7774 + + + A --> B = HTTP requests (A = client, B = server) + A ==> B = Kafka (A = producer, B = consumer) +``` + +### Run a client api proxy + +This is what Matrix clients will talk to. If you use the script below, point your client at `http://localhost:8008`. + +```bash +#!/bin/bash + +./bin/client-api-proxy \ +--bind-address ":8008" \ +--sync-api-server-url "http://localhost:7773" \ +--client-api-server-url "http://localhost:7771" \ +--media-api-server-url "http://localhost:7774" +``` + +### Run a client api + +This is what implements message sending. Clients talk to this via the proxy in order to send messages. + +```bash +./bin/dendrite-client-api-server --config=dendrite.yaml +``` + +(If this fails with `pq: syntax error at or near "ON"`, check you are using at least postgres 9.5.) + +### Run a room server + +This is what implements the room DAG. Clients do not talk to this. + +```bash +./bin/dendrite-room-server --config=dendrite.yaml +``` + +### Run a sync server + +This is what implements `/sync` requests. Clients talk to this via the proxy in order to receive messages. + +```bash +./bin/dendrite-sync-api-server --config dendrite.yaml +``` + +### Run a media server + +This implements `/media` requests. Clients talk to this via the proxy in order to upload and retrieve media. + +```bash +./bin/dendrite-media-api-server --config dendrite.yaml +``` diff --git a/README.md b/README.md index 98d31d718..123c1207b 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,11 @@ Dendrite will be a matrix homeserver written in go. +# Install + +Dendrite is still very much a work in progress, but those wishing to work on it +may be interested in the installation instructions in [INSTALL.md](INSTALL.md). + # Design ## Log Based Architecture diff --git a/dendrite-config.yaml b/dendrite-config.yaml index b19add182..8275ac4e0 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -11,7 +11,7 @@ matrix: # The path to the PEM formatted matrix private key. private_key: "/etc/dendrite/matrix_key.pem" # The x509 certificates used by the federation listeners for this server - federation_certificates: ["/etc/dendrite/federation_tls.pem"] + federation_certificates: ["/etc/dendrite/server.pem"] # The media repository config media: From fc86821a9074fbe471f87ab92fbebb41a8c9c373 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 22 Aug 2017 11:01:14 +0100 Subject: [PATCH 4/6] notes on running monolith server (#193) * notes on running monolith server * Clarify default ports for monolith server --- INSTALL.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/INSTALL.md b/INSTALL.md index 668c9c13c..044aedf7c 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -97,7 +97,17 @@ Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Th ## Starting a monolith server -TODO +It is possible to use 'naffka' as an in-process replacement to Kafka when using +the monolith server. To do this, set `use_naffka: true` in `dendrite.yaml`. + +The monolith server can be started as shown below. By default it listens for +HTTP connections on port 8008, so point your client at +`http://localhost:8008`. If you set `--tls-cert` and `--tls-key` as shown +below, it will also listen for HTTPS connections on port 8448. + +```bash +./bin/dendrite-monolith-server --tls-cert=server.crt --tls-key=server.key +``` ## Starting a multiprocess server From b15ce900abe2309c2d8ce73c1be9254bc772490c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 22 Aug 2017 11:12:51 +0100 Subject: [PATCH 5/6] Implement public rooms APIs (#185) * Move events contents to common * Basic database structure * Complete database update * Support visibility update and retrieval * Add HTTP methods for visibility update and retrieval * Add the database for the new component * Add a listener for the new component * Fix attribute update statements * Create public rooms component * Fix failing test * Add roomserver consumer * Fix a bug in aliases creation * Add a check on type * Implement public rooms directory * Use auth API for visibility update * Support filtering * Add component to monolith * Various fixes * Fix computation of next public rooms batch * Retrieve state events from the roomserver query API + avoid dupes on join * Split update of string or boolean attribute in two separate functions * Use event type to detect duplicate joins * Improve the joined members counter computation * Use event.RoomID() --- dendrite-config.yaml | 2 + .../dendrite/clientapi/readers/profile.go | 3 +- .../dendrite/clientapi/routing/routing.go | 14 - .../dendrite/clientapi/writers/createroom.go | 18 +- .../dendrite/clientapi/writers/membership.go | 3 +- .../dendrite/cmd/client-api-proxy/main.go | 33 ++- .../cmd/dendrite-monolith-server/main.go | 18 ++ .../dendrite-public-rooms-api-server/main.go | 85 ++++++ .../dendrite/common/config/config.go | 4 + .../events => common}/eventcontent.go | 28 +- .../matrix-org/dendrite/common/test/config.go | 2 + .../matrix-org/dendrite/common/test/server.go | 1 + .../dendrite/publicroomsapi/README.md | 5 + .../publicroomsapi/consumers/roomserver.go | 101 +++++++ .../publicroomsapi/directory/directory.go | 73 +++++ .../publicroomsapi/directory/public_rooms.go | 113 ++++++++ .../publicroomsapi/routing/routing.go | 51 ++++ .../publicroomsapi/storage/prepare.go | 35 +++ .../storage/public_rooms_table.go | 268 ++++++++++++++++++ .../publicroomsapi/storage/storage.go | 247 ++++++++++++++++ .../dendrite/publicroomsapi/types/types.go | 28 ++ .../roomserver/storage/room_aliases_table.go | 2 +- 22 files changed, 1098 insertions(+), 36 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go rename src/github.com/matrix-org/dendrite/{clientapi/events => common}/eventcontent.go (77%) create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/README.md create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/directory/directory.go create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/directory/public_rooms.go create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/routing/routing.go create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/storage/prepare.go create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/storage/public_rooms_table.go create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/types/types.go diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 8275ac4e0..a91429c22 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -72,6 +72,7 @@ database: room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable" server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable" federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable" + public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable" # The TCP host:port pairs to bind the internal HTTP APIs to. # These shouldn't be exposed to the public internet. @@ -82,3 +83,4 @@ listen: federation_api: "localhost:7772" sync_api: "localhost:7773" media_api: "localhost:7774" + public_rooms_api: "localhost:7775" diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go index 15ca4961b..069fb1c2f 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -271,7 +272,7 @@ func buildMembershipEvents( StateKey: &userID, } - content := events.MemberContent{ + content := common.MemberContent{ Membership: "join", } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 8a5799c0e..32301c784 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -265,20 +265,6 @@ func Setup( }), ) - r0mux.Handle("/publicRooms", - common.MakeAPI("public_rooms", func(req *http.Request) util.JSONResponse { - // TODO: Return a list of public rooms - return util.JSONResponse{ - Code: 200, - JSON: struct { - Chunk []struct{} `json:"chunk"` - Start string `json:"start"` - End string `json:"end"` - }{[]struct{}{}, "", ""}, - } - }), - ) - unstableMux.Handle("/thirdparty/protocols", common.MakeAPI("thirdparty_protocols", func(req *http.Request) util.JSONResponse { // TODO: Return the third party protcols diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go index e43ae780d..58826c2d9 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go @@ -24,10 +24,10 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" - "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -132,7 +132,7 @@ func createRoom(req *http.Request, device *authtypes.Device, return httputil.LogThenError(req, err) } - membershipContent := events.MemberContent{ + membershipContent := common.MemberContent{ Membership: "join", DisplayName: profile.DisplayName, AvatarURL: profile.AvatarURL, @@ -159,16 +159,16 @@ func createRoom(req *http.Request, device *authtypes.Device, // harder to reason about, hence sticking to a strict static ordering. // TODO: Synapse has txn/token ID on each event. Do we need to do this here? eventsToMake := []fledglingEvent{ - {"m.room.create", "", events.CreateContent{Creator: userID}}, + {"m.room.create", "", common.CreateContent{Creator: userID}}, {"m.room.member", userID, membershipContent}, - {"m.room.power_levels", "", events.InitialPowerLevelsContent(userID)}, + {"m.room.power_levels", "", common.InitialPowerLevelsContent(userID)}, // TODO: m.room.canonical_alias - {"m.room.join_rules", "", events.JoinRulesContent{"public"}}, // FIXME: Allow this to be changed - {"m.room.history_visibility", "", events.HistoryVisibilityContent{"joined"}}, // FIXME: Allow this to be changed - {"m.room.guest_access", "", events.GuestAccessContent{"can_join"}}, // FIXME: Allow this to be changed + {"m.room.join_rules", "", common.JoinRulesContent{"public"}}, // FIXME: Allow this to be changed + {"m.room.history_visibility", "", common.HistoryVisibilityContent{"joined"}}, // FIXME: Allow this to be changed + {"m.room.guest_access", "", common.GuestAccessContent{"can_join"}}, // FIXME: Allow this to be changed // TODO: Other initial state items - {"m.room.name", "", events.NameContent{r.Name}}, // FIXME: Only send the name event if a name is supplied, to avoid sending a false room name removal event - {"m.room.topic", "", events.TopicContent{r.Topic}}, + {"m.room.name", "", common.NameContent{r.Name}}, // FIXME: Only send the name event if a name is supplied, to avoid sending a false room name removal event + {"m.room.topic", "", common.TopicContent{r.Topic}}, // TODO: invite events // TODO: 3pid invite events // TODO: m.room.aliases diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/membership.go b/src/github.com/matrix-org/dendrite/clientapi/writers/membership.go index 7b199a606..ab3a16459 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/membership.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/membership.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -69,7 +70,7 @@ func SendMembership( membership = "leave" } - content := events.MemberContent{ + content := common.MemberContent{ Membership: membership, DisplayName: profile.DisplayName, AvatarURL: profile.AvatarURL, diff --git a/src/github.com/matrix-org/dendrite/cmd/client-api-proxy/main.go b/src/github.com/matrix-org/dendrite/cmd/client-api-proxy/main.go index 1c6cc4f2c..477f8d127 100644 --- a/src/github.com/matrix-org/dendrite/cmd/client-api-proxy/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/client-api-proxy/main.go @@ -47,12 +47,13 @@ Arguments: ` var ( - syncServerURL = flag.String("sync-api-server-url", "", "The base URL of the listening 'dendrite-sync-api-server' process. E.g. 'http://localhost:4200'") - clientAPIURL = flag.String("client-api-server-url", "", "The base URL of the listening 'dendrite-client-api-server' process. E.g. 'http://localhost:4321'") - mediaAPIURL = flag.String("media-api-server-url", "", "The base URL of the listening 'dendrite-media-api-server' process. E.g. 'http://localhost:7779'") - bindAddress = flag.String("bind-address", ":8008", "The listening port for the proxy.") - certFile = flag.String("tls-cert", "", "The PEM formatted X509 certificate to use for TLS") - keyFile = flag.String("tls-key", "", "The PEM private key to use for TLS") + syncServerURL = flag.String("sync-api-server-url", "", "The base URL of the listening 'dendrite-sync-api-server' process. E.g. 'http://localhost:4200'") + clientAPIURL = flag.String("client-api-server-url", "", "The base URL of the listening 'dendrite-client-api-server' process. E.g. 'http://localhost:4321'") + mediaAPIURL = flag.String("media-api-server-url", "", "The base URL of the listening 'dendrite-media-api-server' process. E.g. 'http://localhost:7779'") + publicRoomsAPIURL = flag.String("public-rooms-api-server-url", "", "The base URL of the listening 'dendrite-public-rooms-api-server' process. E.g. 'http://localhost:7775'") + bindAddress = flag.String("bind-address", ":8008", "The listening port for the proxy.") + certFile = flag.String("tls-cert", "", "The PEM formatted X509 certificate to use for TLS") + keyFile = flag.String("tls-key", "", "The PEM private key to use for TLS") ) func makeProxy(targetURL string) (*httputil.ReverseProxy, error) { @@ -122,6 +123,12 @@ func main() { os.Exit(1) } + if *publicRoomsAPIURL == "" { + flag.Usage() + fmt.Fprintln(os.Stderr, "no --public-rooms-api-server-url specified.") + os.Exit(1) + } + syncProxy, err := makeProxy(*syncServerURL) if err != nil { panic(err) @@ -134,8 +141,14 @@ func main() { if err != nil { panic(err) } + publicRoomsProxy, err := makeProxy(*publicRoomsAPIURL) + if err != nil { + panic(err) + } http.Handle("/_matrix/client/r0/sync", syncProxy) + http.Handle("/_matrix/client/r0/directory/list/", publicRoomsProxy) + http.Handle("/_matrix/client/r0/publicRooms", publicRoomsProxy) http.Handle("/_matrix/media/v1/", mediaProxy) http.Handle("/", clientProxy) @@ -146,9 +159,11 @@ func main() { } fmt.Println("Proxying requests to:") - fmt.Println(" /_matrix/client/r0/sync => ", *syncServerURL+"/api/_matrix/client/r0/sync") - fmt.Println(" /_matrix/media/v1 => ", *mediaAPIURL+"/api/_matrix/media/v1") - fmt.Println(" /* => ", *clientAPIURL+"/api/*") + fmt.Println(" /_matrix/client/r0/sync => ", *syncServerURL+"/api/_matrix/client/r0/sync") + fmt.Println(" /_matrix/client/r0/directory/list => ", *publicRoomsAPIURL+"/_matrix/client/r0/directory/list") + fmt.Println(" /_matrix/client/r0/publicRooms => ", *publicRoomsAPIURL+"/_matrix/media/client/r0/publicRooms") + fmt.Println(" /_matrix/media/v1 => ", *mediaAPIURL+"/api/_matrix/media/v1") + fmt.Println(" /* => ", *clientAPIURL+"/api/*") fmt.Println("Listening on ", *bindAddress) if *certFile != "" && *keyFile != "" { panic(srv.ListenAndServeTLS(*certFile, *keyFile)) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index a856a7249..25b269b89 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -52,6 +52,10 @@ import ( "github.com/matrix-org/dendrite/federationsender/queue" federationsender_storage "github.com/matrix-org/dendrite/federationsender/storage" + publicroomsapi_consumers "github.com/matrix-org/dendrite/publicroomsapi/consumers" + publicroomsapi_routing "github.com/matrix-org/dendrite/publicroomsapi/routing" + publicroomsapi_storage "github.com/matrix-org/dendrite/publicroomsapi/storage" + log "github.com/Sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -119,6 +123,7 @@ type monolith struct { mediaAPIDB *mediaapi_storage.Database syncAPIDB *syncapi_storage.SyncServerDatabase federationSenderDB *federationsender_storage.Database + publicRoomsAPIDB *publicroomsapi_storage.PublicRoomsServerDatabase federation *gomatrixserverlib.FederationClient keyRing gomatrixserverlib.KeyRing @@ -171,6 +176,10 @@ func (m *monolith) setupDatabases() { if err != nil { log.Panicf("startup: failed to create federation sender database with data source %s : %s", m.cfg.Database.FederationSender, err) } + m.publicRoomsAPIDB, err = publicroomsapi_storage.NewPublicRoomsServerDatabase(string(m.cfg.Database.PublicRoomsAPI)) + if err != nil { + log.Panicf("startup: failed to setup public rooms api database with data source %s : %s", m.cfg.Database.PublicRoomsAPI, err) + } } func (m *monolith) setupFederation() { @@ -290,6 +299,13 @@ func (m *monolith) setupConsumers() { log.Panicf("startup: failed to start client API server consumer: %s", err) } + publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent( + m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB, m.queryAPI, + ) + if err = publicRoomsAPIConsumer.Start(); err != nil { + log.Panicf("startup: failed to start room server consumer: %s", err) + } + federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation) federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent( @@ -318,4 +334,6 @@ func (m *monolith) setupAPIs() { federationapi_routing.Setup( m.api, *m.cfg, m.queryAPI, m.roomServerProducer, m.keyRing, m.federation, ) + + publicroomsapi_routing.Setup(m.api, m.deviceDB, m.publicRoomsAPIDB) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go new file mode 100644 index 000000000..c8e705f91 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go @@ -0,0 +1,85 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "net/http" + "os" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/publicroomsapi/consumers" + "github.com/matrix-org/dendrite/publicroomsapi/routing" + "github.com/matrix-org/dendrite/publicroomsapi/storage" + "github.com/matrix-org/dendrite/roomserver/api" + + log "github.com/Sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" +) + +var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") + +func main() { + common.SetupLogging(os.Getenv("LOG_DIR")) + + flag.Parse() + + if *configPath == "" { + log.Fatal("--config must be supplied") + } + cfg, err := config.Load(*configPath) + if err != nil { + log.Fatalf("Invalid config file: %s", err) + } + + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) + + db, err := storage.NewPublicRoomsServerDatabase(string(cfg.Database.PublicRoomsAPI)) + if err != nil { + log.Panicf("startup: failed to create public rooms server database with data source %s : %s", cfg.Database.PublicRoomsAPI, err) + } + + deviceDB, err := devices.NewDatabase(string(cfg.Database.Device), cfg.Matrix.ServerName) + if err != nil { + log.Panicf("startup: failed to create device database with data source %s : %s", cfg.Database.Device, err) + } + + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") + } + + roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, db, queryAPI) + if err != nil { + log.Panicf("startup: failed to create room server consumer: %s", err) + } + if err = roomConsumer.Start(); err != nil { + log.Panicf("startup: failed to start room server consumer: %s", err) + } + + log.Info("Starting public rooms server on ", cfg.Listen.PublicRoomsAPI) + + api := mux.NewRouter() + routing.Setup(api, deviceDB, db) + common.SetupHTTPAPI(http.DefaultServeMux, api) + + log.Fatal(http.ListenAndServe(string(cfg.Listen.PublicRoomsAPI), nil)) +} diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index ae0fe62cd..8d76a03d4 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -133,6 +133,9 @@ type Dendrite struct { // The FederationSender database stores information used by the FederationSender // It is only accessed by the FederationSender. FederationSender DataSource `yaml:"federation_sender"` + // The PublicRoomsAPI database stores information used to compute the public + // room directory. It is only accessed by the PublicRoomsAPI server. + PublicRoomsAPI DataSource `yaml:"public_rooms_api"` } `yaml:"database"` // The internal addresses the components will listen on. @@ -144,6 +147,7 @@ type Dendrite struct { SyncAPI Address `yaml:"sync_api"` RoomServer Address `yaml:"room_server"` FederationSender Address `yaml:"federation_sender"` + PublicRoomsAPI Address `yaml:"public_rooms_api"` } `yaml:"listen"` } diff --git a/src/github.com/matrix-org/dendrite/clientapi/events/eventcontent.go b/src/github.com/matrix-org/dendrite/common/eventcontent.go similarity index 77% rename from src/github.com/matrix-org/dendrite/clientapi/events/eventcontent.go rename to src/github.com/matrix-org/dendrite/common/eventcontent.go index e16b54004..7aa869388 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/events/eventcontent.go +++ b/src/github.com/matrix-org/dendrite/common/eventcontent.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package events +package common // CreateContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-create type CreateContent struct { @@ -90,3 +90,29 @@ func InitialPowerLevelsContent(roomCreator string) PowerLevelContent { Users: map[string]int{roomCreator: 100}, } } + +// AliasesContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-aliases +type AliasesContent struct { + Aliases []string `json:"aliases"` +} + +// CanonicalAliasContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-canonical-alias +type CanonicalAliasContent struct { + Alias string `json:"alias"` +} + +// AvatarContent is the event content for http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar +type AvatarContent struct { + Info ImageInfo `json:"info,omitempty"` + URL string `json:"url"` + ThumbnailURL string `json:"thumbnail_url,omitempty"` + ThumbnailInfo ImageInfo `json:"thumbnail_info,omitempty"` +} + +// ImageInfo implements the ImageInfo structure from http://matrix.org/docs/spec/client_server/r0.2.0.html#m-room-avatar +type ImageInfo struct { + Mimetype string `json:"mimetype"` + Height int64 `json:"h"` + Width int64 `json:"w"` + Size int64 `json:"size"` +} diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go index 948c60f10..0efba5dfb 100644 --- a/src/github.com/matrix-org/dendrite/common/test/config.go +++ b/src/github.com/matrix-org/dendrite/common/test/config.go @@ -95,12 +95,14 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.Database.RoomServer = config.DataSource(database) cfg.Database.ServerKey = config.DataSource(database) cfg.Database.SyncAPI = config.DataSource(database) + cfg.Database.PublicRoomsAPI = config.DataSource(database) cfg.Listen.ClientAPI = assignAddress() cfg.Listen.FederationAPI = assignAddress() cfg.Listen.MediaAPI = assignAddress() cfg.Listen.RoomServer = assignAddress() cfg.Listen.SyncAPI = assignAddress() + cfg.Listen.PublicRoomsAPI = assignAddress() return &cfg, port, nil } diff --git a/src/github.com/matrix-org/dendrite/common/test/server.go b/src/github.com/matrix-org/dendrite/common/test/server.go index 8e1af2041..4fdd5e638 100644 --- a/src/github.com/matrix-org/dendrite/common/test/server.go +++ b/src/github.com/matrix-org/dendrite/common/test/server.go @@ -94,6 +94,7 @@ func StartProxy(bindAddr string, cfg *config.Dendrite) (*exec.Cmd, chan error) { "--sync-api-server-url", "http://" + string(cfg.Listen.SyncAPI), "--client-api-server-url", "http://" + string(cfg.Listen.ClientAPI), "--media-api-server-url", "http://" + string(cfg.Listen.MediaAPI), + "--public-rooms-api-server-url", "http://" + string(cfg.Listen.PublicRoomsAPI), "--tls-cert", "server.crt", "--tls-key", "server.key", } diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/README.md b/src/github.com/matrix-org/dendrite/publicroomsapi/README.md new file mode 100644 index 000000000..594fe29c5 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/README.md @@ -0,0 +1,5 @@ +# Public rooms API + +This server is responsible for serving requests hitting `/publicRooms` and `/directory/list/room/{roomID}` as per: + +https://matrix.org/docs/spec/client_server/r0.2.0.html#listing-rooms diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go new file mode 100644 index 000000000..959151052 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go @@ -0,0 +1,101 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "encoding/json" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/publicroomsapi/storage" + "github.com/matrix-org/dendrite/roomserver/api" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputRoomEvent consumes events that originated in the room server. +type OutputRoomEvent struct { + roomServerConsumer *common.ContinualConsumer + db *storage.PublicRoomsServerDatabase + query api.RoomserverQueryAPI +} + +// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEvent( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + store *storage.PublicRoomsServerDatabase, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEvent { + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEvent{ + roomServerConsumer: &consumer, + db: store, + query: queryAPI, + } + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start consuming from room servers +func (s *OutputRoomEvent) Start() error { + return s.roomServerConsumer.Start() +} + +// onMessage is called when the sync server receives a new event from the room server output log. +func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } + + ev := output.NewRoomEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "type": ev.Type(), + }).Info("received event from roomserver") + + addQueryReq := api.QueryEventsByIDRequest{output.NewRoomEvent.AddsStateEventIDs} + var addQueryRes api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(&addQueryReq, &addQueryRes); err != nil { + log.Warn(err) + return err + } + + remQueryReq := api.QueryEventsByIDRequest{output.NewRoomEvent.RemovesStateEventIDs} + var remQueryRes api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(&remQueryReq, &remQueryRes); err != nil { + log.Warn(err) + return err + } + + return s.db.UpdateRoomFromEvents(addQueryRes.Events, remQueryRes.Events) +} diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/directory/directory.go b/src/github.com/matrix-org/dendrite/publicroomsapi/directory/directory.go new file mode 100644 index 000000000..1718a18af --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/directory/directory.go @@ -0,0 +1,73 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package directory + +import ( + "net/http" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/publicroomsapi/storage" + + "github.com/matrix-org/util" +) + +type roomVisibility struct { + Visibility string `json:"visibility"` +} + +// GetVisibility implements GET /directory/list/room/{roomID} +func GetVisibility( + req *http.Request, publicRoomsDatabase *storage.PublicRoomsServerDatabase, + roomID string, +) util.JSONResponse { + isPublic, err := publicRoomsDatabase.GetRoomVisibility(roomID) + if err != nil { + return httputil.LogThenError(req, err) + } + + var v roomVisibility + if isPublic { + v.Visibility = "public" + } else { + v.Visibility = "private" + } + + return util.JSONResponse{ + Code: 200, + JSON: v, + } +} + +// SetVisibility implements PUT /directory/list/room/{roomID} +// TODO: Check if user has the power level to edit the room visibility +func SetVisibility( + req *http.Request, publicRoomsDatabase *storage.PublicRoomsServerDatabase, + roomID string, +) util.JSONResponse { + var v roomVisibility + if reqErr := httputil.UnmarshalJSONRequest(req, &v); reqErr != nil { + return *reqErr + } + + isPublic := v.Visibility == "public" + if err := publicRoomsDatabase.SetRoomVisibility(isPublic, roomID); err != nil { + return httputil.LogThenError(req, err) + } + + return util.JSONResponse{ + Code: 200, + JSON: struct{}{}, + } +} diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/directory/public_rooms.go b/src/github.com/matrix-org/dendrite/publicroomsapi/directory/public_rooms.go new file mode 100644 index 000000000..4566715c2 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/directory/public_rooms.go @@ -0,0 +1,113 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package directory + +import ( + "net/http" + "strconv" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/publicroomsapi/storage" + "github.com/matrix-org/dendrite/publicroomsapi/types" + "github.com/matrix-org/util" +) + +type publicRoomReq struct { + Since string `json:"since,omitempty"` + Limit int16 `json:"limit,omitempty"` + Filter filter `json:"filter,omitempty"` +} + +type filter struct { + SearchTerms string `json:"generic_search_term,omitempty"` +} + +type publicRoomRes struct { + Chunk []types.PublicRoom `json:"chunk"` + NextBatch string `json:"next_batch,omitempty"` + PrevBatch string `json:"prev_batch,omitempty"` + Estimate int64 `json:"total_room_count_estimate,omitempty"` +} + +// GetPublicRooms implements GET /publicRooms +func GetPublicRooms( + req *http.Request, publicRoomDatabase *storage.PublicRoomsServerDatabase, +) util.JSONResponse { + var limit int16 + var offset int64 + var request publicRoomReq + var response publicRoomRes + + if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil { + return *fillErr + } + + limit = request.Limit + offset, err := strconv.ParseInt(request.Since, 10, 64) + // ParseInt returns 0 and an error when trying to parse an empty string + // In that case, we want to assign 0 so we ignore the error + if err != nil && len(request.Since) > 0 { + return httputil.LogThenError(req, err) + } + + if response.Estimate, err = publicRoomDatabase.CountPublicRooms(); err != nil { + return httputil.LogThenError(req, err) + } + + if offset > 0 { + response.PrevBatch = strconv.Itoa(int(offset) - 1) + } + nextIndex := int(offset) + int(limit) + if response.Estimate > int64(nextIndex) { + response.NextBatch = strconv.Itoa(nextIndex) + } + + if response.Chunk, err = publicRoomDatabase.GetPublicRooms(offset, limit, request.Filter.SearchTerms); err != nil { + return httputil.LogThenError(req, err) + } + + return util.JSONResponse{ + Code: 200, + JSON: response, + } +} + +// fillPublicRoomsReq fills the Limit, Since and Filter attributes of a GET or POST request +// on /publicRooms by parsing the incoming HTTP request +func fillPublicRoomsReq(httpReq *http.Request, request *publicRoomReq) *util.JSONResponse { + if httpReq.Method == "GET" { + limit, err := strconv.Atoi(httpReq.FormValue("limit")) + // Atoi returns 0 and an error when trying to parse an empty string + // In that case, we want to assign 0 so we ignore the error + if err != nil && len(httpReq.FormValue("limit")) > 0 { + reqErr := httputil.LogThenError(httpReq, err) + return &reqErr + } + request.Limit = int16(limit) + request.Since = httpReq.FormValue("since") + return nil + } else if httpReq.Method == "POST" { + if reqErr := httputil.UnmarshalJSONRequest(httpReq, request); reqErr != nil { + return reqErr + } + return nil + } + + return &util.JSONResponse{ + Code: 405, + JSON: jsonerror.NotFound("Bad method"), + } +} diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/routing/routing.go b/src/github.com/matrix-org/dendrite/publicroomsapi/routing/routing.go new file mode 100644 index 000000000..18b8cc57a --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/routing/routing.go @@ -0,0 +1,51 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/publicroomsapi/directory" + "github.com/matrix-org/dendrite/publicroomsapi/storage" + "github.com/matrix-org/util" +) + +const pathPrefixR0 = "/_matrix/client/r0" + +// Setup configures the given mux with publicroomsapi server listeners +func Setup(apiMux *mux.Router, deviceDB *devices.Database, publicRoomsDB *storage.PublicRoomsServerDatabase) { + r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() + r0mux.Handle("/directory/list/room/{roomID}", + common.MakeAPI("directory_list", func(req *http.Request) util.JSONResponse { + vars := mux.Vars(req) + return directory.GetVisibility(req, publicRoomsDB, vars["roomID"]) + }), + ).Methods("GET") + r0mux.Handle("/directory/list/room/{roomID}", + common.MakeAuthAPI("directory_list", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return directory.SetVisibility(req, publicRoomsDB, vars["roomID"]) + }), + ).Methods("PUT", "OPTIONS") + r0mux.Handle("/publicRooms", + common.MakeAPI("public_rooms", func(req *http.Request) util.JSONResponse { + return directory.GetPublicRooms(req, publicRoomsDB) + }), + ).Methods("GET", "POST", "OPTIONS") +} diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/storage/prepare.go b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/prepare.go new file mode 100644 index 000000000..b19765992 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/prepare.go @@ -0,0 +1,35 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" +) + +// a statementList is a list of SQL statements to prepare and a pointer to where to store the resulting prepared statement. +type statementList []struct { + statement **sql.Stmt + sql string +} + +// prepare the SQL for each statement in the list and assign the result to the prepared statement. +func (s statementList) prepare(db *sql.DB) (err error) { + for _, statement := range s { + if *statement.statement, err = db.Prepare(statement.sql); err != nil { + return + } + } + return +} diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/storage/public_rooms_table.go b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/public_rooms_table.go new file mode 100644 index 000000000..5cef577e7 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/public_rooms_table.go @@ -0,0 +1,268 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + "errors" + "fmt" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/publicroomsapi/types" +) + +var editableAttributes = []string{ + "aliases", + "canonical_alias", + "name", + "topic", + "world_readable", + "guest_can_join", + "avatar_url", + "visibility", +} + +const publicRoomsSchema = ` +-- Stores all of the rooms with data needed to create the server's room directory +CREATE TABLE IF NOT EXISTS publicroomsapi_public_rooms( + -- The room's ID + room_id TEXT NOT NULL PRIMARY KEY, + -- Number of joined members in the room + joined_members INTEGER NOT NULL DEFAULT 0, + -- Aliases of the room (empty array if none) + aliases TEXT[] NOT NULL DEFAULT '{}'::TEXT[], + -- Canonical alias of the room (empty string if none) + canonical_alias TEXT NOT NULL DEFAULT '', + -- Name of the room (empty string if none) + name TEXT NOT NULL DEFAULT '', + -- Topic of the room (empty string if none) + topic TEXT NOT NULL DEFAULT '', + -- Is the room world readable? + world_readable BOOLEAN NOT NULL DEFAULT false, + -- Can guest join the room? + guest_can_join BOOLEAN NOT NULL DEFAULT false, + -- URL of the room avatar (empty string if none) + avatar_url TEXT NOT NULL DEFAULT '', + -- Visibility of the room: true means the room is publicly visible, false + -- means the room is private + visibility BOOLEAN NOT NULL DEFAULT false +); +` + +const countPublicRoomsSQL = "" + + "SELECT COUNT(*) FROM publicroomsapi_public_rooms" + + " WHERE visibility = true" + +const selectPublicRoomsSQL = "" + + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + + " FROM publicroomsapi_public_rooms WHERE visibility = true" + + " ORDER BY joined_members DESC" + + " OFFSET $1" + +const selectPublicRoomsWithLimitSQL = "" + + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + + " FROM publicroomsapi_public_rooms WHERE visibility = true" + + " ORDER BY joined_members DESC" + + " OFFSET $1 LIMIT $2" + +const selectPublicRoomsWithFilterSQL = "" + + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + + " FROM publicroomsapi_public_rooms" + + " WHERE visibility = true" + + " AND (LOWER(name) LIKE LOWER($1)" + + " OR LOWER(topic) LIKE LOWER($1)" + + " OR LOWER(ARRAY_TO_STRING(aliases, ',')) LIKE LOWER($1))" + + " ORDER BY joined_members DESC" + + " OFFSET $2" + +const selectPublicRoomsWithLimitAndFilterSQL = "" + + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + + " FROM publicroomsapi_public_rooms" + + " WHERE visibility = true" + + " AND (LOWER(name) LIKE LOWER($1)" + + " OR LOWER(topic) LIKE LOWER($1)" + + " OR LOWER(ARRAY_TO_STRING(aliases, ',')) LIKE LOWER($1))" + + " ORDER BY joined_members DESC" + + " OFFSET $2 LIMIT $3" + +const selectRoomVisibilitySQL = "" + + "SELECT visibility FROM publicroomsapi_public_rooms" + + " WHERE room_id = $1" + +const insertNewRoomSQL = "" + + "INSERT INTO publicroomsapi_public_rooms(room_id)" + + " VALUES ($1)" + +const incrementJoinedMembersInRoomSQL = "" + + "UPDATE publicroomsapi_public_rooms" + + " SET joined_members = joined_members + 1" + + " WHERE room_id = $1" + +const decrementJoinedMembersInRoomSQL = "" + + "UPDATE publicroomsapi_public_rooms" + + " SET joined_members = joined_members - 1" + + " WHERE room_id = $1" + +const updateRoomAttributeSQL = "" + + "UPDATE publicroomsapi_public_rooms" + + " SET %s = $1" + + " WHERE room_id = $2" + +type publicRoomsStatements struct { + countPublicRoomsStmt *sql.Stmt + selectPublicRoomsStmt *sql.Stmt + selectPublicRoomsWithLimitStmt *sql.Stmt + selectPublicRoomsWithFilterStmt *sql.Stmt + selectPublicRoomsWithLimitAndFilterStmt *sql.Stmt + selectRoomVisibilityStmt *sql.Stmt + insertNewRoomStmt *sql.Stmt + incrementJoinedMembersInRoomStmt *sql.Stmt + decrementJoinedMembersInRoomStmt *sql.Stmt + updateRoomAttributeStmts map[string]*sql.Stmt +} + +func (s *publicRoomsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(publicRoomsSchema) + if err != nil { + return + } + + stmts := statementList{ + {&s.countPublicRoomsStmt, countPublicRoomsSQL}, + {&s.selectPublicRoomsStmt, selectPublicRoomsSQL}, + {&s.selectPublicRoomsWithLimitStmt, selectPublicRoomsWithLimitSQL}, + {&s.selectPublicRoomsWithFilterStmt, selectPublicRoomsWithFilterSQL}, + {&s.selectPublicRoomsWithLimitAndFilterStmt, selectPublicRoomsWithLimitAndFilterSQL}, + {&s.selectRoomVisibilityStmt, selectRoomVisibilitySQL}, + {&s.insertNewRoomStmt, insertNewRoomSQL}, + {&s.incrementJoinedMembersInRoomStmt, incrementJoinedMembersInRoomSQL}, + {&s.decrementJoinedMembersInRoomStmt, decrementJoinedMembersInRoomSQL}, + } + + if err = stmts.prepare(db); err != nil { + return + } + + s.updateRoomAttributeStmts = make(map[string]*sql.Stmt) + for _, editable := range editableAttributes { + stmt := fmt.Sprintf(updateRoomAttributeSQL, editable) + if s.updateRoomAttributeStmts[editable], err = db.Prepare(stmt); err != nil { + return + } + } + + return +} + +func (s *publicRoomsStatements) countPublicRooms() (nb int64, err error) { + err = s.countPublicRoomsStmt.QueryRow().Scan(&nb) + return +} + +func (s *publicRoomsStatements) selectPublicRooms(offset int64, limit int16, filter string) ([]types.PublicRoom, error) { + var rows *sql.Rows + var err error + + if len(filter) > 0 { + pattern := "%" + filter + "%" + if limit == 0 { + rows, err = s.selectPublicRoomsWithFilterStmt.Query(pattern, offset) + } else { + rows, err = s.selectPublicRoomsWithLimitAndFilterStmt.Query(pattern, offset, limit) + } + } else { + if limit == 0 { + rows, err = s.selectPublicRoomsStmt.Query(offset) + } else { + rows, err = s.selectPublicRoomsWithLimitStmt.Query(offset, limit) + } + } + + if err != nil { + return []types.PublicRoom{}, nil + } + + rooms := []types.PublicRoom{} + for rows.Next() { + var r types.PublicRoom + var aliases pq.StringArray + + err = rows.Scan( + &r.RoomID, &r.NumJoinedMembers, &aliases, &r.CanonicalAlias, + &r.Name, &r.Topic, &r.WorldReadable, &r.GuestCanJoin, &r.AvatarURL, + ) + if err != nil { + return rooms, err + } + + r.Aliases = make([]string, len(aliases)) + for i := range aliases { + r.Aliases[i] = aliases[i] + } + + rooms = append(rooms, r) + } + + return rooms, nil +} + +func (s *publicRoomsStatements) selectRoomVisibility(roomID string) (v bool, err error) { + err = s.selectRoomVisibilityStmt.QueryRow(roomID).Scan(&v) + return +} + +func (s *publicRoomsStatements) insertNewRoom(roomID string) error { + _, err := s.insertNewRoomStmt.Exec(roomID) + return err +} + +func (s *publicRoomsStatements) incrementJoinedMembersInRoom(roomID string) error { + _, err := s.incrementJoinedMembersInRoomStmt.Exec(roomID) + return err +} + +func (s *publicRoomsStatements) decrementJoinedMembersInRoom(roomID string) error { + _, err := s.decrementJoinedMembersInRoomStmt.Exec(roomID) + return err +} + +func (s *publicRoomsStatements) updateRoomAttribute(attrName string, attrValue attributeValue, roomID string) error { + isEditable := false + for _, editable := range editableAttributes { + if editable == attrName { + isEditable = true + } + } + + if !isEditable { + return errors.New("Cannot edit " + attrName) + } + + var value interface{} + if attrName == "aliases" { + // Aliases need a special conversion + valueAsSlice, isSlice := attrValue.([]string) + if !isSlice { + // attrValue isn't a slice of strings + return errors.New("New list of aliases is of the wrong type") + } + value = pq.StringArray(valueAsSlice) + } else { + value = attrValue + } + + _, err := s.updateRoomAttributeStmts[attrName].Exec(value, roomID) + return err +} diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go new file mode 100644 index 000000000..83861180f --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go @@ -0,0 +1,247 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + "encoding/json" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/publicroomsapi/types" + + "github.com/matrix-org/gomatrixserverlib" +) + +// PublicRoomsServerDatabase represents a public rooms server database. +type PublicRoomsServerDatabase struct { + db *sql.DB + partitions common.PartitionOffsetStatements + statements publicRoomsStatements +} + +type attributeValue interface{} + +// NewPublicRoomsServerDatabase creates a new public rooms server database. +func NewPublicRoomsServerDatabase(dataSourceName string) (*PublicRoomsServerDatabase, error) { + var db *sql.DB + var err error + if db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + partitions := common.PartitionOffsetStatements{} + if err = partitions.Prepare(db, "publicroomsapi"); err != nil { + return nil, err + } + statements := publicRoomsStatements{} + if err = statements.prepare(db); err != nil { + return nil, err + } + return &PublicRoomsServerDatabase{db, partitions, statements}, nil +} + +// PartitionOffsets implements common.PartitionStorer +func (d *PublicRoomsServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { + return d.partitions.SelectPartitionOffsets(topic) +} + +// SetPartitionOffset implements common.PartitionStorer +func (d *PublicRoomsServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error { + return d.partitions.UpsertPartitionOffset(topic, partition, offset) +} + +// GetRoomVisibility returns the room visibility as a boolean: true if the room +// is publicly visible, false if not. +// Returns an error if the retrieval failed. +func (d *PublicRoomsServerDatabase) GetRoomVisibility(roomID string) (bool, error) { + return d.statements.selectRoomVisibility(roomID) +} + +// SetRoomVisibility updates the visibility attribute of a room. This attribute +// must be set to true if the room is publicly visible, false if not. +// Returns an error if the update failed. +func (d *PublicRoomsServerDatabase) SetRoomVisibility(visible bool, roomID string) error { + return d.statements.updateRoomAttribute("visibility", visible, roomID) +} + +// CountPublicRooms returns the number of room set as publicly visible on the server. +// Returns an error if the retrieval failed. +func (d *PublicRoomsServerDatabase) CountPublicRooms() (int64, error) { + return d.statements.countPublicRooms() +} + +// GetPublicRooms returns an array containing the local rooms set as publicly visible, ordered by their number +// of joined members. This array can be limited by a given number of elements, and offset by a given value. +// If the limit is 0, doesn't limit the number of results. If the offset is 0 too, the array contains all +// the rooms set as publicly visible on the server. +// Returns an error if the retrieval failed. +func (d *PublicRoomsServerDatabase) GetPublicRooms(offset int64, limit int16, filter string) ([]types.PublicRoom, error) { + return d.statements.selectPublicRooms(offset, limit, filter) +} + +// UpdateRoomFromEvents iterate over a slice of state events and call +// UpdateRoomFromEvent on each of them to update the database representation of +// the rooms updated by each event. +// The slice of events to remove is used to update the number of joined members +// for the room in the database. +// If the update triggered by one of the events failed, aborts the process and +// returns an error. +func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents( + eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event, +) error { + for _, event := range eventsToAdd { + if err := d.UpdateRoomFromEvent(event); err != nil { + return err + } + } + + for _, event := range eventsToRemove { + if event.Type() == "m.room.member" { + if err := d.updateNumJoinedUsers(event, true); err != nil { + return err + } + } + } + + return nil +} + +// UpdateRoomFromEvent updates the database representation of a room from a Matrix event, by +// checking the event's type to know which attribute to change and using the event's content +// to define the new value of the attribute. +// If the event doesn't match with any property used to compute the public room directory, +// does nothing. +// If something went wrong during the process, returns an error. +func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(event gomatrixserverlib.Event) error { + // Process the event according to its type + switch event.Type() { + case "m.room.create": + return d.statements.insertNewRoom(event.RoomID()) + case "m.room.member": + return d.updateNumJoinedUsers(event, false) + case "m.room.aliases": + return d.updateRoomAliases(event) + case "m.room.canonical_alias": + var content common.CanonicalAliasContent + field := &(content.Alias) + attrName := "canonical_alias" + return d.updateStringAttribute(attrName, event, &content, field) + case "m.room.name": + var content common.NameContent + field := &(content.Name) + attrName := "name" + return d.updateStringAttribute(attrName, event, &content, field) + case "m.room.topic": + var content common.TopicContent + field := &(content.Topic) + attrName := "topic" + return d.updateStringAttribute(attrName, event, &content, field) + case "m.room.avatar": + var content common.AvatarContent + field := &(content.URL) + attrName := "avatar_url" + return d.updateStringAttribute(attrName, event, &content, field) + case "m.room.history_visibility": + var content common.HistoryVisibilityContent + field := &(content.HistoryVisibility) + attrName := "world_readable" + strForTrue := "world_readable" + return d.updateBooleanAttribute(attrName, event, &content, field, strForTrue) + case "m.room.guest_access": + var content common.GuestAccessContent + field := &(content.GuestAccess) + attrName := "guest_can_join" + strForTrue := "can_join" + return d.updateBooleanAttribute(attrName, event, &content, field, strForTrue) + } + + // If the event type didn't match, return with no error + return nil +} + +// updateNumJoinedUsers updates the number of joined user in the database representation +// of a room using a given "m.room.member" Matrix event. +// If the membership property of the event isn't "join", ignores it and returs nil. +// If the remove parameter is set to false, increments the joined members counter in the +// database, if set to truem decrements it. +// Returns an error if the update failed. +func (d *PublicRoomsServerDatabase) updateNumJoinedUsers( + membershipEvent gomatrixserverlib.Event, remove bool, +) error { + membership, err := membershipEvent.Membership() + if err != nil { + return err + } + + if membership != "join" { + return nil + } + + if remove { + return d.statements.decrementJoinedMembersInRoom(membershipEvent.RoomID()) + } + return d.statements.incrementJoinedMembersInRoom(membershipEvent.RoomID()) +} + +// updateStringAttribute updates a given string attribute in the database +// representation of a room using a given string data field from content of the +// Matrix event triggering the update. +// Returns an error if decoding the Matrix event's content or updating the attribute +// failed. +func (d *PublicRoomsServerDatabase) updateStringAttribute( + attrName string, event gomatrixserverlib.Event, content interface{}, + field *string, +) error { + if err := json.Unmarshal(event.Content(), content); err != nil { + return err + } + + return d.statements.updateRoomAttribute(attrName, *field, event.RoomID()) +} + +// updateBooleanAttribute updates a given boolean attribute in the database +// representation of a room using a given string data field from content of the +// Matrix event triggering the update. +// The attribute is set to true if the field matches a given string, false if not. +// Returns an error if decoding the Matrix event's content or updating the attribute +// failed. +func (d *PublicRoomsServerDatabase) updateBooleanAttribute( + attrName string, event gomatrixserverlib.Event, content interface{}, + field *string, strForTrue string, +) error { + if err := json.Unmarshal(event.Content(), content); err != nil { + return err + } + + var attrValue bool + if *field == strForTrue { + attrValue = true + } else { + attrValue = false + } + + return d.statements.updateRoomAttribute(attrName, attrValue, event.RoomID()) +} + +// updateRoomAliases decodes the content of a "m.room.aliases" Matrix event and update the list of aliases of +// a given room with it. +// Returns an error if decoding the Matrix event or updating the list failed. +func (d *PublicRoomsServerDatabase) updateRoomAliases(aliasesEvent gomatrixserverlib.Event) error { + var content common.AliasesContent + if err := json.Unmarshal(aliasesEvent.Content(), &content); err != nil { + return err + } + + return d.statements.updateRoomAttribute("aliases", content.Aliases, aliasesEvent.RoomID()) +} diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/types/types.go b/src/github.com/matrix-org/dendrite/publicroomsapi/types/types.go new file mode 100644 index 000000000..c284bcca4 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/types/types.go @@ -0,0 +1,28 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +// PublicRoom represents a local public room +type PublicRoom struct { + RoomID string `json:"room_id"` + Aliases []string `json:"aliases,omitempty"` + CanonicalAlias string `json:"canonical_alias,omitempty"` + Name string `json:"name,omitempty"` + Topic string `json:"topic,omitempty"` + AvatarURL string `json:"avatar_url,omitempty"` + NumJoinedMembers int64 `json:"num_joined_members"` + WorldReadable bool `json:"world_readable"` + GuestCanJoin bool `json:"guest_can_join"` +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/room_aliases_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/room_aliases_table.go index 433835d7a..bfd6cc090 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/room_aliases_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/room_aliases_table.go @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS roomserver_room_aliases ( room_id TEXT NOT NULL ); -CREATE UNIQUE INDEX IF NOT EXISTS roomserver_room_id_idx ON roomserver_room_aliases(room_id); +CREATE INDEX IF NOT EXISTS roomserver_room_id_idx ON roomserver_room_aliases(room_id); ` const insertRoomAliasSQL = "" + From 166ac9d092855652cc01ed57c076ced0d48351df Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 22 Aug 2017 14:14:37 +0100 Subject: [PATCH 6/6] Fix sync not returning on room join (#195) * Use BuildEvent method on room join * Fix building the list of room members in the sync notifier * Fix building the list of room members in the sync notifier * Rephrase comment --- .../dendrite/clientapi/writers/joinroom.go | 51 +++---------------- .../dendrite/syncapi/sync/notifier.go | 3 ++ 2 files changed, 9 insertions(+), 45 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go index 6618b0bdf..e50f5f276 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" @@ -168,52 +169,10 @@ func (r joinRoomReq) joinRoomUsingServers( var eb gomatrixserverlib.EventBuilder r.writeToBuilder(&eb, roomID) - needed, err := gomatrixserverlib.StateNeededForEventBuilder(&eb) - if err != nil { - return httputil.LogThenError(r.req, err) - } - - // Ask the roomserver for information about this room - queryReq := api.QueryLatestEventsAndStateRequest{ - RoomID: roomID, - StateToFetch: needed.Tuples(), - } var queryRes api.QueryLatestEventsAndStateResponse - if queryErr := r.queryAPI.QueryLatestEventsAndState(&queryReq, &queryRes); queryErr != nil { - return httputil.LogThenError(r.req, queryErr) - } - - if queryRes.RoomExists { - // The room exists in the local database, so we just have to send a join - // membership event and return the room ID - // TODO: Check if the user is allowed in the room (has been invited if - // the room is invite-only) - eb.Depth = queryRes.Depth - eb.PrevEvents = queryRes.LatestEvents - - authEvents := gomatrixserverlib.NewAuthEvents(nil) - - for i := range queryRes.StateEvents { - authEvents.AddEvent(&queryRes.StateEvents[i]) - } - - refs, err := needed.AuthEventReferences(&authEvents) - if err != nil { - return httputil.LogThenError(r.req, err) - } - eb.AuthEvents = refs - - now := time.Now() - eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.cfg.Matrix.ServerName) - event, err := eb.Build( - eventID, now, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID, r.cfg.Matrix.PrivateKey, - ) - if err != nil { - return httputil.LogThenError(r.req, err) - } - - if err := r.producer.SendEvents([]gomatrixserverlib.Event{event}, r.cfg.Matrix.ServerName); err != nil { - return httputil.LogThenError(r.req, err) + if event, err := events.BuildEvent(&eb, r.cfg, r.queryAPI, &queryRes); err == nil { + if sendErr := r.producer.SendEvents([]gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName); err != nil { + return httputil.LogThenError(r.req, sendErr) } return util.JSONResponse{ @@ -222,6 +181,8 @@ func (r joinRoomReq) joinRoomUsingServers( RoomID string `json:"room_id"` }{roomID}, } + } else if err != events.ErrRoomNoExists { + return httputil.LogThenError(r.req, err) } if len(servers) == 0 { diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index c2fdd8f03..2fa4279c1 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -79,6 +79,9 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty case "invite": userIDs = append(userIDs, userID) case "join": + // Manually append the new user's ID so they get notified + // along all members in the room + userIDs = append(userIDs, userID) n.addJoinedUser(ev.RoomID(), userID) case "leave": fallthrough