diff --git a/clientapi/auth/user_interactive.go b/clientapi/auth/user_interactive.go index a6ee47454..9dad49a39 100644 --- a/clientapi/auth/user_interactive.go +++ b/clientapi/auth/user_interactive.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "net/http" + "sync" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/mapsutil" @@ -103,6 +104,7 @@ type userInteractiveFlow struct { // the user already has a valid access token, but we want to double-check // that it isn't stolen by re-authenticating them. type UserInteractive struct { + sync.RWMutex Flows []userInteractiveFlow // Map of login type to implementation Types map[string]Type @@ -144,6 +146,8 @@ func NewUserInteractive( } func (u *UserInteractive) IsSingleStageFlow(authType string) bool { + u.RLock() + defer u.RUnlock() for _, f := range u.Flows { if len(f.Stages) == 1 && f.Stages[0] == authType { return true @@ -153,12 +157,16 @@ func (u *UserInteractive) IsSingleStageFlow(authType string) bool { } func (u *UserInteractive) AddCompletedStage(sessionID, authType string) { + u.Lock() // TODO: Handle multi-stage flows delete(u.Sessions, sessionID) + u.Unlock() } func (u *UserInteractive) DeleteSession(sessionID string) { + u.Lock() delete(u.Sessions, sessionID) + u.Unlock() } type Challenge struct { @@ -170,8 +178,13 @@ type Challenge struct { } // Challenge returns an HTTP 401 with the supported flows for authenticating -func (u *UserInteractive) Challenge(sessionID string) *util.JSONResponse { +func (u *UserInteractive) challenge(sessionID string) *util.JSONResponse { + u.RLock() paramsCopy := mapsutil.MapCopy(u.Params) + completed := u.Sessions[sessionID] + flows := u.Flows + u.RUnlock() + for key, element := range paramsCopy { p := GetAuthParams(element) if p != nil { @@ -184,8 +197,8 @@ func (u *UserInteractive) Challenge(sessionID string) *util.JSONResponse { return &util.JSONResponse{ Code: 401, JSON: Challenge{ - Completed: u.Sessions[sessionID], - Flows: u.Flows, + Completed: completed, + Flows: flows, Session: sessionID, Params: paramsCopy, }, @@ -200,8 +213,10 @@ func (u *UserInteractive) NewSession() *util.JSONResponse { res := jsonerror.InternalServerError() return &res } + u.Lock() u.Sessions[sessionID] = []string{} - return u.Challenge(sessionID) + u.Unlock() + return u.challenge(sessionID) } // ResponseWithChallenge mixes together a JSON body (e.g an error with errcode/message) with the @@ -214,7 +229,7 @@ func (u *UserInteractive) ResponseWithChallenge(sessionID string, response inter return &ise } _ = json.Unmarshal(b, &mixedObjects) - challenge := u.Challenge(sessionID) + challenge := u.challenge(sessionID) b, err = json.Marshal(challenge.JSON) if err != nil { ise := jsonerror.InternalServerError() @@ -243,7 +258,11 @@ func (u *UserInteractive) Verify(ctx context.Context, bodyBytes []byte, device * // extract the type so we know which login type to use authType := gjson.GetBytes(bodyBytes, "auth.type").Str + + u.RLock() loginType, ok := u.Types[authType] + u.RUnlock() + if !ok { return nil, &util.JSONResponse{ Code: http.StatusBadRequest, @@ -253,7 +272,12 @@ func (u *UserInteractive) Verify(ctx context.Context, bodyBytes []byte, device * // retrieve the session sessionID := gjson.GetBytes(bodyBytes, "auth.session").Str - if _, ok = u.Sessions[sessionID]; !ok { + + u.RLock() + _, ok = u.Sessions[sessionID] + u.RUnlock() + + if !ok { // if the login type is part of a single stage flow then allow them to omit the session ID if !u.IsSingleStageFlow(authType) { return nil, &util.JSONResponse{ diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 97bcc12a5..ff01b1952 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -15,6 +15,8 @@ package federationapi import ( + "time" + "github.com/gorilla/mux" "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" @@ -167,5 +169,16 @@ func NewInternalAPI( if err = presenceConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start presence consumer") } + + var cleanExpiredEDUs func() + cleanExpiredEDUs = func() { + logrus.Infof("Cleaning expired EDUs") + if err := federationDB.DeleteExpiredEDUs(base.Context()); err != nil { + logrus.WithError(err).Error("Failed to clean expired EDUs") + } + time.AfterFunc(time.Hour, cleanExpiredEDUs) + } + time.AfterFunc(time.Minute, cleanExpiredEDUs) + return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues, keyRing) } diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index b6edec5da..0d937ffaf 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -127,6 +127,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share oq.destination, // the destination server name receipt, // NIDs from federationapi_queue_json table event.Type, + nil, // this will use the default expireEDUTypes map ); err != nil { logrus.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination) return diff --git a/federationapi/storage/interface.go b/federationapi/storage/interface.go index 29254948b..b8109b432 100644 --- a/federationapi/storage/interface.go +++ b/federationapi/storage/interface.go @@ -16,6 +16,7 @@ package storage import ( "context" + "time" "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/federationapi/types" @@ -38,7 +39,7 @@ type Database interface { GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error) AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error - AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt, eduType string) error + AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt, eduType string, expireEDUTypes map[string]time.Duration) error CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error @@ -70,4 +71,6 @@ type Database interface { // Query the notary for the server keys for the given server. If `optKeyIDs` is not empty, multiple server keys may be returned (between 1 - len(optKeyIDs)) // such that the combination of all server keys will include all the `optKeyIDs`. GetNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error) + // DeleteExpiredEDUs cleans up expired EDUs + DeleteExpiredEDUs(ctx context.Context) error } diff --git a/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go b/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go new file mode 100644 index 000000000..53a7a025e --- /dev/null +++ b/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go @@ -0,0 +1,44 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/matrix-org/gomatrixserverlib" +) + +func UpAddexpiresat(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus ADD COLUMN IF NOT EXISTS expires_at BIGINT NOT NULL DEFAULT 0;") + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1 WHERE edu_type != 'm.direct_to_device'", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24))) + if err != nil { + return fmt.Errorf("failed to update queue_edus: %w", err) + } + return nil +} + +func DownAddexpiresat(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus DROP COLUMN expires_at;") + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/federationapi/storage/postgres/queue_edus_table.go b/federationapi/storage/postgres/queue_edus_table.go index 1fedf0ef1..d6507e13b 100644 --- a/federationapi/storage/postgres/queue_edus_table.go +++ b/federationapi/storage/postgres/queue_edus_table.go @@ -19,9 +19,11 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/federationapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/gomatrixserverlib" ) const queueEDUsSchema = ` @@ -31,7 +33,9 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_edus ( -- The domain part of the user ID the EDU event is for. server_name TEXT NOT NULL, -- The JSON NID from the federationsender_queue_edus_json table. - json_nid BIGINT NOT NULL + json_nid BIGINT NOT NULL, + -- The expiry time of this edu, if any. + expires_at BIGINT NOT NULL DEFAULT 0 ); CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx @@ -43,8 +47,8 @@ CREATE INDEX IF NOT EXISTS federationsender_queue_edus_server_name_idx ` const insertQueueEDUSQL = "" + - "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid)" + - " VALUES ($1, $2, $3)" + "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid, expires_at)" + + " VALUES ($1, $2, $3, $4)" const deleteQueueEDUSQL = "" + "DELETE FROM federationsender_queue_edus WHERE server_name = $1 AND json_nid = ANY($2)" @@ -65,6 +69,12 @@ const selectQueueEDUCountSQL = "" + const selectQueueServerNamesSQL = "" + "SELECT DISTINCT server_name FROM federationsender_queue_edus" +const selectExpiredEDUsSQL = "" + + "SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" + +const deleteExpiredEDUsSQL = "" + + "DELETE FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" + type queueEDUsStatements struct { db *sql.DB insertQueueEDUStmt *sql.Stmt @@ -73,6 +83,8 @@ type queueEDUsStatements struct { selectQueueEDUReferenceJSONCountStmt *sql.Stmt selectQueueEDUCountStmt *sql.Stmt selectQueueEDUServerNamesStmt *sql.Stmt + selectExpiredEDUsStmt *sql.Stmt + deleteExpiredEDUsStmt *sql.Stmt } func NewPostgresQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) { @@ -81,27 +93,34 @@ func NewPostgresQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) { } _, err = s.db.Exec(queueEDUsSchema) if err != nil { - return + return s, err } - if s.insertQueueEDUStmt, err = s.db.Prepare(insertQueueEDUSQL); err != nil { - return + + m := sqlutil.NewMigrator(db) + m.AddMigrations( + sqlutil.Migration{ + Version: "federationapi: add expiresat column", + Up: deltas.UpAddexpiresat, + }, + ) + if err := m.Up(context.Background()); err != nil { + return s, err } - if s.deleteQueueEDUStmt, err = s.db.Prepare(deleteQueueEDUSQL); err != nil { - return - } - if s.selectQueueEDUStmt, err = s.db.Prepare(selectQueueEDUSQL); err != nil { - return - } - if s.selectQueueEDUReferenceJSONCountStmt, err = s.db.Prepare(selectQueueEDUReferenceJSONCountSQL); err != nil { - return - } - if s.selectQueueEDUCountStmt, err = s.db.Prepare(selectQueueEDUCountSQL); err != nil { - return - } - if s.selectQueueEDUServerNamesStmt, err = s.db.Prepare(selectQueueServerNamesSQL); err != nil { - return - } - return + + return s, nil +} + +func (s *queueEDUsStatements) Prepare() error { + return sqlutil.StatementList{ + {&s.insertQueueEDUStmt, insertQueueEDUSQL}, + {&s.deleteQueueEDUStmt, deleteQueueEDUSQL}, + {&s.selectQueueEDUStmt, selectQueueEDUSQL}, + {&s.selectQueueEDUReferenceJSONCountStmt, selectQueueEDUReferenceJSONCountSQL}, + {&s.selectQueueEDUCountStmt, selectQueueEDUCountSQL}, + {&s.selectQueueEDUServerNamesStmt, selectQueueServerNamesSQL}, + {&s.selectExpiredEDUsStmt, selectExpiredEDUsSQL}, + {&s.deleteExpiredEDUsStmt, deleteExpiredEDUsSQL}, + }.Prepare(s.db) } func (s *queueEDUsStatements) InsertQueueEDU( @@ -110,6 +129,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( eduType string, serverName gomatrixserverlib.ServerName, nid int64, + expiresAt gomatrixserverlib.Timestamp, ) error { stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) _, err := stmt.ExecContext( @@ -117,6 +137,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( eduType, // the EDU type serverName, // destination server name nid, // JSON blob NID + expiresAt, // timestamp of expiry ) return err } @@ -150,7 +171,7 @@ func (s *queueEDUsStatements) SelectQueueEDUs( } result = append(result, nid) } - return result, nil + return result, rows.Err() } func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount( @@ -200,3 +221,33 @@ func (s *queueEDUsStatements) SelectQueueEDUServerNames( return result, rows.Err() } + +func (s *queueEDUsStatements) SelectExpiredEDUs( + ctx context.Context, txn *sql.Tx, + expiredBefore gomatrixserverlib.Timestamp, +) ([]int64, error) { + stmt := sqlutil.TxStmt(txn, s.selectExpiredEDUsStmt) + rows, err := stmt.QueryContext(ctx, expiredBefore) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectExpiredEDUs: rows.close() failed") + var result []int64 + var nid int64 + for rows.Next() { + if err = rows.Scan(&nid); err != nil { + return nil, err + } + result = append(result, nid) + } + return result, rows.Err() +} + +func (s *queueEDUsStatements) DeleteExpiredEDUs( + ctx context.Context, txn *sql.Tx, + expiredBefore gomatrixserverlib.Timestamp, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteExpiredEDUsStmt) + _, err := stmt.ExecContext(ctx, expiredBefore) + return err +} diff --git a/federationapi/storage/postgres/storage.go b/federationapi/storage/postgres/storage.go index 7c2883c1b..6e208d096 100644 --- a/federationapi/storage/postgres/storage.go +++ b/federationapi/storage/postgres/storage.go @@ -91,6 +91,9 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, if err != nil { return nil, err } + if err = queueEDUs.Prepare(); err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, ServerName: serverName, diff --git a/federationapi/storage/shared/storage_edus.go b/federationapi/storage/shared/storage_edus.go index 02a23338f..b62e5d9c5 100644 --- a/federationapi/storage/shared/storage_edus.go +++ b/federationapi/storage/shared/storage_edus.go @@ -20,10 +20,21 @@ import ( "encoding/json" "errors" "fmt" + "time" "github.com/matrix-org/gomatrixserverlib" ) +// defaultExpiry for EDUs if not listed below +var defaultExpiry = time.Hour * 24 + +// defaultExpireEDUTypes contains EDUs which can/should be expired after a given time +// if the target server isn't reachable for some reason. +var defaultExpireEDUTypes = map[string]time.Duration{ + gomatrixserverlib.MTyping: time.Minute, + gomatrixserverlib.MPresence: time.Minute * 10, +} + // AssociateEDUWithDestination creates an association that the // destination queues will use to determine which JSON blobs to send // to which servers. @@ -32,7 +43,21 @@ func (d *Database) AssociateEDUWithDestination( serverName gomatrixserverlib.ServerName, receipt *Receipt, eduType string, + expireEDUTypes map[string]time.Duration, ) error { + if expireEDUTypes == nil { + expireEDUTypes = defaultExpireEDUTypes + } + expiresAt := gomatrixserverlib.AsTimestamp(time.Now().Add(defaultExpiry)) + if duration, ok := expireEDUTypes[eduType]; ok { + // Keep EDUs for at least x minutes before deleting them + expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration)) + } + // We forcibly set m.direct_to_device events to 0, as we always want them + // to be delivered. (required for E2EE) + if eduType == gomatrixserverlib.MDirectToDevice { + expiresAt = 0 + } return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if err := d.FederationQueueEDUs.InsertQueueEDU( ctx, // context @@ -40,6 +65,7 @@ func (d *Database) AssociateEDUWithDestination( eduType, // EDU type for coalescing serverName, // destination server name receipt.nid, // NID from the federationapi_queue_json table + expiresAt, // The timestamp this EDU will expire ); err != nil { return fmt.Errorf("InsertQueueEDU: %w", err) } @@ -150,3 +176,26 @@ func (d *Database) GetPendingEDUServerNames( ) ([]gomatrixserverlib.ServerName, error) { return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil) } + +// DeleteExpiredEDUs deletes expired EDUs +func (d *Database) DeleteExpiredEDUs(ctx context.Context) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + expiredBefore := gomatrixserverlib.AsTimestamp(time.Now()) + jsonNIDs, err := d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore) + if err != nil { + return err + } + if len(jsonNIDs) == 0 { + return nil + } + for i := range jsonNIDs { + d.Cache.EvictFederationQueuedEDU(jsonNIDs[i]) + } + + if err = d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, jsonNIDs); err != nil { + return err + } + + return d.FederationQueueEDUs.DeleteExpiredEDUs(ctx, txn, expiredBefore) + }) +} diff --git a/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go b/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go new file mode 100644 index 000000000..c5030163b --- /dev/null +++ b/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go @@ -0,0 +1,68 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/matrix-org/gomatrixserverlib" +) + +func UpAddexpiresat(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus RENAME TO federationsender_queue_edus_old;") + if err != nil { + return fmt.Errorf("failed to rename table: %w", err) + } + + _, err = tx.ExecContext(ctx, ` +CREATE TABLE IF NOT EXISTS federationsender_queue_edus ( + edu_type TEXT NOT NULL, + server_name TEXT NOT NULL, + json_nid BIGINT NOT NULL, + expires_at BIGINT NOT NULL DEFAULT 0 +); + +CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx + ON federationsender_queue_edus (json_nid, server_name); +`) + if err != nil { + return fmt.Errorf("failed to create new table: %w", err) + } + _, err = tx.ExecContext(ctx, ` +INSERT + INTO federationsender_queue_edus ( + edu_type, server_name, json_nid, expires_at + ) SELECT edu_type, server_name, json_nid, 0 FROM federationsender_queue_edus_old; +`) + if err != nil { + return fmt.Errorf("failed to update queue_edus: %w", err) + } + _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1 WHERE edu_type != 'm.direct_to_device'", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24))) + if err != nil { + return fmt.Errorf("failed to update queue_edus: %w", err) + } + return nil +} + +func DownAddexpiresat(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus DROP COLUMN expires_at;") + if err != nil { + return fmt.Errorf("failed to rename table: %w", err) + } + return nil +} diff --git a/federationapi/storage/sqlite3/queue_edus_table.go b/federationapi/storage/sqlite3/queue_edus_table.go index f4c84f094..8e7e7901f 100644 --- a/federationapi/storage/sqlite3/queue_edus_table.go +++ b/federationapi/storage/sqlite3/queue_edus_table.go @@ -20,9 +20,11 @@ import ( "fmt" "strings" + "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/gomatrixserverlib" ) const queueEDUsSchema = ` @@ -32,7 +34,9 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_edus ( -- The domain part of the user ID the EDU event is for. server_name TEXT NOT NULL, -- The JSON NID from the federationsender_queue_edus_json table. - json_nid BIGINT NOT NULL + json_nid BIGINT NOT NULL, + -- The expiry time of this edu, if any. + expires_at BIGINT NOT NULL DEFAULT 0 ); CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx @@ -44,8 +48,8 @@ CREATE INDEX IF NOT EXISTS federationsender_queue_edus_server_name_idx ` const insertQueueEDUSQL = "" + - "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid)" + - " VALUES ($1, $2, $3)" + "INSERT INTO federationsender_queue_edus (edu_type, server_name, json_nid, expires_at)" + + " VALUES ($1, $2, $3, $4)" const deleteQueueEDUsSQL = "" + "DELETE FROM federationsender_queue_edus WHERE server_name = $1 AND json_nid IN ($2)" @@ -66,13 +70,22 @@ const selectQueueEDUCountSQL = "" + const selectQueueServerNamesSQL = "" + "SELECT DISTINCT server_name FROM federationsender_queue_edus" +const selectExpiredEDUsSQL = "" + + "SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" + +const deleteExpiredEDUsSQL = "" + + "DELETE FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" + type queueEDUsStatements struct { - db *sql.DB - insertQueueEDUStmt *sql.Stmt + db *sql.DB + insertQueueEDUStmt *sql.Stmt + // deleteQueueEDUStmt *sql.Stmt - prepared at runtime due to variadic selectQueueEDUStmt *sql.Stmt selectQueueEDUReferenceJSONCountStmt *sql.Stmt selectQueueEDUCountStmt *sql.Stmt selectQueueEDUServerNamesStmt *sql.Stmt + selectExpiredEDUsStmt *sql.Stmt + deleteExpiredEDUsStmt *sql.Stmt } func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) { @@ -81,24 +94,33 @@ func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) { } _, err = db.Exec(queueEDUsSchema) if err != nil { - return + return s, err } - if s.insertQueueEDUStmt, err = db.Prepare(insertQueueEDUSQL); err != nil { - return + + m := sqlutil.NewMigrator(db) + m.AddMigrations( + sqlutil.Migration{ + Version: "federationapi: add expiresat column", + Up: deltas.UpAddexpiresat, + }, + ) + if err := m.Up(context.Background()); err != nil { + return s, err } - if s.selectQueueEDUStmt, err = db.Prepare(selectQueueEDUSQL); err != nil { - return - } - if s.selectQueueEDUReferenceJSONCountStmt, err = db.Prepare(selectQueueEDUReferenceJSONCountSQL); err != nil { - return - } - if s.selectQueueEDUCountStmt, err = db.Prepare(selectQueueEDUCountSQL); err != nil { - return - } - if s.selectQueueEDUServerNamesStmt, err = db.Prepare(selectQueueServerNamesSQL); err != nil { - return - } - return + + return s, nil +} + +func (s *queueEDUsStatements) Prepare() error { + return sqlutil.StatementList{ + {&s.insertQueueEDUStmt, insertQueueEDUSQL}, + {&s.selectQueueEDUStmt, selectQueueEDUSQL}, + {&s.selectQueueEDUReferenceJSONCountStmt, selectQueueEDUReferenceJSONCountSQL}, + {&s.selectQueueEDUCountStmt, selectQueueEDUCountSQL}, + {&s.selectQueueEDUServerNamesStmt, selectQueueServerNamesSQL}, + {&s.selectExpiredEDUsStmt, selectExpiredEDUsSQL}, + {&s.deleteExpiredEDUsStmt, deleteExpiredEDUsSQL}, + }.Prepare(s.db) } func (s *queueEDUsStatements) InsertQueueEDU( @@ -107,6 +129,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( eduType string, serverName gomatrixserverlib.ServerName, nid int64, + expiresAt gomatrixserverlib.Timestamp, ) error { stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) _, err := stmt.ExecContext( @@ -114,6 +137,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( eduType, // the EDU type serverName, // destination server name nid, // JSON blob NID + expiresAt, // timestamp of expiry ) return err } @@ -159,7 +183,7 @@ func (s *queueEDUsStatements) SelectQueueEDUs( } result = append(result, nid) } - return result, nil + return result, rows.Err() } func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount( @@ -209,3 +233,33 @@ func (s *queueEDUsStatements) SelectQueueEDUServerNames( return result, rows.Err() } + +func (s *queueEDUsStatements) SelectExpiredEDUs( + ctx context.Context, txn *sql.Tx, + expiredBefore gomatrixserverlib.Timestamp, +) ([]int64, error) { + stmt := sqlutil.TxStmt(txn, s.selectExpiredEDUsStmt) + rows, err := stmt.QueryContext(ctx, expiredBefore) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectExpiredEDUs: rows.close() failed") + var result []int64 + var nid int64 + for rows.Next() { + if err = rows.Scan(&nid); err != nil { + return nil, err + } + result = append(result, nid) + } + return result, rows.Err() +} + +func (s *queueEDUsStatements) DeleteExpiredEDUs( + ctx context.Context, txn *sql.Tx, + expiredBefore gomatrixserverlib.Timestamp, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteExpiredEDUsStmt) + _, err := stmt.ExecContext(ctx, expiredBefore) + return err +} diff --git a/federationapi/storage/sqlite3/storage.go b/federationapi/storage/sqlite3/storage.go index 9594aaec6..c89cb6bea 100644 --- a/federationapi/storage/sqlite3/storage.go +++ b/federationapi/storage/sqlite3/storage.go @@ -90,6 +90,9 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, if err != nil { return nil, err } + if err = queueEDUs.Prepare(); err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, ServerName: serverName, diff --git a/federationapi/storage/storage_test.go b/federationapi/storage/storage_test.go new file mode 100644 index 000000000..7eba2cbee --- /dev/null +++ b/federationapi/storage/storage_test.go @@ -0,0 +1,81 @@ +package storage_test + +import ( + "context" + "testing" + "time" + + "github.com/matrix-org/gomatrixserverlib" + "github.com/stretchr/testify/assert" + + "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/test" + "github.com/matrix-org/dendrite/test/testrig" +) + +func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { + b, baseClose := testrig.CreateBaseDendrite(t, dbType) + connStr, dbClose := test.PrepareDBConnectionString(t, dbType) + db, err := storage.NewDatabase(b, &config.DatabaseOptions{ + ConnectionString: config.DataSource(connStr), + }, b.Caches, b.Cfg.Global.ServerName) + if err != nil { + t.Fatalf("NewDatabase returned %s", err) + } + return db, func() { + dbClose() + baseClose() + } +} + +func TestExpireEDUs(t *testing.T) { + var expireEDUTypes = map[string]time.Duration{ + gomatrixserverlib.MReceipt: time.Millisecond, + } + + ctx := context.Background() + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, close := mustCreateFederationDatabase(t, dbType) + defer close() + // insert some data + for i := 0; i < 100; i++ { + receipt, err := db.StoreJSON(ctx, "{}") + assert.NoError(t, err) + + err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, gomatrixserverlib.MReceipt, expireEDUTypes) + assert.NoError(t, err) + } + // add data without expiry + receipt, err := db.StoreJSON(ctx, "{}") + assert.NoError(t, err) + + // m.read_marker gets the default expiry of 24h, so won't be deleted further down in this test + err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, "m.read_marker", expireEDUTypes) + assert.NoError(t, err) + + // Delete expired EDUs + err = db.DeleteExpiredEDUs(ctx) + assert.NoError(t, err) + + // verify the data is gone + data, err := db.GetPendingEDUs(ctx, "localhost", 100) + assert.NoError(t, err) + assert.Equal(t, 1, len(data)) + + // check that m.direct_to_device is never expired + receipt, err = db.StoreJSON(ctx, "{}") + assert.NoError(t, err) + + err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, gomatrixserverlib.MDirectToDevice, expireEDUTypes) + assert.NoError(t, err) + + err = db.DeleteExpiredEDUs(ctx) + assert.NoError(t, err) + + // We should get two EDUs, the m.read_marker and the m.direct_to_device + data, err = db.GetPendingEDUs(ctx, "localhost", 100) + assert.NoError(t, err) + assert.Equal(t, 2, len(data)) + }) +} diff --git a/federationapi/storage/tables/interface.go b/federationapi/storage/tables/interface.go index 19357393d..3c116a1d0 100644 --- a/federationapi/storage/tables/interface.go +++ b/federationapi/storage/tables/interface.go @@ -34,12 +34,15 @@ type FederationQueuePDUs interface { } type FederationQueueEDUs interface { - InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64) error + InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64, expiresAt gomatrixserverlib.Timestamp) error DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error) SelectQueueEDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error) SelectQueueEDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error) SelectQueueEDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error) + SelectExpiredEDUs(ctx context.Context, txn *sql.Tx, expiredBefore gomatrixserverlib.Timestamp) ([]int64, error) + DeleteExpiredEDUs(ctx context.Context, txn *sql.Tx, expiredBefore gomatrixserverlib.Timestamp) error + Prepare() error } type FederationQueueJSON interface { diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 6a18df506..31c6187cb 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -39,21 +39,13 @@ func (p *SendToDeviceStreamProvider) IncrementalSync( return from } - if len(events) > 0 { - // Clean up old send-to-device messages from before this stream position. - if err := p.DB.CleanSendToDeviceUpdates(req.Context, req.Device.UserID, req.Device.ID, from); err != nil { - req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed") - return from - } - - // Add the updates into the sync response. - for _, event := range events { - // skip ignored user events - if _, ok := req.IgnoredUsers.List[event.Sender]; ok { - continue - } - req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent) + // Add the updates into the sync response. + for _, event := range events { + // skip ignored user events + if _, ok := req.IgnoredUsers.List[event.Sender]; ok { + continue } + req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent) } return lastPos diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index b6b4779a5..d908a9629 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -25,6 +25,11 @@ import ( "sync" "time" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/jsonerror" keyapi "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -35,10 +40,6 @@ import ( "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" ) // RequestPool manages HTTP long-poll connections for /sync @@ -251,6 +252,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() + // Clean up old send-to-device messages from before this stream position. + // This is needed to avoid sending the same message multiple times + if err = rp.db.CleanSendToDeviceUpdates(syncReq.Context, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Since.SendToDevicePosition); err != nil { + syncReq.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed") + } + // loop until we get some data for { startTime := time.Now() diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 159fa08b6..39b085d9c 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -21,9 +21,10 @@ import ( "strconv" "strings" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/tidwall/gjson" + + "github.com/matrix-org/dendrite/roomserver/api" ) var ( @@ -330,23 +331,23 @@ type Response struct { NextBatch StreamingToken `json:"next_batch"` AccountData struct { Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"` - } `json:"account_data"` + } `json:"account_data,omitempty"` Presence struct { Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"` - } `json:"presence"` + } `json:"presence,omitempty"` Rooms struct { - Join map[string]JoinResponse `json:"join"` - Peek map[string]JoinResponse `json:"peek"` - Invite map[string]InviteResponse `json:"invite"` - Leave map[string]LeaveResponse `json:"leave"` - } `json:"rooms"` + Join map[string]JoinResponse `json:"join,omitempty"` + Peek map[string]JoinResponse `json:"peek,omitempty"` + Invite map[string]InviteResponse `json:"invite,omitempty"` + Leave map[string]LeaveResponse `json:"leave,omitempty"` + } `json:"rooms,omitempty"` ToDevice struct { - Events []gomatrixserverlib.SendToDeviceEvent `json:"events"` - } `json:"to_device"` + Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"` + } `json:"to_device,omitempty"` DeviceLists struct { Changed []string `json:"changed,omitempty"` Left []string `json:"left,omitempty"` - } `json:"device_lists"` + } `json:"device_lists,omitempty"` DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"` }