diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index 8cd944346..aac36cc76 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -75,7 +75,7 @@ func (r *FederationInternalAPI) PerformJoin( seenSet := make(map[gomatrixserverlib.ServerName]bool) var uniqueList []gomatrixserverlib.ServerName for _, srv := range request.ServerNames { - if seenSet[srv] { + if seenSet[srv] || srv == r.cfg.Matrix.ServerName { continue } seenSet[srv] = true diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 2281f4bbf..08bbfedb8 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -362,6 +362,13 @@ func (a *KeyInternalAPI) processSelfSignatures( for targetKeyID, signature := range forTargetUserID { switch sig := signature.CrossSigningBody.(type) { case *gomatrixserverlib.CrossSigningKey: + for keyID := range sig.Keys { + split := strings.SplitN(string(keyID), ":", 2) + if len(split) > 1 && gomatrixserverlib.KeyID(split[1]) == targetKeyID { + targetKeyID = keyID // contains the ed25519: or other scheme + break + } + } for originUserID, forOriginUserID := range sig.Signatures { for originKeyID, originSig := range forOriginUserID { if err := a.DB.StoreCrossSigningSigsForTarget( diff --git a/keyserver/storage/postgres/cross_signing_sigs_table.go b/keyserver/storage/postgres/cross_signing_sigs_table.go index 40633c05c..b101e7ce5 100644 --- a/keyserver/storage/postgres/cross_signing_sigs_table.go +++ b/keyserver/storage/postgres/cross_signing_sigs_table.go @@ -33,8 +33,10 @@ CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs ( target_user_id TEXT NOT NULL, target_key_id TEXT NOT NULL, signature TEXT NOT NULL, - PRIMARY KEY (origin_user_id, target_user_id, target_key_id) + PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id) ); + +CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id); ` const selectCrossSigningSigsForTargetSQL = "" + @@ -44,7 +46,7 @@ const selectCrossSigningSigsForTargetSQL = "" + const upsertCrossSigningSigsForTargetSQL = "" + "INSERT INTO keyserver_cross_signing_sigs (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)" + " VALUES($1, $2, $3, $4, $5)" + - " ON CONFLICT (origin_user_id, target_user_id, target_key_id) DO UPDATE SET (origin_key_id, signature) = ($2, $5)" + " ON CONFLICT (origin_user_id, origin_key_id, target_user_id, target_key_id) DO UPDATE SET signature = $5" const deleteCrossSigningSigsForTargetSQL = "" + "DELETE FROM keyserver_cross_signing_sigs WHERE target_user_id=$1 AND target_key_id=$2" diff --git a/keyserver/storage/postgres/deltas/2022042612000000_xsigning_idx.go b/keyserver/storage/postgres/deltas/2022042612000000_xsigning_idx.go new file mode 100644 index 000000000..12956e3b4 --- /dev/null +++ b/keyserver/storage/postgres/deltas/2022042612000000_xsigning_idx.go @@ -0,0 +1,52 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +func LoadFixCrossSigningSignatureIndexes(m *sqlutil.Migrations) { + m.AddMigration(UpFixCrossSigningSignatureIndexes, DownFixCrossSigningSignatureIndexes) +} + +func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE keyserver_cross_signing_sigs DROP CONSTRAINT keyserver_cross_signing_sigs_pkey; + ALTER TABLE keyserver_cross_signing_sigs ADD PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id); + + CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownFixCrossSigningSignatureIndexes(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE keyserver_cross_signing_sigs DROP CONSTRAINT keyserver_cross_signing_sigs_pkey; + ALTER TABLE keyserver_cross_signing_sigs ADD PRIMARY KEY (origin_user_id, target_user_id, target_key_id); + + DROP INDEX IF EXISTS keyserver_cross_signing_sigs_idx; + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/keyserver/storage/postgres/storage.go b/keyserver/storage/postgres/storage.go index 136986885..d4c7e2cc7 100644 --- a/keyserver/storage/postgres/storage.go +++ b/keyserver/storage/postgres/storage.go @@ -54,6 +54,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) } m := sqlutil.NewMigrations() deltas.LoadRefactorKeyChanges(m) + deltas.LoadFixCrossSigningSignatureIndexes(m) if err = m.RunDeltas(db, dbProperties); err != nil { return nil, err } diff --git a/keyserver/storage/sqlite3/cross_signing_sigs_table.go b/keyserver/storage/sqlite3/cross_signing_sigs_table.go index 29ee889fd..36d562b8a 100644 --- a/keyserver/storage/sqlite3/cross_signing_sigs_table.go +++ b/keyserver/storage/sqlite3/cross_signing_sigs_table.go @@ -33,8 +33,10 @@ CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs ( target_user_id TEXT NOT NULL, target_key_id TEXT NOT NULL, signature TEXT NOT NULL, - PRIMARY KEY (origin_user_id, target_user_id, target_key_id) + PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id) ); + +CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id); ` const selectCrossSigningSigsForTargetSQL = "" + diff --git a/keyserver/storage/sqlite3/deltas/2022042612000000_xsigning_idx.go b/keyserver/storage/sqlite3/deltas/2022042612000000_xsigning_idx.go new file mode 100644 index 000000000..230e39fef --- /dev/null +++ b/keyserver/storage/sqlite3/deltas/2022042612000000_xsigning_idx.go @@ -0,0 +1,76 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +func LoadFixCrossSigningSignatureIndexes(m *sqlutil.Migrations) { + m.AddMigration(UpFixCrossSigningSignatureIndexes, DownFixCrossSigningSignatureIndexes) +} + +func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error { + _, err := tx.Exec(` + CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs_tmp ( + origin_user_id TEXT NOT NULL, + origin_key_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + target_key_id TEXT NOT NULL, + signature TEXT NOT NULL, + PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id) + ); + + INSERT INTO keyserver_cross_signing_sigs_tmp (origin_user_id, origin_key_id, target_user_id, target_key_id, signature) + SELECT origin_user_id, origin_key_id, target_user_id, target_key_id, signature FROM keyserver_cross_signing_sigs; + + DROP TABLE keyserver_cross_signing_sigs; + ALTER TABLE keyserver_cross_signing_sigs_tmp RENAME TO keyserver_cross_signing_sigs; + + CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownFixCrossSigningSignatureIndexes(tx *sql.Tx) error { + _, err := tx.Exec(` + CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs_tmp ( + origin_user_id TEXT NOT NULL, + origin_key_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + target_key_id TEXT NOT NULL, + signature TEXT NOT NULL, + PRIMARY KEY (origin_user_id, target_user_id, target_key_id) + ); + + INSERT INTO keyserver_cross_signing_sigs_tmp (origin_user_id, origin_key_id, target_user_id, target_key_id, signature) + SELECT origin_user_id, origin_key_id, target_user_id, target_key_id, signature FROM keyserver_cross_signing_sigs; + + DROP TABLE keyserver_cross_signing_sigs; + ALTER TABLE keyserver_cross_signing_sigs_tmp RENAME TO keyserver_cross_signing_sigs; + + DELETE INDEX IF EXISTS keyserver_cross_signing_sigs_idx; + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/keyserver/storage/sqlite3/storage.go b/keyserver/storage/sqlite3/storage.go index 0e0adceef..84d4cdf55 100644 --- a/keyserver/storage/sqlite3/storage.go +++ b/keyserver/storage/sqlite3/storage.go @@ -53,6 +53,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) m := sqlutil.NewMigrations() deltas.LoadRefactorKeyChanges(m) + deltas.LoadFixCrossSigningSignatureIndexes(m) if err = m.RunDeltas(db, dbProperties); err != nil { return nil, err } diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go index b198b2292..6bcca48f4 100644 --- a/syncapi/consumers/presence.go +++ b/syncapi/consumers/presence.go @@ -88,6 +88,11 @@ func (s *PresenceConsumer) Start() error { } return } + if presence == nil { + presence = &types.PresenceInternal{ + UserID: userID, + } + } deviceRes := api.QueryDevicesResponse{} if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil { @@ -106,7 +111,9 @@ func (s *PresenceConsumer) Start() error { m.Header.Set(jetstream.UserID, presence.UserID) m.Header.Set("presence", presence.ClientFields.Presence) - m.Header.Set("status_msg", *presence.ClientFields.StatusMsg) + if presence.ClientFields.StatusMsg != nil { + m.Header.Set("status_msg", *presence.ClientFields.StatusMsg) + } m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS))) if err = msg.RespondMsg(m); err != nil { diff --git a/syncapi/routing/filter.go b/syncapi/routing/filter.go index baa4d841c..1a10bd649 100644 --- a/syncapi/routing/filter.go +++ b/syncapi/routing/filter.go @@ -44,8 +44,8 @@ func GetFilter( return jsonerror.InternalServerError() } - filter, err := syncDB.GetFilter(req.Context(), localpart, filterID) - if err != nil { + filter := gomatrixserverlib.DefaultFilter() + if err := syncDB.GetFilter(req.Context(), &filter, localpart, filterID); err != nil { //TODO better error handling. This error message is *probably* right, // but if there are obscure db errors, this will also be returned, // even though it is not correct. diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 0fea88da6..43aaa3588 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -81,7 +81,7 @@ type Database interface { // Returns a map following the format data[roomID] = []dataTypes // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error - GetAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) + GetAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, types.StreamPosition, error) // UpsertAccountData keeps track of new or updated account data, by saving the type // of the new/updated data, and the user ID and room ID the data is related to (empty) // room ID means the data isn't specific to any room) @@ -125,10 +125,10 @@ type Database interface { // CleanSendToDeviceUpdates removes all send-to-device messages BEFORE the specified // from position, preventing the send-to-device table from growing indefinitely. CleanSendToDeviceUpdates(ctx context.Context, userID, deviceID string, before types.StreamPosition) (err error) - // GetFilter looks up the filter associated with a given local user and filter ID. - // Returns a filter structure. Otherwise returns an error if no such filter exists + // GetFilter looks up the filter associated with a given local user and filter ID + // and populates the target filter. Otherwise returns an error if no such filter exists // or if there was an error talking to the database. - GetFilter(ctx context.Context, localpart string, filterID string) (*gomatrixserverlib.Filter, error) + GetFilter(ctx context.Context, target *gomatrixserverlib.Filter, localpart string, filterID string) error // PutFilter puts the passed filter into the database. // Returns the filterID as a string. Otherwise returns an error if something // goes wrong. diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go index 25bdb1da3..ec1919fca 100644 --- a/syncapi/storage/postgres/account_data_table.go +++ b/syncapi/storage/postgres/account_data_table.go @@ -57,7 +57,7 @@ const insertAccountDataSQL = "" + " RETURNING id" const selectAccountDataInRangeSQL = "" + - "SELECT room_id, type FROM syncapi_account_data_type" + + "SELECT id, room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" + @@ -103,8 +103,9 @@ func (s *accountDataStatements) SelectAccountDataInRange( userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter, -) (data map[string][]string, err error) { +) (data map[string][]string, pos types.StreamPosition, err error) { data = make(map[string][]string) + pos = r.Low() rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High(), pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)), @@ -116,11 +117,12 @@ func (s *accountDataStatements) SelectAccountDataInRange( } defer internal.CloseAndLogIfError(ctx, rows, "selectAccountDataInRange: rows.close() failed") - for rows.Next() { - var dataType string - var roomID string + var dataType string + var roomID string + var id types.StreamPosition - if err = rows.Scan(&roomID, &dataType); err != nil { + for rows.Next() { + if err = rows.Scan(&id, &roomID, &dataType); err != nil { return } @@ -129,8 +131,11 @@ func (s *accountDataStatements) SelectAccountDataInRange( } else { data[roomID] = []string{dataType} } + if id > pos { + pos = id + } } - return data, rows.Err() + return data, pos, rows.Err() } func (s *accountDataStatements) SelectMaxAccountDataID( diff --git a/syncapi/storage/postgres/filter_table.go b/syncapi/storage/postgres/filter_table.go index dfd3d6963..c82ef092f 100644 --- a/syncapi/storage/postgres/filter_table.go +++ b/syncapi/storage/postgres/filter_table.go @@ -73,21 +73,20 @@ func NewPostgresFilterTable(db *sql.DB) (tables.Filter, error) { } func (s *filterStatements) SelectFilter( - ctx context.Context, localpart string, filterID string, -) (*gomatrixserverlib.Filter, error) { + ctx context.Context, target *gomatrixserverlib.Filter, localpart string, filterID string, +) error { // Retrieve filter from database (stored as canonical JSON) var filterData []byte err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData) if err != nil { - return nil, err + return err } // Unmarshal JSON into Filter struct - filter := gomatrixserverlib.DefaultFilter() - if err = json.Unmarshal(filterData, &filter); err != nil { - return nil, err + if err = json.Unmarshal(filterData, &target); err != nil { + return err } - return &filter, nil + return nil } func (s *filterStatements) InsertFilter( diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go index 49336c4eb..9f1e37f79 100644 --- a/syncapi/storage/postgres/presence_table.go +++ b/syncapi/storage/postgres/presence_table.go @@ -127,6 +127,9 @@ func (p *presenceStatements) GetPresenceForUser( } stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS) + if err == sql.ErrNoRows { + return nil, nil + } result.ClientFields.Presence = result.Presence.String() return result, err } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 3c431db48..25aca50ae 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -265,7 +265,7 @@ func (d *Database) DeletePeeks( func (d *Database) GetAccountDataInRange( ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter, -) (map[string][]string, error) { +) (map[string][]string, types.StreamPosition, error) { return d.AccountData.SelectAccountDataInRange(ctx, userID, r, accountDataFilterPart) } @@ -513,9 +513,9 @@ func (d *Database) StreamToTopologicalPosition( } func (d *Database) GetFilter( - ctx context.Context, localpart string, filterID string, -) (*gomatrixserverlib.Filter, error) { - return d.Filter.SelectFilter(ctx, localpart, filterID) + ctx context.Context, target *gomatrixserverlib.Filter, localpart string, filterID string, +) error { + return d.Filter.SelectFilter(ctx, target, localpart, filterID) } func (d *Database) PutFilter( diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index 71a098177..2c7272ea8 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -43,7 +43,7 @@ const insertAccountDataSQL = "" + // further parameters are added by prepareWithFilters const selectAccountDataInRangeSQL = "" + - "SELECT room_id, type FROM syncapi_account_data_type" + + "SELECT id, room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" const selectMaxAccountDataIDSQL = "" + @@ -95,7 +95,8 @@ func (s *accountDataStatements) SelectAccountDataInRange( userID string, r types.Range, filter *gomatrixserverlib.EventFilter, -) (data map[string][]string, err error) { +) (data map[string][]string, pos types.StreamPosition, err error) { + pos = r.Low() data = make(map[string][]string) stmt, params, err := prepareWithFilters( s.db, nil, selectAccountDataInRangeSQL, @@ -112,11 +113,12 @@ func (s *accountDataStatements) SelectAccountDataInRange( } defer internal.CloseAndLogIfError(ctx, rows, "selectAccountDataInRange: rows.close() failed") - for rows.Next() { - var dataType string - var roomID string + var dataType string + var roomID string + var id types.StreamPosition - if err = rows.Scan(&roomID, &dataType); err != nil { + for rows.Next() { + if err = rows.Scan(&id, &roomID, &dataType); err != nil { return } @@ -125,9 +127,12 @@ func (s *accountDataStatements) SelectAccountDataInRange( } else { data[roomID] = []string{dataType} } + if id > pos { + pos = id + } } - return data, nil + return data, pos, nil } func (s *accountDataStatements) SelectMaxAccountDataID( diff --git a/syncapi/storage/sqlite3/filter_table.go b/syncapi/storage/sqlite3/filter_table.go index 0cfebef2a..6081a48b1 100644 --- a/syncapi/storage/sqlite3/filter_table.go +++ b/syncapi/storage/sqlite3/filter_table.go @@ -77,21 +77,20 @@ func NewSqliteFilterTable(db *sql.DB) (tables.Filter, error) { } func (s *filterStatements) SelectFilter( - ctx context.Context, localpart string, filterID string, -) (*gomatrixserverlib.Filter, error) { + ctx context.Context, target *gomatrixserverlib.Filter, localpart string, filterID string, +) error { // Retrieve filter from database (stored as canonical JSON) var filterData []byte err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData) if err != nil { - return nil, err + return err } // Unmarshal JSON into Filter struct - filter := gomatrixserverlib.DefaultFilter() - if err = json.Unmarshal(filterData, &filter); err != nil { - return nil, err + if err = json.Unmarshal(filterData, &target); err != nil { + return err } - return &filter, nil + return nil } func (s *filterStatements) InsertFilter( diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go index 00b16458d..177a01bf3 100644 --- a/syncapi/storage/sqlite3/presence_table.go +++ b/syncapi/storage/sqlite3/presence_table.go @@ -142,6 +142,9 @@ func (p *presenceStatements) GetPresenceForUser( } stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS) + if err == sql.ErrNoRows { + return nil, nil + } result.ClientFields.Presence = result.Presence.String() return result, err } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ac713dd5c..4ff4689ed 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -27,7 +27,7 @@ import ( type AccountData interface { InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error) // SelectAccountDataInRange returns a map of room ID to a list of `dataType`. - SelectAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error) + SelectAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, pos types.StreamPosition, err error) SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error) } @@ -157,7 +157,7 @@ type SendToDevice interface { } type Filter interface { - SelectFilter(ctx context.Context, localpart string, filterID string) (*gomatrixserverlib.Filter, error) + SelectFilter(ctx context.Context, target *gomatrixserverlib.Filter, localpart string, filterID string) error InsertFilter(ctx context.Context, filter *gomatrixserverlib.Filter, localpart string) (filterID string, err error) } diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 094c51485..2cddbcf04 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -43,7 +43,7 @@ func (p *AccountDataStreamProvider) IncrementalSync( To: to, } - dataTypes, err := p.DB.GetAccountDataInRange( + dataTypes, pos, err := p.DB.GetAccountDataInRange( ctx, req.Device.UserID, r, &req.Filter.AccountData, ) if err != nil { @@ -53,6 +53,12 @@ func (p *AccountDataStreamProvider) IncrementalSync( // Iterate over the rooms for roomID, dataTypes := range dataTypes { + // For a complete sync, make sure we're only including this room if + // that room was present in the joined rooms. + if from == 0 && roomID != "" && !req.IsRoomPresent(roomID) { + continue + } + // Request the missing data from the database for _, dataType := range dataTypes { dataReq := userapi.QueryAccountDataRequest{ @@ -95,5 +101,5 @@ func (p *AccountDataStreamProvider) IncrementalSync( } } - return to + return pos } diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 9a6c5c130..614b88d48 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -16,7 +16,6 @@ package streams import ( "context" - "database/sql" "encoding/json" "sync" @@ -80,11 +79,10 @@ func (p *PresenceStreamProvider) IncrementalSync( if _, ok := presences[roomUsers[i]]; ok { continue } + // Bear in mind that this might return nil, but at least populating + // a nil means that there's a map entry so we won't repeat this call. presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i]) if err != nil { - if err == sql.ErrNoRows { - continue - } req.Log.WithError(err).Error("unable to query presence for user") return from } @@ -93,8 +91,10 @@ func (p *PresenceStreamProvider) IncrementalSync( } lastPos := to - for i := range presences { - presence := presences[i] + for _, presence := range presences { + if presence == nil { + continue + } // Ignore users we don't share a room with if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) { continue diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index 9d7d479a2..f4e84c7d0 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -62,6 +62,12 @@ func (p *ReceiptStreamProvider) IncrementalSync( } for roomID, receipts := range receiptsByRoom { + // For a complete sync, make sure we're only including this room if + // that room was present in the joined rooms. + if from == 0 && !req.IsRoomPresent(roomID) { + continue + } + jr := *types.NewJoinResponse() if existing, ok := req.Response.Rooms.Join[roomID]; ok { jr = existing diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index f04f172d3..9d4740e93 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -18,6 +18,7 @@ import ( "database/sql" "encoding/json" "fmt" + "math" "net/http" "strconv" "time" @@ -47,6 +48,13 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } // TODO: read from stored filters too filter := gomatrixserverlib.DefaultFilter() + if since.IsEmpty() { + // Send as much account data down for complete syncs as possible + // by default, otherwise clients do weird things while waiting + // for the rest of the data to trickle down. + filter.AccountData.Limit = math.MaxInt32 + filter.Room.AccountData.Limit = math.MaxInt32 + } filterQuery := req.URL.Query().Get("filter") if filterQuery != "" { if filterQuery[0] == '{' { @@ -61,11 +69,9 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") return nil, fmt.Errorf("gomatrixserverlib.SplitID: %w", err) } - if f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery); err != nil && err != sql.ErrNoRows { + if err := syncDB.GetFilter(req.Context(), &filter, localpart, filterQuery); err != nil && err != sql.ErrNoRows { util.GetLogger(req.Context()).WithError(err).Error("syncDB.GetFilter failed") return nil, fmt.Errorf("syncDB.GetFilter: %w", err) - } else if f != nil { - filter = *f } } } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 703340997..76d550a65 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -127,14 +127,23 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user if !ok { // this should almost never happen return } + newPresence := types.PresenceInternal{ - ClientFields: types.PresenceClientResponse{ - Presence: presenceID.String(), - }, Presence: presenceID, UserID: userID, LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()), } + + // ensure we also send the current status_msg to federated servers and not nil + dbPresence, err := db.GetPresence(context.Background(), userID) + if err != nil && err != sql.ErrNoRows { + return + } + if dbPresence != nil { + newPresence.ClientFields = dbPresence.ClientFields + } + newPresence.ClientFields.Presence = presenceID.String() + defer rp.presence.Store(userID, newPresence) // avoid spamming presence updates when syncing existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence) @@ -145,13 +154,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user } } - // ensure we also send the current status_msg to federated servers and not nil - dbPresence, err := db.GetPresence(context.Background(), userID) - if err != nil && err != sql.ErrNoRows { - return - } - - if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil { + if err := rp.producer.SendPresence(userID, presenceID, newPresence.ClientFields.StatusMsg); err != nil { logrus.WithError(err).Error("Unable to publish presence message from sync") return } diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index e6777f643..a9ea234d0 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -25,6 +25,23 @@ type SyncRequest struct { IgnoredUsers IgnoredUsers } +func (r *SyncRequest) IsRoomPresent(roomID string) bool { + membership, ok := r.Rooms[roomID] + if !ok { + return false + } + switch membership { + case gomatrixserverlib.Join: + return true + case gomatrixserverlib.Invite: + return true + case gomatrixserverlib.Peek: + return true + default: + return false + } +} + type StreamProvider interface { Setup() diff --git a/sytest-whitelist b/sytest-whitelist index c9829606f..6af8d89ff 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -681,8 +681,6 @@ GET /presence/:user_id/status fetches initial status PUT /presence/:user_id/status updates my presence Presence change reports an event to myself Existing members see new members' presence -#Existing members see new member's presence -Newly joined room includes presence in incremental sync Get presence for newly joined members in incremental sync User sees their own presence in a sync User sees updates to presence from other users in the incremental sync.