diff --git a/federationsender/api/api.go b/federationsender/api/api.go index 655d1d103..bdc894ed1 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -57,6 +57,12 @@ type FederationSenderInternalAPI interface { request *PerformJoinRequest, response *PerformJoinResponse, ) + // Handle an instruction to peek a room on a remote server. + PerformJoin( + ctx context.Context, + request *PerformPeekRequest, + response *PerformPeekResponse, + ) // Handle an instruction to make_leave & send_leave with a remote server. PerformLeave( ctx context.Context, @@ -105,6 +111,16 @@ type PerformJoinResponse struct { LastError *gomatrix.HTTPError } +type PerformPeekRequest struct { + RoomID string `json:"room_id"` + // The sorted list of servers to try. Servers will be tried sequentially, after de-duplication. + ServerNames types.ServerNames `json:"server_names"` +} + +type PerformPeekResponse struct { + LastError *gomatrix.HTTPError +} + type PerformLeaveRequest struct { RoomID string `json:"room_id"` UserID string `json:"user_id"` diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index da8d41a74..0a2ff824a 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -186,7 +186,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer( // Check that the send_join response was valid. joinCtx := perform.JoinContext(r.federation, r.keyRing) if err = joinCtx.CheckSendJoinResponse( - ctx, event, serverName, respMakeJoin, respSendJoin, + ctx, event, serverName, respSendJoin, ); err != nil { return fmt.Errorf("joinCtx.CheckSendJoinResponse: %w", err) } @@ -206,6 +206,156 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer( return nil } +// PerformPeekRequest implements api.FederationSenderInternalAPI +func (r *FederationSenderInternalAPI) PerformPeek( + ctx context.Context, + request *api.PerformPeekRequest, + response *api.PerformPeekResponse, +) (err error) { + // Look up the supported room versions. + var supportedVersions []gomatrixserverlib.RoomVersion + for version := range version.SupportedRoomVersions() { + supportedVersions = append(supportedVersions, version) + } + + // Deduplicate the server names we were provided but keep the ordering + // as this encodes useful information about which servers are most likely + // to respond. + seenSet := make(map[gomatrixserverlib.ServerName]bool) + var uniqueList []gomatrixserverlib.ServerName + for _, srv := range request.ServerNames { + if seenSet[srv] { + continue + } + seenSet[srv] = true + uniqueList = append(uniqueList, srv) + } + request.ServerNames = uniqueList + + // Try each server that we were provided until we land on one that + // successfully completes the peek + var lastErr error + for _, serverName := range request.ServerNames { + if err := r.performPeekUsingServer( + ctx, + request.RoomID, + serverName, + supportedVersions, + ); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "server_name": serverName, + "room_id": request.RoomID, + }).Warnf("Failed to peek room through server") + lastErr = err + continue + } + + // We're all good. + return + } + + // If we reach here then we didn't complete a peek for some reason. + var httpErr gomatrix.HTTPError + if ok := errors.As(lastErr, &httpErr); ok { + httpErr.Message = string(httpErr.Contents) + // Clear the wrapped error, else serialising to JSON (in polylith mode) will fail + httpErr.WrappedError = nil + response.LastError = &httpErr + } else { + response.LastError = &gomatrix.HTTPError{ + Code: 0, + WrappedError: nil, + Message: lastErr.Error(), + } + } + + logrus.Errorf( + "failed to peek room %q through %d server(s): last error %s", + request.RoomID, len(request.ServerNames), lastErr, + ) +} + +func (r *FederationSenderInternalAPI) performPeekUsingServer( + ctx context.Context, + roomID string, + serverName gomatrixserverlib.ServerName, + supportedVersions []gomatrixserverlib.RoomVersion, +) error { + // check whether we're peeking already to try to avoid needlessly + // re-peeking on the server. we don't need a transaction for this, + // given this is a nice-to-have. + remotePeek, err := r.db.GetRemotePeek(ctx, serverName, roomID) + if err != nil { + return err + } + renewing := false + if remotePeek != nil { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + if (nowMilli > remotePeek.RenewedTimestamp + remotePeek.RenewalInterval) { + logrus.Infof("stale remote peek to %s for %s already exists; renewing", serverName, roomID) + renewing = true + } + else { + logrus.Infof("live remote peek to %s for %s already exists", serverName, roomID) + return nil + } + } + + // create a unique ID for this peek. + // for now we just use the room ID again. In future, if we ever + // support concurrent peeks to the same room with different filters + // then we would need to disambiguate further. + peekID := roomID + + // Try to perform a /peek using the information supplied in the + // request. + respPeek, err := r.federation.Peek( + ctx, + serverName, + roomID, + peekID, + supportedVersions, + ) + if err != nil { + r.statistics.ForServer(serverName).Failure() + return fmt.Errorf("r.federation.MakePeek: %w", err) + } + r.statistics.ForServer(serverName).Success() + + // Work out if we support the room version that has been supplied in + // the make_peek response. + if respPeek.RoomVersion == "" { + respPeek.RoomVersion = gomatrixserverlib.RoomVersionV1 + } + if _, err = respPeek.RoomVersion.EventFormat(); err != nil { + return fmt.Errorf("respPeek.RoomVersion.EventFormat: %w", err) + } + + // If we've got this far, the remote server is peeking. + if renewing { + if err = r.db.RenewRemotePeek(ctx, serverName, roomID, respPeek.RenewalInterval); err != nil { + return err + } + } + else { + if err = r.db.AddRemotePeek(ctx, serverName, roomID, respPeek.RenewalInterval); err != nil { + return err + } + } + + respState := respPeek.ToRespState() + // Send the newly returned state to the roomserver to update our local view. + if err = roomserverAPI.SendEventWithState( + ctx, r.rsAPI, + &respState, + event.Headered(respPeek.RoomVersion), nil, + ); err != nil { + return fmt.Errorf("r.producer.SendEventWithState: %w", err) + } + + return nil +} + // PerformLeaveRequest implements api.FederationSenderInternalAPI func (r *FederationSenderInternalAPI) PerformLeave( ctx context.Context, diff --git a/federationsender/internal/perform/join.go b/federationsender/internal/perform/join.go index 9a505d15b..22840c90d 100644 --- a/federationsender/internal/perform/join.go +++ b/federationsender/internal/perform/join.go @@ -1,3 +1,17 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package perform import ( @@ -28,7 +42,6 @@ func (r joinContext) CheckSendJoinResponse( ctx context.Context, event gomatrixserverlib.Event, server gomatrixserverlib.ServerName, - respMakeJoin gomatrixserverlib.RespMakeJoin, respSendJoin gomatrixserverlib.RespSendJoin, ) error { // A list of events that we have retried, if they were not included in diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index 734b368fe..f95db13cc 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -53,4 +53,9 @@ type Database interface { AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) + + AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error + RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error + GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID string) (types.RemotePeek, error) + GetRemotePeeks(roomID string) ([]types.RemotePeek, error) } diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 4e3472590..f2c4ab098 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -163,3 +163,23 @@ func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.Server func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) { return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName) } + +func (d *Database) AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.RemotePeeks.InsertRemotePeek(context.TODO(), txn, serverName, roomID, renewalInterval) + }) +} + +func (d *Database) RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.RemotePeeks.RenewRemotePeek(context.TODO(), txn, serverName, roomID, renewalInterval) + }) +} + +func (d *Database) GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID string) (types.RemotePeek, error) { + return d.RemotePeeks.SelectRemotePeek(context.TODO(), serverName, roomID) +} + +func (d *Database) GetRemotePeeks(roomID string) ([]types.RemotePeek, error) { + return d.RemotePeeks.SelectRemotePeeks(context.TODO(), roomID) +} diff --git a/federationsender/storage/sqlite3/remote_peeks_table.go b/federationsender/storage/sqlite3/remote_peeks_table.go new file mode 100644 index 000000000..bcb952e45 --- /dev/null +++ b/federationsender/storage/sqlite3/remote_peeks_table.go @@ -0,0 +1,171 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const remotePeeksSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_remote_peeks ( + room_id TEXT NOT NULL, + server_name TEXT NOT NULL, + creation_ts INTEGER NOT NULL, + renewed_ts INTEGER NOT NULL, + renewal_interval INTEGER NOT NULL, + UNIQUE (room_id, server_name) +); +` + +const insertRemotePeekSQL = "" + + "INSERT INTO federationsender_remote_peeks (room_id, server_name, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5)" + +const selectRemotePeekSQL = "" + + "SELECT room_id, server_name, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2" + +const selectRemotePeeksSQL = "" + + "SELECT room_id, server_name, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1" + +const renewRemotePeekSQL = "" + + "UPDATE federationsender_remote_peeks SET renewed_ts=$3, renewal_interval=$4 WHERE room_id = $1 and server_name = $2" + +const deleteRemotePeekSQL = "" + + "DELETE FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2" + +const deleteRemotePeeksSQL = "" + + "DELETE FROM federationsender_remote_peeks WHERE room_id = $1" + +type remotePeeksStatements struct { + db *sql.DB + insertRemotePeekStmt *sql.Stmt + selectRemotePeekStmt *sql.Stmt + selectRemotePeeksStmt *sql.Stmt + renewRemotePeekStmt *sql.Stmt + deleteRemotePeekStmt *sql.Stmt + deleteRemotePeeksStmt *sql.Stmt +} + +func NewSQLiteRemotePeeksTable(db *sql.DB) (s *remotePeeksStatements, err error) { + s = &remotePeeksStatements{ + db: db, + } + _, err = db.Exec(remotePeeksSchema) + if err != nil { + return + } + + if s.insertRemotePeekStmt, err = db.Prepare(insertRemotePeekSQL); err != nil { + return + } + if s.selectRemotePeekStmt, err = db.Prepare(selectRemotePeekSQL); err != nil { + return + } + if s.selectRemotePeeksStmt, err = db.Prepare(selectRemotePeeksSQL); err != nil { + return + } + if s.renewRemotePeekStmt, err = db.Prepare(renewRemotePeekSQL); err != nil { + return + } + if s.deleteRemotePeeksStmt, err = db.Prepare(deleteRemotePeeksSQL); err != nil { + return + } + if s.deleteRemotePeekStmt, err = db.Prepare(deleteRemotePeekSQL); err != nil { + return + } + return +} + +func (s *remotePeeksStatements) InsertRemotePeek( + ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + stmt := sqlutil.TxStmt(txn, s.insertRemotePeekStmt) + _, err := stmt.ExecContext(ctx, roomID, serverName, nowMilli, nowMilli, renewalInterval) + return +} + +func (s *remotePeeksStatements) RenewRemotePeek( + ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + _, err := sqlutil.TxStmt(txn, s.renewRemotePeekStmt).ExecContext(ctx, roomID, serverName, nowMilli, renewalInterval) + return +} + + +func (s *remotePeeksStatements) SelectRemotePeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID string, +) (remotePeek types.RemotePeek, err error) { + rows, err := sqlutil.TxStmt(txn, s.selectRemotePeeksStmt).QueryContext(ctx, roomID) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectRemotePeek: rows.close() failed") + remotePeek := types.RemotePeek{} + if err = rows.Scan( + &remotePeek.RoomID, + &remotePeek.ServerName, + &remotePeek.CreationTimestamp, + &remotePeek.RenewTimestamp, + &remotePeek.RenewalInterval, + ); err != nil { + return + } + return remotePeek, rows.Err() +} + +func (s *remotePeeksStatements) SelectRemotePeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (remotePeeks []types.RemotePeek, err error) { + rows, err := sqlutil.TxStmt(txn, s.selectRemotePeeksStmt).QueryContext(ctx, roomID) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectRemotePeeks: rows.close() failed") + + for rows.Next() { + remotePeek := types.RemotePeek{} + if err = rows.Scan( + &remotePeek.RoomID, + &remotePeek.ServerName, + &remotePeek.CreationTimestamp, + &remotePeek.RenewTimestamp, + &remotePeek.RenewalInterval, + ); err != nil { + return + } + remotePeeks = append(remotePeeks, remotePeek) + } + + return remotePeeks, rows.Err() +} + +func (s *remotePeeksStatements) DeleteRemotePeek( + ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, +) (err error) { + _, err := sqlutil.TxStmt(txn, s.deleteRemotePeekStmt).ExecContext(ctx, roomID, serverName) + return +} + +func (s *remotePeeksStatements) DeleteRemotePeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (err error) { + _, err := sqlutil.TxStmt(txn, s.deleteRemotePeeksStmt).ExecContext(ctx, roomID) + return +} diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index ba467f026..c0317b25d 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -65,6 +65,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if err != nil { return nil, err } + remotePeeks, err := NewSQLiteRemotePeeksTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, @@ -74,6 +78,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { FederationSenderQueueJSON: queueJSON, FederationSenderRooms: rooms, FederationSenderBlacklist: blacklist, + FederationSenderRemotePeeks: remotePeeks, } if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index c6f8a2d52..61c9208b4 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -67,3 +67,11 @@ type FederationSenderBlacklist interface { SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error) DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error } + +type FederationSenderRemotePeeks interface { + InsertRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int) (err error) + RenewRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int) (err error) + SelectRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (remotePeeks []types.RemotePeek, err error) + DeleteRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName) (err error) + DeleteRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) +} diff --git a/federationsender/types/types.go b/federationsender/types/types.go index 398d32677..e8825d778 100644 --- a/federationsender/types/types.go +++ b/federationsender/types/types.go @@ -49,3 +49,12 @@ func (e EventIDMismatchError) Error() string { e.DatabaseID, e.RoomServerID, ) } + +type RemotePeek { + PeekID string + RoomID string + ServerName gomatrixserverlib.ServerName + CreatedTimestamp UnixMs + RenewedTimestamp UnixMs + RenewalInterval UnixMs +} \ No newline at end of file diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go index 7a212f9af..8f61ba1eb 100644 --- a/roomserver/internal/perform/perform_peek.go +++ b/roomserver/internal/perform/perform_peek.go @@ -152,11 +152,27 @@ func (r *Peeker) performPeekRoomByID( } } - // If the server name in the room ID isn't ours then it's a - // possible candidate for finding the room via federation. Add - // it to the list of servers to try. + // handle federated peeks if domain != r.Cfg.Matrix.ServerName { + // If the server name in the room ID isn't ours then it's a + // possible candidate for finding the room via federation. Add + // it to the list of servers to try. req.ServerNames = append(req.ServerNames, domain) + + // Try peeking by all of the supplied server names. + fedReq := fsAPI.PerformPeekRequest{ + RoomID: req.RoomIDOrAlias, // the room ID to try and peek + ServerNames: req.ServerNames, // the servers to try peeking via + } + fedRes := fsAPI.PerformPeekResponse{} + r.FSAPI.PerformPeek(ctx, &fedReq, &fedRes) + if fedRes.LastError != nil { + return &api.PerformError{ + Code: api.PerformErrRemote, + Msg: fedRes.LastError.Message, + RemoteCode: fedRes.LastError.Code, + } + } } // If this room isn't world_readable, we reject.