Only load members of newly joined rooms (#2389)
* Only load members of newly joined rooms * Comment that the query is prepared at runtime Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
parent
c6ea2c9ff2
commit
2a5b8e0306
|
@ -333,6 +333,20 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LoadRooms loads the membership states required to notify users correctly.
|
||||||
|
func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs []string) error {
|
||||||
|
n.lock.Lock()
|
||||||
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
|
roomToUsers, err := db.AllJoinedUsersInRoom(ctx, roomIDs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n.setUsersJoinedToRooms(roomToUsers)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// CurrentPosition returns the current sync position
|
// CurrentPosition returns the current sync position
|
||||||
func (n *Notifier) CurrentPosition() types.StreamingToken {
|
func (n *Notifier) CurrentPosition() types.StreamingToken {
|
||||||
n.lock.RLock()
|
n.lock.RLock()
|
||||||
|
|
|
@ -52,6 +52,9 @@ type Database interface {
|
||||||
|
|
||||||
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
|
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
|
||||||
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
|
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
|
||||||
|
// AllJoinedUsersInRoom returns a map of room ID to a list of all joined user IDs for a given room.
|
||||||
|
AllJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error)
|
||||||
|
|
||||||
// AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices.
|
// AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices.
|
||||||
AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error)
|
AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error)
|
||||||
// Events lookups a list of event by their event ID.
|
// Events lookups a list of event by their event ID.
|
||||||
|
|
|
@ -93,6 +93,9 @@ const selectCurrentStateSQL = "" +
|
||||||
const selectJoinedUsersSQL = "" +
|
const selectJoinedUsersSQL = "" +
|
||||||
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||||
|
|
||||||
|
const selectJoinedUsersInRoomSQL = "" +
|
||||||
|
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join' AND room_id = ANY($1)"
|
||||||
|
|
||||||
const selectStateEventSQL = "" +
|
const selectStateEventSQL = "" +
|
||||||
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||||
|
|
||||||
|
@ -112,6 +115,7 @@ type currentRoomStateStatements struct {
|
||||||
selectRoomIDsWithAnyMembershipStmt *sql.Stmt
|
selectRoomIDsWithAnyMembershipStmt *sql.Stmt
|
||||||
selectCurrentStateStmt *sql.Stmt
|
selectCurrentStateStmt *sql.Stmt
|
||||||
selectJoinedUsersStmt *sql.Stmt
|
selectJoinedUsersStmt *sql.Stmt
|
||||||
|
selectJoinedUsersInRoomStmt *sql.Stmt
|
||||||
selectEventsWithEventIDsStmt *sql.Stmt
|
selectEventsWithEventIDsStmt *sql.Stmt
|
||||||
selectStateEventStmt *sql.Stmt
|
selectStateEventStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
@ -143,6 +147,9 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro
|
||||||
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
|
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if s.selectJoinedUsersInRoomStmt, err = db.Prepare(selectJoinedUsersInRoomSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
|
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -163,9 +170,32 @@ func (s *currentRoomStateStatements) SelectJoinedUsers(
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
|
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
|
||||||
|
|
||||||
result := make(map[string][]string)
|
result := make(map[string][]string)
|
||||||
for rows.Next() {
|
|
||||||
var roomID string
|
var roomID string
|
||||||
var userID string
|
var userID string
|
||||||
|
for rows.Next() {
|
||||||
|
if err := rows.Scan(&roomID, &userID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
users := result[roomID]
|
||||||
|
users = append(users, userID)
|
||||||
|
result[roomID] = users
|
||||||
|
}
|
||||||
|
return result, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
|
||||||
|
func (s *currentRoomStateStatements) SelectJoinedUsersInRoom(
|
||||||
|
ctx context.Context, roomIDs []string,
|
||||||
|
) (map[string][]string, error) {
|
||||||
|
rows, err := s.selectJoinedUsersInRoomStmt.QueryContext(ctx, pq.StringArray(roomIDs))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
|
||||||
|
|
||||||
|
result := make(map[string][]string)
|
||||||
|
var userID, roomID string
|
||||||
|
for rows.Next() {
|
||||||
if err := rows.Scan(&roomID, &userID); err != nil {
|
if err := rows.Scan(&roomID, &userID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,6 +168,10 @@ func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]stri
|
||||||
return d.CurrentRoomState.SelectJoinedUsers(ctx)
|
return d.CurrentRoomState.SelectJoinedUsers(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) AllJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) {
|
||||||
|
return d.CurrentRoomState.SelectJoinedUsersInRoom(ctx, roomIDs)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error) {
|
func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error) {
|
||||||
return d.Peeks.SelectPeekingDevices(ctx)
|
return d.Peeks.SelectPeekingDevices(ctx)
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,6 +77,9 @@ const selectCurrentStateSQL = "" +
|
||||||
const selectJoinedUsersSQL = "" +
|
const selectJoinedUsersSQL = "" +
|
||||||
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||||
|
|
||||||
|
const selectJoinedUsersInRoomSQL = "" +
|
||||||
|
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join' AND room_id IN ($1)"
|
||||||
|
|
||||||
const selectStateEventSQL = "" +
|
const selectStateEventSQL = "" +
|
||||||
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||||
|
|
||||||
|
@ -97,6 +100,7 @@ type currentRoomStateStatements struct {
|
||||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||||
selectRoomIDsWithAnyMembershipStmt *sql.Stmt
|
selectRoomIDsWithAnyMembershipStmt *sql.Stmt
|
||||||
selectJoinedUsersStmt *sql.Stmt
|
selectJoinedUsersStmt *sql.Stmt
|
||||||
|
//selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic
|
||||||
selectStateEventStmt *sql.Stmt
|
selectStateEventStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,13 +131,16 @@ func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (t
|
||||||
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
|
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
//if s.selectJoinedUsersInRoomStmt, err = db.Prepare(selectJoinedUsersInRoomSQL); err != nil {
|
||||||
|
// return nil, err
|
||||||
|
//}
|
||||||
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// JoinedMemberLists returns a map of room ID to a list of joined user IDs.
|
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
|
||||||
func (s *currentRoomStateStatements) SelectJoinedUsers(
|
func (s *currentRoomStateStatements) SelectJoinedUsers(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
) (map[string][]string, error) {
|
) (map[string][]string, error) {
|
||||||
|
@ -144,9 +151,9 @@ func (s *currentRoomStateStatements) SelectJoinedUsers(
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
|
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
|
||||||
|
|
||||||
result := make(map[string][]string)
|
result := make(map[string][]string)
|
||||||
for rows.Next() {
|
|
||||||
var roomID string
|
var roomID string
|
||||||
var userID string
|
var userID string
|
||||||
|
for rows.Next() {
|
||||||
if err := rows.Scan(&roomID, &userID); err != nil {
|
if err := rows.Scan(&roomID, &userID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -157,6 +164,40 @@ func (s *currentRoomStateStatements) SelectJoinedUsers(
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
|
||||||
|
func (s *currentRoomStateStatements) SelectJoinedUsersInRoom(
|
||||||
|
ctx context.Context, roomIDs []string,
|
||||||
|
) (map[string][]string, error) {
|
||||||
|
query := strings.Replace(selectJoinedUsersInRoomSQL, "($1)", sqlutil.QueryVariadic(len(roomIDs)), 1)
|
||||||
|
params := make([]interface{}, 0, len(roomIDs))
|
||||||
|
for _, roomID := range roomIDs {
|
||||||
|
params = append(params, roomID)
|
||||||
|
}
|
||||||
|
stmt, err := s.db.Prepare(query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("SelectJoinedUsersInRoom s.db.Prepare: %w", err)
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, stmt, "SelectJoinedUsersInRoom: stmt.close() failed")
|
||||||
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, params...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "SelectJoinedUsersInRoom: rows.close() failed")
|
||||||
|
|
||||||
|
result := make(map[string][]string)
|
||||||
|
var userID, roomID string
|
||||||
|
for rows.Next() {
|
||||||
|
if err := rows.Scan(&roomID, &userID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
users := result[roomID]
|
||||||
|
users = append(users, userID)
|
||||||
|
result[roomID] = users
|
||||||
|
}
|
||||||
|
return result, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||||
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
|
|
@ -102,6 +102,8 @@ type CurrentRoomState interface {
|
||||||
SelectRoomIDsWithAnyMembership(ctx context.Context, txn *sql.Tx, userID string) (map[string]string, error)
|
SelectRoomIDsWithAnyMembership(ctx context.Context, txn *sql.Tx, userID string) (map[string]string, error)
|
||||||
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
|
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
|
||||||
SelectJoinedUsers(ctx context.Context) (map[string][]string, error)
|
SelectJoinedUsers(ctx context.Context) (map[string][]string, error)
|
||||||
|
// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
|
||||||
|
SelectJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BackwardsExtremities keeps track of backwards extremities for a room.
|
// BackwardsExtremities keeps track of backwards extremities for a room.
|
||||||
|
|
|
@ -67,9 +67,8 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
// add newly joined rooms user presences
|
// add newly joined rooms user presences
|
||||||
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
|
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
|
||||||
if len(newlyJoined) > 0 {
|
if len(newlyJoined) > 0 {
|
||||||
// TODO: This refreshes all lists and is quite expensive
|
// TODO: Check if this is working better than before.
|
||||||
// The notifier should update the lists itself
|
if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil {
|
||||||
if err = p.notifier.Load(ctx, p.DB); err != nil {
|
|
||||||
req.Log.WithError(err).Error("unable to refresh notifier lists")
|
req.Log.WithError(err).Error("unable to refresh notifier lists")
|
||||||
return from
|
return from
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue