diff --git a/syncapi/routing/location_sync.go b/syncapi/routing/location_sync.go index 340726525..68794ca24 100644 --- a/syncapi/routing/location_sync.go +++ b/syncapi/routing/location_sync.go @@ -15,7 +15,6 @@ package routing import ( - "context" "database/sql" "net/http" @@ -45,15 +44,10 @@ func GetLocationSync( logrus.WithError(err).Error("Failed to get snapshot for locations sync") return jsonerror.InternalServerError() } - id, err := snapshot.SelectMaxMultiRoomDataEventId(context.Background()) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("failed to get max multiroom data event id") - return jsonerror.InternalServerError() - } - mr, err := snapshot.SelectMultiRoomData(req.Context(), &types.Range{From: 0, To: id}, []string{roomID}) + mr, err := snapshot.SelectAllMultiRoomDataInRoom(req.Context(), roomID) if err != nil { if err != sql.ErrNoRows { - util.GetLogger(req.Context()).WithError(err).Error("failed to select multiroom data for room") + util.GetLogger(req.Context()).WithError(err).Error("failed to select all most recent multiroom data for room") return jsonerror.InternalServerError() } } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index a53822f7e..faadbdfe3 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -109,6 +109,7 @@ type DatabaseTransaction interface { PresenceAfter(ctx context.Context, after types.StreamPosition, filter synctypes.EventFilter) (map[string]*types.PresenceInternal, error) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (events []types.StreamEvent, prevBatch, nextBatch string, err error) SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string) (types.MultiRoom, error) + SelectAllMultiRoomDataInRoom(ctx context.Context, roomId string) (types.MultiRoom, error) } type Database interface { diff --git a/syncapi/storage/postgres/multiroomcast_table.go b/syncapi/storage/postgres/multiroomcast_table.go index de07616ed..60e4acf85 100644 --- a/syncapi/storage/postgres/multiroomcast_table.go +++ b/syncapi/storage/postgres/multiroomcast_table.go @@ -25,12 +25,15 @@ WHERE v.room_id = ANY($1) AND id > $2 AND id <= $3` -const selectMaxMultiCastIDSQL = "" + - "SELECT MAX(id) FROM syncapi_multiroom_data" +const selectAllMultiRoomCastInRoomSQL = `SELECT d.user_id, d.type, d.data, d.ts FROM syncapi_multiroom_data AS d +JOIN syncapi_multiroom_visibility AS v +ON d.user_id = v.user_id +AND concat(d.type, '.visibility') = v.type +WHERE v.room_id = $1` type multiRoomStatements struct { - selectMultiRoomCast *sql.Stmt - selectMaxMultiCastID *sql.Stmt + selectMultiRoomCast *sql.Stmt + selectAllMultiRoomCastInRoom *sql.Stmt } func NewPostgresMultiRoomCastTable(db *sql.DB) (tables.MultiRoom, error) { @@ -41,7 +44,7 @@ func NewPostgresMultiRoomCastTable(db *sql.DB) (tables.MultiRoom, error) { } return r, sqlutil.StatementList{ {&r.selectMultiRoomCast, selectMultiRoomCastSQL}, - {&r.selectMaxMultiCastID, selectMaxMultiCastIDSQL}, + {&r.selectAllMultiRoomCastInRoom, selectAllMultiRoomCastInRoomSQL}, }.Prepare(db) } @@ -65,10 +68,22 @@ func (s *multiRoomStatements) SelectMultiRoomData(ctx context.Context, r *types. return data, rows.Err() } -func (s *multiRoomStatements) SelectMaxMultiRoomDataEventId( - ctx context.Context, txn *sql.Tx, -) (id int64, err error) { - stmt := sqlutil.TxStmt(txn, s.selectMaxMultiCastID) - err = stmt.QueryRowContext(ctx).Scan(&id) - return +func (s *multiRoomStatements) SelectAllMultiRoomDataInRoom(ctx context.Context, roomId string, txn *sql.Tx) ([]*types.MultiRoomDataRow, error) { + rows, err := sqlutil.TxStmt(txn, s.selectAllMultiRoomCastInRoom).QueryContext(ctx, roomId) + if err != nil { + return nil, err + } + data := make([]*types.MultiRoomDataRow, 0) + defer internal.CloseAndLogIfError(ctx, rows, "SelectAllMultiRoomDataInRoom: rows.close() failed") + var t time.Time + for rows.Next() { + r := types.MultiRoomDataRow{} + err = rows.Scan(&r.UserId, &r.Type, &r.Data, &t) + r.Timestamp = t.UnixMilli() + if err != nil { + return nil, fmt.Errorf("rows scan: %w", err) + } + data = append(data, &r) + } + return data, rows.Err() } diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go index 5b4d528a7..12b3df11b 100644 --- a/syncapi/storage/shared/storage_sync.go +++ b/syncapi/storage/shared/storage_sync.go @@ -825,7 +825,21 @@ func (d *DatabaseTransaction) SelectMultiRoomData(ctx context.Context, r *types. } -func (d *DatabaseTransaction) SelectMaxMultiRoomDataEventId(ctx context.Context) (types.StreamPosition, error) { - id, err := d.MultiRoom.SelectMaxMultiRoomDataEventId(ctx, d.txn) - return types.StreamPosition(id), err +func (d *DatabaseTransaction) SelectAllMultiRoomDataInRoom(ctx context.Context, roomId string) (types.MultiRoom, error) { + rows, err := d.MultiRoom.SelectAllMultiRoomDataInRoom(ctx, roomId, d.txn) + if err != nil { + return nil, fmt.Errorf("select all multi room data in room: %w", err) + } + mr := make(types.MultiRoom, 3) + for _, row := range rows { + if mr[row.UserId] == nil { + mr[row.UserId] = make(map[string]types.MultiRoomData) + } + mr[row.UserId][row.Type] = types.MultiRoomData{ + Content: row.Data, + OriginServerTs: row.Timestamp, + } + } + return mr, nil + } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 462380fc5..7a66ed7dd 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -245,5 +245,5 @@ type Relations interface { type MultiRoom interface { SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string, txn *sql.Tx) ([]*types.MultiRoomDataRow, error) - SelectMaxMultiRoomDataEventId(ctx context.Context, txn *sql.Tx) (id int64, err error) + SelectAllMultiRoomDataInRoom(ctx context.Context, roomId string, txn *sql.Tx) ([]*types.MultiRoomDataRow, error) }