From b3ad8475ac85ec14c865f669fc8136c99347e9b7 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 12 May 2017 16:27:46 +0100 Subject: [PATCH] Add query for all joined users in all rooms. Glue everything together. It might work. Untested. --- .../cmd/dendrite-sync-api-server/main.go | 3 ++ .../storage/current_room_state_table.go | 29 +++++++++++++++++++ .../dendrite/syncapi/storage/syncserver.go | 5 ++++ .../dendrite/syncapi/sync/notifier.go | 17 +++++++++-- 4 files changed, 51 insertions(+), 3 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index f67bf0e5e..8b1da837b 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -78,6 +78,9 @@ func main() { } n := sync.NewNotifier(types.StreamPosition(pos)) + if err := n.Load(db); err != nil { + log.Panicf("startup: failed to set up notifier: %s", err) + } server, err := consumers.NewServer(cfg, n, db) if err != nil { log.Panicf("startup: failed to create sync server: %s", err) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index e8cc68517..b74514c16 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -61,11 +61,15 @@ const selectRoomIDsWithMembershipSQL = "" + const selectCurrentStateSQL = "" + "SELECT event_json FROM current_room_state WHERE room_id = $1" +const selectJoinedUsersSQL = "" + + "SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt + selectJoinedUsersStmt *sql.Stmt } func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { @@ -85,9 +89,34 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil { return } + if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { + return + } return } +// JoinedMemberLists returns a map of room ID to a list of joined user IDs. +func (s *currentRoomStateStatements) JoinedMemberLists() (map[string][]string, error) { + rows, err := s.selectJoinedUsersStmt.Query() + if err != nil { + return nil, err + } + defer rows.Close() + + result := make(map[string][]string) + for rows.Next() { + var roomID string + var userID string + if err := rows.Scan(&roomID, &userID); err != nil { + return nil, err + } + users := result[roomID] + users = append(users, userID) + result[roomID] = users + } + return result, nil +} + // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(txn *sql.Tx, userID, membership string) ([]string, error) { rows, err := txn.Stmt(s.selectRoomIDsWithMembershipStmt).Query(userID, membership) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index cc9fbdec0..d0411c00c 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -53,6 +53,11 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { return &SyncServerDatabase{db, partitions, events, state}, nil } +// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. +func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error) { + return d.roomstate.JoinedMemberLists() +} + // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 08b27a900..555245def 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -20,6 +20,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/events" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -46,7 +47,7 @@ type Notifier struct { // NewNotifier creates a new notifier set to the given stream position. // In order for this to be of any use, the Notifier needs to be told all rooms and -// the joined users within each of them by calling Notifier.UsersJoinedToRooms(). +// the joined users within each of them by calling Notifier.LoadFromDatabase(). func NewNotifier(pos types.StreamPosition) *Notifier { return &Notifier{ currPos: pos, @@ -127,10 +128,20 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { } } -// UsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from +// Load the membership states required to notify users correctly. +func (n *Notifier) Load(db *storage.SyncServerDatabase) error { + roomToUsers, err := db.AllJoinedUsersInRooms() + if err != nil { + return err + } + n.usersJoinedToRooms(roomToUsers) + return nil +} + +// usersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from // these rooms will wake the given users /sync requests. This should be called prior to ANY calls to // OnNewEvent (eg on startup) to prevent racing. -func (n *Notifier) UsersJoinedToRooms(roomIDToUserIDs map[string][]string) { +func (n *Notifier) usersJoinedToRooms(roomIDToUserIDs map[string][]string) { // This is just the bulk form of userJoined where we only lock once. n.roomIDToJoinedUsersMutex.Lock() defer n.roomIDToJoinedUsersMutex.Unlock()