diff --git a/federationapi/routing/peek.go b/federationapi/routing/peek.go index 423f84731..a6d0739c6 100644 --- a/federationapi/routing/peek.go +++ b/federationapi/routing/peek.go @@ -64,6 +64,9 @@ func Peek( // TODO: Check history visibility + // tell the peeking server to renew every hour + renewalInterval := int64(60 * 60 * 1000 * 1000) + var response api.PerformHandleRemotePeekResponse err := rsAPI.PerformHandleRemotePeek( httpReq.Context(), @@ -71,6 +74,7 @@ func Peek( RoomID: roomID, PeekID: peekID, ServerName: request.Origin(), + RenewalInterval: renewalInterval, }, &response, ) @@ -89,7 +93,7 @@ func Peek( StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), RoomVersion: response.RoomVersion, - RenewalInterval: 60 * 60 * 1000 * 1000, // one hour + RenewalInterval: renewalInterval, }, } } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 16cab6201..6215edb24 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -115,10 +115,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } -// processMessage updates the list of currently joined hosts in the room -// and then sends the event to the hosts that were joined before the event. +// processRemotePeek adds a new inbound peek (replacing the existing one if any) +// causing the federationsender to start sending messages to the peeking server func (s *OutputRoomEventConsumer) processRemotePeek(orp api.OutputNewRemotePeek) error { - return nil + return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval) } // processMessage updates the list of currently joined hosts in the room @@ -164,11 +164,6 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err return err } - // TODO: track what hosts are peeking (federationsender_received_peeks) - // TODO: rename federationsender_remote_peeks as federationsender_sent_peeks - - // TODO: add peeking hosts to the joinedHosts list - // TODO: do housekeeping to evict unrenewed peeking hosts // TODO: implement query to let the fedapi check whether a given peek is live or not @@ -180,7 +175,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err } // joinedHostsAtEvent works out a list of matrix servers that were joined to -// the room at the event. +// the room at the event (including peeking ones) // It is important to use the state at the event for sending messages because: // 1) We shouldn't send messages to servers that weren't in the room. // 2) If a server is kicked from the rooms it should still be told about the @@ -231,6 +226,15 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( joined[joinedHost.ServerName] = true } + // handle peeking hosts + inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.RoomID()) + if err != nil { + return nil, err + } + for _, inboundPeek := range inboundPeeks { + joined[inboundPeek.ServerName] = true + } + var result []gomatrixserverlib.ServerName for serverName, include := range joined { if include { diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index a5bad5765..035c0d265 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -55,8 +55,13 @@ type Database interface { RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) - AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error - RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error + AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error + RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) + + AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error + RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error + GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) + GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) } diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 7f2dea015..021322d97 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -36,6 +36,7 @@ type Database struct { FederationSenderRooms tables.FederationSenderRooms FederationSenderBlacklist tables.FederationSenderBlacklist FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks + FederationSenderInboundPeeks tables.FederationSenderInboundPeeks } // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. @@ -165,13 +166,13 @@ func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName) } -func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error { +func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) }) } -func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error { +func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) }) @@ -184,3 +185,23 @@ func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserve func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) { return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID) } + +func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.FederationSenderInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + }) +} + +func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.FederationSenderInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + }) +} + +func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) { + return d.FederationSenderInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID) +} + +func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) { + return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID) +} diff --git a/federationsender/storage/sqlite3/inbound_peeks_table.go b/federationsender/storage/sqlite3/inbound_peeks_table.go new file mode 100644 index 000000000..2f1269ac0 --- /dev/null +++ b/federationsender/storage/sqlite3/inbound_peeks_table.go @@ -0,0 +1,176 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "time" + + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const inboundPeeksSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks ( + room_id TEXT NOT NULL, + server_name TEXT NOT NULL, + peek_id TEXT NOT NULL, + creation_ts INTEGER NOT NULL, + renewed_ts INTEGER NOT NULL, + renewal_interval INTEGER NOT NULL, + UNIQUE (room_id, server_name, peek_id) +); +` + +const insertInboundPeekSQL = "" + + "INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)" + +const selectInboundPeekSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3" + +const selectInboundPeeksSQL = "" + + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1" + +const renewInboundPeekSQL = "" + + "UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5" + +const deleteInboundPeekSQL = "" + + "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2" + +const deleteInboundPeeksSQL = "" + + "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1" + +type inboundPeeksStatements struct { + db *sql.DB + insertInboundPeekStmt *sql.Stmt + selectInboundPeekStmt *sql.Stmt + selectInboundPeeksStmt *sql.Stmt + renewInboundPeekStmt *sql.Stmt + deleteInboundPeekStmt *sql.Stmt + deleteInboundPeeksStmt *sql.Stmt +} + +func NewSQLiteInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) { + s = &inboundPeeksStatements{ + db: db, + } + _, err = db.Exec(inboundPeeksSchema) + if err != nil { + return + } + + if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil { + return + } + if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil { + return + } + if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil { + return + } + if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil { + return + } + if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil { + return + } + if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil { + return + } + return +} + +func (s *inboundPeeksStatements) InsertInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt) + _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval) + return +} + +func (s *inboundPeeksStatements) RenewInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, +) (err error) { + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + _, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID) + return +} + +func (s *inboundPeeksStatements) SelectInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (*types.InboundPeek, error) { + row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID) + inboundPeek := types.InboundPeek{} + err := row.Scan( + &inboundPeek.RoomID, + &inboundPeek.ServerName, + &inboundPeek.PeekID, + &inboundPeek.CreationTimestamp, + &inboundPeek.RenewedTimestamp, + &inboundPeek.RenewalInterval, + ) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return &inboundPeek, nil +} + +func (s *inboundPeeksStatements) SelectInboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (inboundPeeks []types.InboundPeek, err error) { + rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed") + + for rows.Next() { + inboundPeek := types.InboundPeek{} + if err = rows.Scan( + &inboundPeek.RoomID, + &inboundPeek.ServerName, + &inboundPeek.PeekID, + &inboundPeek.CreationTimestamp, + &inboundPeek.RenewedTimestamp, + &inboundPeek.RenewalInterval, + ); err != nil { + return + } + inboundPeeks = append(inboundPeeks, inboundPeek) + } + + return inboundPeeks, rows.Err() +} + +func (s *inboundPeeksStatements) DeleteInboundPeek( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID) + return +} + +func (s *inboundPeeksStatements) DeleteInboundPeeks( + ctx context.Context, txn *sql.Tx, roomID string, +) (err error) { + _, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID) + return +} diff --git a/federationsender/storage/sqlite3/outbound_peeks_table.go b/federationsender/storage/sqlite3/outbound_peeks_table.go index f3a81e7fb..67edda4d3 100644 --- a/federationsender/storage/sqlite3/outbound_peeks_table.go +++ b/federationsender/storage/sqlite3/outbound_peeks_table.go @@ -96,7 +96,7 @@ func NewSQLiteOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err er } func (s *outboundPeeksStatements) InsertOutboundPeek( - ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int, + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, ) (err error) { nowMilli := time.Now().UnixNano() / int64(time.Millisecond) stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt) @@ -105,7 +105,7 @@ func (s *outboundPeeksStatements) InsertOutboundPeek( } func (s *outboundPeeksStatements) RenewOutboundPeek( - ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int, + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64, ) (err error) { nowMilli := time.Now().UnixNano() / int64(time.Millisecond) _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID) diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index e4654f8a9..a2069f078 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -69,10 +69,19 @@ type FederationSenderBlacklist interface { } type FederationSenderOutboundPeeks interface { - InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error) - RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error) + InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) + RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error) SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error) DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) } + +type FederationSenderInboundPeeks interface { + InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) + RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) + SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error) + SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error) + DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) + DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) +} diff --git a/federationsender/types/types.go b/federationsender/types/types.go index 34390fac1..90da310c9 100644 --- a/federationsender/types/types.go +++ b/federationsender/types/types.go @@ -50,6 +50,7 @@ func (e EventIDMismatchError) Error() string { ) } +// tracks peeks we're performing on another server over federation type OutboundPeek struct { PeekID string RoomID string @@ -58,3 +59,13 @@ type OutboundPeek struct { RenewedTimestamp int64 RenewalInterval int64 } + +// tracks peeks other servers are performing on us over federation +type InboundPeek struct { + PeekID string + RoomID string + ServerName gomatrixserverlib.ServerName + CreationTimestamp int64 + RenewedTimestamp int64 + RenewalInterval int64 +} diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 4163cc2d0..ec4186a5e 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -219,4 +219,6 @@ type OutputNewRemotePeek struct { RoomID string PeekID string ServerName gomatrixserverlib.ServerName + // how often we told the peeking server to renew the peek + RenewalInterval int64 } diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index a8967dac1..b7c713330 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -165,6 +165,7 @@ type PerformHandleRemotePeekRequest struct { RoomID string `json:"room_id"` PeekID string `json:"peek_id"` ServerName gomatrixserverlib.ServerName `json:"server_name"` + RenewalInterval int64 `json:"renewal_interval"` } type PerformHandleRemotePeekResponse struct { diff --git a/roomserver/internal/perform/perform_handle_remote_peek.go b/roomserver/internal/perform/perform_handle_remote_peek.go index bcacdc726..a2aec73a7 100644 --- a/roomserver/internal/perform/perform_handle_remote_peek.go +++ b/roomserver/internal/perform/perform_handle_remote_peek.go @@ -110,6 +110,7 @@ func (r *HandleRemotePeeker) PerformHandleRemotePeek( RoomID: request.RoomID, PeekID: request.PeekID, ServerName: request.ServerName, + RenewalInterval: request.RenewalInterval, }, }, })