mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-08 07:23:10 -06:00
Add query for all joined users in all rooms. Glue everything together.
It might work. Untested.
This commit is contained in:
parent
aab28c2970
commit
b3ad8475ac
|
|
@ -78,6 +78,9 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
n := sync.NewNotifier(types.StreamPosition(pos))
|
n := sync.NewNotifier(types.StreamPosition(pos))
|
||||||
|
if err := n.Load(db); err != nil {
|
||||||
|
log.Panicf("startup: failed to set up notifier: %s", err)
|
||||||
|
}
|
||||||
server, err := consumers.NewServer(cfg, n, db)
|
server, err := consumers.NewServer(cfg, n, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create sync server: %s", err)
|
log.Panicf("startup: failed to create sync server: %s", err)
|
||||||
|
|
|
||||||
|
|
@ -61,11 +61,15 @@ const selectRoomIDsWithMembershipSQL = "" +
|
||||||
const selectCurrentStateSQL = "" +
|
const selectCurrentStateSQL = "" +
|
||||||
"SELECT event_json FROM current_room_state WHERE room_id = $1"
|
"SELECT event_json FROM current_room_state WHERE room_id = $1"
|
||||||
|
|
||||||
|
const selectJoinedUsersSQL = "" +
|
||||||
|
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||||
|
|
||||||
type currentRoomStateStatements struct {
|
type currentRoomStateStatements struct {
|
||||||
upsertRoomStateStmt *sql.Stmt
|
upsertRoomStateStmt *sql.Stmt
|
||||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||||
selectCurrentStateStmt *sql.Stmt
|
selectCurrentStateStmt *sql.Stmt
|
||||||
|
selectJoinedUsersStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
|
@ -85,8 +89,33 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
|
if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// JoinedMemberLists returns a map of room ID to a list of joined user IDs.
|
||||||
|
func (s *currentRoomStateStatements) JoinedMemberLists() (map[string][]string, error) {
|
||||||
|
rows, err := s.selectJoinedUsersStmt.Query()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
result := make(map[string][]string)
|
||||||
|
for rows.Next() {
|
||||||
|
var roomID string
|
||||||
|
var userID string
|
||||||
|
if err := rows.Scan(&roomID, &userID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
users := result[roomID]
|
||||||
|
users = append(users, userID)
|
||||||
|
result[roomID] = users
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||||
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(txn *sql.Tx, userID, membership string) ([]string, error) {
|
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(txn *sql.Tx, userID, membership string) ([]string, error) {
|
||||||
|
|
|
||||||
|
|
@ -53,6 +53,11 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
|
||||||
return &SyncServerDatabase{db, partitions, events, state}, nil
|
return &SyncServerDatabase{db, partitions, events, state}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
|
||||||
|
func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error) {
|
||||||
|
return d.roomstate.JoinedMemberLists()
|
||||||
|
}
|
||||||
|
|
||||||
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
||||||
// when generating the stream position for this event. Returns the sync stream position for the inserted event.
|
// when generating the stream position for this event. Returns the sync stream position for the inserted event.
|
||||||
// Returns an error if there was a problem inserting this event.
|
// Returns an error if there was a problem inserting this event.
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import (
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/clientapi/events"
|
"github.com/matrix-org/dendrite/clientapi/events"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
@ -46,7 +47,7 @@ type Notifier struct {
|
||||||
|
|
||||||
// NewNotifier creates a new notifier set to the given stream position.
|
// NewNotifier creates a new notifier set to the given stream position.
|
||||||
// In order for this to be of any use, the Notifier needs to be told all rooms and
|
// In order for this to be of any use, the Notifier needs to be told all rooms and
|
||||||
// the joined users within each of them by calling Notifier.UsersJoinedToRooms().
|
// the joined users within each of them by calling Notifier.LoadFromDatabase().
|
||||||
func NewNotifier(pos types.StreamPosition) *Notifier {
|
func NewNotifier(pos types.StreamPosition) *Notifier {
|
||||||
return &Notifier{
|
return &Notifier{
|
||||||
currPos: pos,
|
currPos: pos,
|
||||||
|
|
@ -127,10 +128,20 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// UsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
|
// Load the membership states required to notify users correctly.
|
||||||
|
func (n *Notifier) Load(db *storage.SyncServerDatabase) error {
|
||||||
|
roomToUsers, err := db.AllJoinedUsersInRooms()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n.usersJoinedToRooms(roomToUsers)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// usersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
|
||||||
// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
|
// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
|
||||||
// OnNewEvent (eg on startup) to prevent racing.
|
// OnNewEvent (eg on startup) to prevent racing.
|
||||||
func (n *Notifier) UsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
|
func (n *Notifier) usersJoinedToRooms(roomIDToUserIDs map[string][]string) {
|
||||||
// This is just the bulk form of userJoined where we only lock once.
|
// This is just the bulk form of userJoined where we only lock once.
|
||||||
n.roomIDToJoinedUsersMutex.Lock()
|
n.roomIDToJoinedUsersMutex.Lock()
|
||||||
defer n.roomIDToJoinedUsersMutex.Unlock()
|
defer n.roomIDToJoinedUsersMutex.Unlock()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue