mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-04 12:43:10 -06:00
Merge branch 'matrix-org:main' into main
This commit is contained in:
commit
88431773f7
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
|
"github.com/matrix-org/dendrite/internal/transactions"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
|
@ -40,12 +41,21 @@ type redactionResponse struct {
|
||||||
func SendRedaction(
|
func SendRedaction(
|
||||||
req *http.Request, device *userapi.Device, roomID, eventID string, cfg *config.ClientAPI,
|
req *http.Request, device *userapi.Device, roomID, eventID string, cfg *config.ClientAPI,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
|
txnID *string,
|
||||||
|
txnCache *transactions.Cache,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
resErr := checkMemberInRoom(req.Context(), rsAPI, device.UserID, roomID)
|
resErr := checkMemberInRoom(req.Context(), rsAPI, device.UserID, roomID)
|
||||||
if resErr != nil {
|
if resErr != nil {
|
||||||
return *resErr
|
return *resErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if txnID != nil {
|
||||||
|
// Try to fetch response from transactionsCache
|
||||||
|
if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID); ok {
|
||||||
|
return *res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ev := roomserverAPI.GetEvent(req.Context(), rsAPI, eventID)
|
ev := roomserverAPI.GetEvent(req.Context(), rsAPI, eventID)
|
||||||
if ev == nil {
|
if ev == nil {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
|
@ -124,10 +134,18 @@ func SendRedaction(
|
||||||
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
|
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
return util.JSONResponse{
|
|
||||||
|
res := util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
JSON: redactionResponse{
|
JSON: redactionResponse{
|
||||||
EventID: e.EventID(),
|
EventID: e.EventID(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add response to transactionsCache
|
||||||
|
if txnID != nil {
|
||||||
|
txnCache.AddTransaction(device.AccessToken, *txnID, &res)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -479,7 +479,7 @@ func Setup(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return SendRedaction(req, device, vars["roomID"], vars["eventID"], cfg, rsAPI)
|
return SendRedaction(req, device, vars["roomID"], vars["eventID"], cfg, rsAPI, nil, nil)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPost, http.MethodOptions)
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
v3mux.Handle("/rooms/{roomID}/redact/{eventID}/{txnId}",
|
v3mux.Handle("/rooms/{roomID}/redact/{eventID}/{txnId}",
|
||||||
|
|
@ -488,7 +488,8 @@ func Setup(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return SendRedaction(req, device, vars["roomID"], vars["eventID"], cfg, rsAPI)
|
txnID := vars["txnId"]
|
||||||
|
return SendRedaction(req, device, vars["roomID"], vars["eventID"], cfg, rsAPI, &txnID, transactionsCache)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -362,6 +362,13 @@ func (a *KeyInternalAPI) processSelfSignatures(
|
||||||
for targetKeyID, signature := range forTargetUserID {
|
for targetKeyID, signature := range forTargetUserID {
|
||||||
switch sig := signature.CrossSigningBody.(type) {
|
switch sig := signature.CrossSigningBody.(type) {
|
||||||
case *gomatrixserverlib.CrossSigningKey:
|
case *gomatrixserverlib.CrossSigningKey:
|
||||||
|
for keyID := range sig.Keys {
|
||||||
|
split := strings.SplitN(string(keyID), ":", 2)
|
||||||
|
if len(split) > 1 && gomatrixserverlib.KeyID(split[1]) == targetKeyID {
|
||||||
|
targetKeyID = keyID // contains the ed25519: or other scheme
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
for originUserID, forOriginUserID := range sig.Signatures {
|
for originUserID, forOriginUserID := range sig.Signatures {
|
||||||
for originKeyID, originSig := range forOriginUserID {
|
for originKeyID, originSig := range forOriginUserID {
|
||||||
if err := a.DB.StoreCrossSigningSigsForTarget(
|
if err := a.DB.StoreCrossSigningSigsForTarget(
|
||||||
|
|
|
||||||
|
|
@ -33,8 +33,10 @@ CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs (
|
||||||
target_user_id TEXT NOT NULL,
|
target_user_id TEXT NOT NULL,
|
||||||
target_key_id TEXT NOT NULL,
|
target_key_id TEXT NOT NULL,
|
||||||
signature TEXT NOT NULL,
|
signature TEXT NOT NULL,
|
||||||
PRIMARY KEY (origin_user_id, target_user_id, target_key_id)
|
PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id);
|
||||||
`
|
`
|
||||||
|
|
||||||
const selectCrossSigningSigsForTargetSQL = "" +
|
const selectCrossSigningSigsForTargetSQL = "" +
|
||||||
|
|
@ -44,7 +46,7 @@ const selectCrossSigningSigsForTargetSQL = "" +
|
||||||
const upsertCrossSigningSigsForTargetSQL = "" +
|
const upsertCrossSigningSigsForTargetSQL = "" +
|
||||||
"INSERT INTO keyserver_cross_signing_sigs (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)" +
|
"INSERT INTO keyserver_cross_signing_sigs (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)" +
|
||||||
" VALUES($1, $2, $3, $4, $5)" +
|
" VALUES($1, $2, $3, $4, $5)" +
|
||||||
" ON CONFLICT (origin_user_id, target_user_id, target_key_id) DO UPDATE SET (origin_key_id, signature) = ($2, $5)"
|
" ON CONFLICT (origin_user_id, origin_key_id, target_user_id, target_key_id) DO UPDATE SET signature = $5"
|
||||||
|
|
||||||
const deleteCrossSigningSigsForTargetSQL = "" +
|
const deleteCrossSigningSigsForTargetSQL = "" +
|
||||||
"DELETE FROM keyserver_cross_signing_sigs WHERE target_user_id=$1 AND target_key_id=$2"
|
"DELETE FROM keyserver_cross_signing_sigs WHERE target_user_id=$1 AND target_key_id=$2"
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,52 @@
|
||||||
|
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package deltas
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func LoadFixCrossSigningSignatureIndexes(m *sqlutil.Migrations) {
|
||||||
|
m.AddMigration(UpFixCrossSigningSignatureIndexes, DownFixCrossSigningSignatureIndexes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
|
||||||
|
_, err := tx.Exec(`
|
||||||
|
ALTER TABLE keyserver_cross_signing_sigs DROP CONSTRAINT keyserver_cross_signing_sigs_pkey;
|
||||||
|
ALTER TABLE keyserver_cross_signing_sigs ADD PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id);
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DownFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
|
||||||
|
_, err := tx.Exec(`
|
||||||
|
ALTER TABLE keyserver_cross_signing_sigs DROP CONSTRAINT keyserver_cross_signing_sigs_pkey;
|
||||||
|
ALTER TABLE keyserver_cross_signing_sigs ADD PRIMARY KEY (origin_user_id, target_user_id, target_key_id);
|
||||||
|
|
||||||
|
DROP INDEX IF EXISTS keyserver_cross_signing_sigs_idx;
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to execute downgrade: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
@ -54,6 +54,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error)
|
||||||
}
|
}
|
||||||
m := sqlutil.NewMigrations()
|
m := sqlutil.NewMigrations()
|
||||||
deltas.LoadRefactorKeyChanges(m)
|
deltas.LoadRefactorKeyChanges(m)
|
||||||
|
deltas.LoadFixCrossSigningSignatureIndexes(m)
|
||||||
if err = m.RunDeltas(db, dbProperties); err != nil {
|
if err = m.RunDeltas(db, dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,8 +33,10 @@ CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs (
|
||||||
target_user_id TEXT NOT NULL,
|
target_user_id TEXT NOT NULL,
|
||||||
target_key_id TEXT NOT NULL,
|
target_key_id TEXT NOT NULL,
|
||||||
signature TEXT NOT NULL,
|
signature TEXT NOT NULL,
|
||||||
PRIMARY KEY (origin_user_id, target_user_id, target_key_id)
|
PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id);
|
||||||
`
|
`
|
||||||
|
|
||||||
const selectCrossSigningSigsForTargetSQL = "" +
|
const selectCrossSigningSigsForTargetSQL = "" +
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,76 @@
|
||||||
|
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package deltas
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func LoadFixCrossSigningSignatureIndexes(m *sqlutil.Migrations) {
|
||||||
|
m.AddMigration(UpFixCrossSigningSignatureIndexes, DownFixCrossSigningSignatureIndexes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
|
||||||
|
_, err := tx.Exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs_tmp (
|
||||||
|
origin_user_id TEXT NOT NULL,
|
||||||
|
origin_key_id TEXT NOT NULL,
|
||||||
|
target_user_id TEXT NOT NULL,
|
||||||
|
target_key_id TEXT NOT NULL,
|
||||||
|
signature TEXT NOT NULL,
|
||||||
|
PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id)
|
||||||
|
);
|
||||||
|
|
||||||
|
INSERT INTO keyserver_cross_signing_sigs_tmp (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)
|
||||||
|
SELECT origin_user_id, origin_key_id, target_user_id, target_key_id, signature FROM keyserver_cross_signing_sigs;
|
||||||
|
|
||||||
|
DROP TABLE keyserver_cross_signing_sigs;
|
||||||
|
ALTER TABLE keyserver_cross_signing_sigs_tmp RENAME TO keyserver_cross_signing_sigs;
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id);
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DownFixCrossSigningSignatureIndexes(tx *sql.Tx) error {
|
||||||
|
_, err := tx.Exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs_tmp (
|
||||||
|
origin_user_id TEXT NOT NULL,
|
||||||
|
origin_key_id TEXT NOT NULL,
|
||||||
|
target_user_id TEXT NOT NULL,
|
||||||
|
target_key_id TEXT NOT NULL,
|
||||||
|
signature TEXT NOT NULL,
|
||||||
|
PRIMARY KEY (origin_user_id, target_user_id, target_key_id)
|
||||||
|
);
|
||||||
|
|
||||||
|
INSERT INTO keyserver_cross_signing_sigs_tmp (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)
|
||||||
|
SELECT origin_user_id, origin_key_id, target_user_id, target_key_id, signature FROM keyserver_cross_signing_sigs;
|
||||||
|
|
||||||
|
DROP TABLE keyserver_cross_signing_sigs;
|
||||||
|
ALTER TABLE keyserver_cross_signing_sigs_tmp RENAME TO keyserver_cross_signing_sigs;
|
||||||
|
|
||||||
|
DELETE INDEX IF EXISTS keyserver_cross_signing_sigs_idx;
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to execute downgrade: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
@ -53,6 +53,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error)
|
||||||
|
|
||||||
m := sqlutil.NewMigrations()
|
m := sqlutil.NewMigrations()
|
||||||
deltas.LoadRefactorKeyChanges(m)
|
deltas.LoadRefactorKeyChanges(m)
|
||||||
|
deltas.LoadFixCrossSigningSignatureIndexes(m)
|
||||||
if err = m.RunDeltas(db, dbProperties); err != nil {
|
if err = m.RunDeltas(db, dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ type Database interface {
|
||||||
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
|
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
|
||||||
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
|
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
|
||||||
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
|
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
|
||||||
|
GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error)
|
||||||
|
|
||||||
RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,8 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
@ -61,9 +63,13 @@ const selectMembershipCountSQL = "" +
|
||||||
" SELECT DISTINCT ON (room_id, user_id) room_id, user_id, membership FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 ORDER BY room_id, user_id, stream_pos DESC" +
|
" SELECT DISTINCT ON (room_id, user_id) room_id, user_id, membership FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 ORDER BY room_id, user_id, stream_pos DESC" +
|
||||||
") t WHERE t.membership = $3"
|
") t WHERE t.membership = $3"
|
||||||
|
|
||||||
|
const selectHeroesSQL = "" +
|
||||||
|
"SELECT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5"
|
||||||
|
|
||||||
type membershipsStatements struct {
|
type membershipsStatements struct {
|
||||||
upsertMembershipStmt *sql.Stmt
|
upsertMembershipStmt *sql.Stmt
|
||||||
selectMembershipCountStmt *sql.Stmt
|
selectMembershipCountStmt *sql.Stmt
|
||||||
|
selectHeroesStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||||
|
|
@ -72,13 +78,11 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
|
return s, sqlutil.StatementList{
|
||||||
return nil, err
|
{&s.upsertMembershipStmt, upsertMembershipSQL},
|
||||||
}
|
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
|
||||||
if s.selectMembershipCountStmt, err = db.Prepare(selectMembershipCountSQL); err != nil {
|
{&s.selectHeroesStmt, selectHeroesSQL},
|
||||||
return nil, err
|
}.Prepare(db)
|
||||||
}
|
|
||||||
return s, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *membershipsStatements) UpsertMembership(
|
func (s *membershipsStatements) UpsertMembership(
|
||||||
|
|
@ -108,3 +112,23 @@ func (s *membershipsStatements) SelectMembershipCount(
|
||||||
err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count)
|
err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *membershipsStatements) SelectHeroes(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string,
|
||||||
|
) (heroes []string, err error) {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectHeroesStmt)
|
||||||
|
var rows *sql.Rows
|
||||||
|
rows, err = stmt.QueryContext(ctx, roomID, userID, pq.StringArray(memberships))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed")
|
||||||
|
var hero string
|
||||||
|
for rows.Next() {
|
||||||
|
if err = rows.Scan(&hero); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
heroes = append(heroes, hero)
|
||||||
|
}
|
||||||
|
return heroes, rows.Err()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -124,6 +124,10 @@ func (d *Database) MembershipCount(ctx context.Context, roomID, membership strin
|
||||||
return d.Memberships.SelectMembershipCount(ctx, nil, roomID, membership, pos)
|
return d.Memberships.SelectMembershipCount(ctx, nil, roomID, membership, pos)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) {
|
||||||
|
return d.Memberships.SelectHeroes(ctx, nil, roomID, userID, memberships)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
|
func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
|
||||||
return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents)
|
return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,9 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
@ -61,10 +63,14 @@ const selectMembershipCountSQL = "" +
|
||||||
" SELECT * FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 GROUP BY user_id HAVING(max(stream_pos))" +
|
" SELECT * FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 GROUP BY user_id HAVING(max(stream_pos))" +
|
||||||
") t WHERE t.membership = $3"
|
") t WHERE t.membership = $3"
|
||||||
|
|
||||||
|
const selectHeroesSQL = "" +
|
||||||
|
"SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership IN ($3) LIMIT 5"
|
||||||
|
|
||||||
type membershipsStatements struct {
|
type membershipsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
upsertMembershipStmt *sql.Stmt
|
upsertMembershipStmt *sql.Stmt
|
||||||
selectMembershipCountStmt *sql.Stmt
|
selectMembershipCountStmt *sql.Stmt
|
||||||
|
//selectHeroesStmt *sql.Stmt - prepared at runtime due to variadic
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||||
|
|
@ -75,13 +81,11 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
|
return s, sqlutil.StatementList{
|
||||||
return nil, err
|
{&s.upsertMembershipStmt, upsertMembershipSQL},
|
||||||
}
|
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
|
||||||
if s.selectMembershipCountStmt, err = db.Prepare(selectMembershipCountSQL); err != nil {
|
// {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic
|
||||||
return nil, err
|
}.Prepare(db)
|
||||||
}
|
|
||||||
return s, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *membershipsStatements) UpsertMembership(
|
func (s *membershipsStatements) UpsertMembership(
|
||||||
|
|
@ -111,3 +115,36 @@ func (s *membershipsStatements) SelectMembershipCount(
|
||||||
err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count)
|
err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *membershipsStatements) SelectHeroes(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string,
|
||||||
|
) (heroes []string, err error) {
|
||||||
|
stmtSQL := strings.Replace(selectHeroesSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
|
||||||
|
stmt, err := s.db.PrepareContext(ctx, stmtSQL)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, stmt, "SelectHeroes: stmt.close() failed")
|
||||||
|
params := []interface{}{
|
||||||
|
roomID, userID,
|
||||||
|
}
|
||||||
|
for _, membership := range memberships {
|
||||||
|
params = append(params, membership)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt = sqlutil.TxStmt(txn, stmt)
|
||||||
|
var rows *sql.Rows
|
||||||
|
rows, err = stmt.QueryContext(ctx, params...)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed")
|
||||||
|
var hero string
|
||||||
|
for rows.Next() {
|
||||||
|
if err = rows.Scan(&hero); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
heroes = append(heroes, hero)
|
||||||
|
}
|
||||||
|
return heroes, rows.Err()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -170,6 +170,7 @@ type Receipts interface {
|
||||||
type Memberships interface {
|
type Memberships interface {
|
||||||
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
|
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
|
||||||
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
|
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
|
||||||
|
SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type NotificationData interface {
|
type NotificationData interface {
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,16 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -30,6 +33,7 @@ type PDUStreamProvider struct {
|
||||||
workers atomic.Int32
|
workers atomic.Int32
|
||||||
// userID+deviceID -> lazy loading cache
|
// userID+deviceID -> lazy loading cache
|
||||||
lazyLoadCache *caching.LazyLoadCache
|
lazyLoadCache *caching.LazyLoadCache
|
||||||
|
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PDUStreamProvider) worker() {
|
func (p *PDUStreamProvider) worker() {
|
||||||
|
|
@ -290,16 +294,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Work out how many members are in the room.
|
|
||||||
joinedCount, _ := p.DB.MembershipCount(ctx, delta.RoomID, gomatrixserverlib.Join, latestPosition)
|
|
||||||
invitedCount, _ := p.DB.MembershipCount(ctx, delta.RoomID, gomatrixserverlib.Invite, latestPosition)
|
|
||||||
|
|
||||||
switch delta.Membership {
|
switch delta.Membership {
|
||||||
case gomatrixserverlib.Join:
|
case gomatrixserverlib.Join:
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
if hasMembershipChange {
|
if hasMembershipChange {
|
||||||
jr.Summary.JoinedMemberCount = &joinedCount
|
p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition)
|
||||||
jr.Summary.InvitedMemberCount = &invitedCount
|
|
||||||
}
|
}
|
||||||
jr.Timeline.PrevBatch = &prevBatch
|
jr.Timeline.PrevBatch = &prevBatch
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
|
|
@ -332,6 +331,45 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
return latestPosition, nil
|
return latestPosition, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
|
||||||
|
// Work out how many members are in the room.
|
||||||
|
joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
|
||||||
|
invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition)
|
||||||
|
|
||||||
|
jr.Summary.JoinedMemberCount = &joinedCount
|
||||||
|
jr.Summary.InvitedMemberCount = &invitedCount
|
||||||
|
|
||||||
|
fetchStates := []gomatrixserverlib.StateKeyTuple{
|
||||||
|
{EventType: gomatrixserverlib.MRoomName},
|
||||||
|
{EventType: gomatrixserverlib.MRoomCanonicalAlias},
|
||||||
|
}
|
||||||
|
// Check if the room has a name or a canonical alias
|
||||||
|
latestState := &roomserverAPI.QueryLatestEventsAndStateResponse{}
|
||||||
|
err := p.rsAPI.QueryLatestEventsAndState(ctx, &roomserverAPI.QueryLatestEventsAndStateRequest{StateToFetch: fetchStates, RoomID: roomID}, latestState)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Check if the room has a name or canonical alias, if so, return.
|
||||||
|
for _, ev := range latestState.StateEvents {
|
||||||
|
switch ev.Type() {
|
||||||
|
case gomatrixserverlib.MRoomName:
|
||||||
|
if gjson.GetBytes(ev.Content(), "name").Str != "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case gomatrixserverlib.MRoomCanonicalAlias:
|
||||||
|
if gjson.GetBytes(ev.Content(), "alias").Str != "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
heroes, err := p.DB.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sort.Strings(heroes)
|
||||||
|
jr.Summary.Heroes = heroes
|
||||||
|
}
|
||||||
|
|
||||||
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
roomID string,
|
roomID string,
|
||||||
|
|
@ -416,9 +454,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
prevBatch.Decrement()
|
prevBatch.Decrement()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Work out how many members are in the room.
|
p.addRoomSummary(ctx, jr, roomID, device.UserID, r.From)
|
||||||
joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, r.From)
|
|
||||||
invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, r.From)
|
|
||||||
|
|
||||||
// We don't include a device here as we don't need to send down
|
// We don't include a device here as we don't need to send down
|
||||||
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
|
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
|
||||||
|
|
@ -439,8 +475,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
jr.Summary.JoinedMemberCount = &joinedCount
|
|
||||||
jr.Summary.InvitedMemberCount = &invitedCount
|
|
||||||
jr.Timeline.PrevBatch = prevBatch
|
jr.Timeline.PrevBatch = prevBatch
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
jr.Timeline.Limited = limited
|
jr.Timeline.Limited = limited
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,7 @@ func NewSyncStreamProviders(
|
||||||
PDUStreamProvider: &PDUStreamProvider{
|
PDUStreamProvider: &PDUStreamProvider{
|
||||||
StreamProvider: StreamProvider{DB: d},
|
StreamProvider: StreamProvider{DB: d},
|
||||||
lazyLoadCache: lazyLoadCache,
|
lazyLoadCache: lazyLoadCache,
|
||||||
|
rsAPI: rsAPI,
|
||||||
},
|
},
|
||||||
TypingStreamProvider: &TypingStreamProvider{
|
TypingStreamProvider: &TypingStreamProvider{
|
||||||
StreamProvider: StreamProvider{DB: d},
|
StreamProvider: StreamProvider{DB: d},
|
||||||
|
|
|
||||||
|
|
@ -713,4 +713,8 @@ Presence can be set from sync
|
||||||
/state returns M_NOT_FOUND for a rejected message event
|
/state returns M_NOT_FOUND for a rejected message event
|
||||||
/state_ids returns M_NOT_FOUND for a rejected message event
|
/state_ids returns M_NOT_FOUND for a rejected message event
|
||||||
/state returns M_NOT_FOUND for a rejected state event
|
/state returns M_NOT_FOUND for a rejected state event
|
||||||
/state_ids returns M_NOT_FOUND for a rejected state event
|
/state_ids returns M_NOT_FOUND for a rejected state event
|
||||||
|
PUT /rooms/:room_id/redact/:event_id/:txn_id is idempotent
|
||||||
|
Unnamed room comes with a name summary
|
||||||
|
Named room comes with just joined member count summary
|
||||||
|
Room summary only has 5 heroes
|
||||||
Loading…
Reference in a new issue