add inbound peeks table and hook it up

This commit is contained in:
Matthew Hodgson 2020-09-12 23:54:46 +01:00
parent 0dc422cdd8
commit 71732f2c28
11 changed files with 252 additions and 18 deletions

View file

@ -64,6 +64,9 @@ func Peek(
// TODO: Check history visibility // TODO: Check history visibility
// tell the peeking server to renew every hour
renewalInterval := int64(60 * 60 * 1000 * 1000)
var response api.PerformHandleRemotePeekResponse var response api.PerformHandleRemotePeekResponse
err := rsAPI.PerformHandleRemotePeek( err := rsAPI.PerformHandleRemotePeek(
httpReq.Context(), httpReq.Context(),
@ -71,6 +74,7 @@ func Peek(
RoomID: roomID, RoomID: roomID,
PeekID: peekID, PeekID: peekID,
ServerName: request.Origin(), ServerName: request.Origin(),
RenewalInterval: renewalInterval,
}, },
&response, &response,
) )
@ -89,7 +93,7 @@ func Peek(
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
RoomVersion: response.RoomVersion, RoomVersion: response.RoomVersion,
RenewalInterval: 60 * 60 * 1000 * 1000, // one hour RenewalInterval: renewalInterval,
}, },
} }
} }

View file

@ -115,10 +115,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
// processMessage updates the list of currently joined hosts in the room // processRemotePeek adds a new inbound peek (replacing the existing one if any)
// and then sends the event to the hosts that were joined before the event. // causing the federationsender to start sending messages to the peeking server
func (s *OutputRoomEventConsumer) processRemotePeek(orp api.OutputNewRemotePeek) error { func (s *OutputRoomEventConsumer) processRemotePeek(orp api.OutputNewRemotePeek) error {
return nil return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
} }
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
@ -164,11 +164,6 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
return err return err
} }
// TODO: track what hosts are peeking (federationsender_received_peeks)
// TODO: rename federationsender_remote_peeks as federationsender_sent_peeks
// TODO: add peeking hosts to the joinedHosts list
// TODO: do housekeeping to evict unrenewed peeking hosts // TODO: do housekeeping to evict unrenewed peeking hosts
// TODO: implement query to let the fedapi check whether a given peek is live or not // TODO: implement query to let the fedapi check whether a given peek is live or not
@ -180,7 +175,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
} }
// joinedHostsAtEvent works out a list of matrix servers that were joined to // joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event. // the room at the event (including peeking ones)
// It is important to use the state at the event for sending messages because: // It is important to use the state at the event for sending messages because:
// 1) We shouldn't send messages to servers that weren't in the room. // 1) We shouldn't send messages to servers that weren't in the room.
// 2) If a server is kicked from the rooms it should still be told about the // 2) If a server is kicked from the rooms it should still be told about the
@ -231,6 +226,15 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
joined[joinedHost.ServerName] = true joined[joinedHost.ServerName] = true
} }
// handle peeking hosts
inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.RoomID())
if err != nil {
return nil, err
}
for _, inboundPeek := range inboundPeeks {
joined[inboundPeek.ServerName] = true
}
var result []gomatrixserverlib.ServerName var result []gomatrixserverlib.ServerName
for serverName, include := range joined { for serverName, include := range joined {
if include { if include {

View file

@ -55,8 +55,13 @@ type Database interface {
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)
AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error)
GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error)
AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error)
GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error)
} }

View file

@ -36,6 +36,7 @@ type Database struct {
FederationSenderRooms tables.FederationSenderRooms FederationSenderRooms tables.FederationSenderRooms
FederationSenderBlacklist tables.FederationSenderBlacklist FederationSenderBlacklist tables.FederationSenderBlacklist
FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks
FederationSenderInboundPeeks tables.FederationSenderInboundPeeks
} }
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
@ -165,13 +166,13 @@ func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName)
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName) return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
} }
func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error { func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
}) })
} }
func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error { func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
}) })
@ -184,3 +185,23 @@ func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserve
func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) { func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID) return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
} }
func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}
func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}
func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) {
return d.FederationSenderInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID)
}
func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) {
return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID)
}

View file

@ -0,0 +1,176 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const inboundPeeksSchema = `
CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks (
room_id TEXT NOT NULL,
server_name TEXT NOT NULL,
peek_id TEXT NOT NULL,
creation_ts INTEGER NOT NULL,
renewed_ts INTEGER NOT NULL,
renewal_interval INTEGER NOT NULL,
UNIQUE (room_id, server_name, peek_id)
);
`
const insertInboundPeekSQL = "" +
"INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
const selectInboundPeekSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
const selectInboundPeeksSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1"
const renewInboundPeekSQL = "" +
"UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
const deleteInboundPeekSQL = "" +
"DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2"
const deleteInboundPeeksSQL = "" +
"DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
type inboundPeeksStatements struct {
db *sql.DB
insertInboundPeekStmt *sql.Stmt
selectInboundPeekStmt *sql.Stmt
selectInboundPeeksStmt *sql.Stmt
renewInboundPeekStmt *sql.Stmt
deleteInboundPeekStmt *sql.Stmt
deleteInboundPeeksStmt *sql.Stmt
}
func NewSQLiteInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) {
s = &inboundPeeksStatements{
db: db,
}
_, err = db.Exec(inboundPeeksSchema)
if err != nil {
return
}
if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil {
return
}
if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil {
return
}
if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil {
return
}
if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil {
return
}
if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil {
return
}
if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil {
return
}
return
}
func (s *inboundPeeksStatements) InsertInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt)
_, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
return
}
func (s *inboundPeeksStatements) RenewInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
_, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
return
}
func (s *inboundPeeksStatements) SelectInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (*types.InboundPeek, error) {
row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID)
inboundPeek := types.InboundPeek{}
err := row.Scan(
&inboundPeek.RoomID,
&inboundPeek.ServerName,
&inboundPeek.PeekID,
&inboundPeek.CreationTimestamp,
&inboundPeek.RenewedTimestamp,
&inboundPeek.RenewalInterval,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &inboundPeek, nil
}
func (s *inboundPeeksStatements) SelectInboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (inboundPeeks []types.InboundPeek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed")
for rows.Next() {
inboundPeek := types.InboundPeek{}
if err = rows.Scan(
&inboundPeek.RoomID,
&inboundPeek.ServerName,
&inboundPeek.PeekID,
&inboundPeek.CreationTimestamp,
&inboundPeek.RenewedTimestamp,
&inboundPeek.RenewalInterval,
); err != nil {
return
}
inboundPeeks = append(inboundPeeks, inboundPeek)
}
return inboundPeeks, rows.Err()
}
func (s *inboundPeeksStatements) DeleteInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
return
}
func (s *inboundPeeksStatements) DeleteInboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID)
return
}

View file

@ -96,7 +96,7 @@ func NewSQLiteOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err er
} }
func (s *outboundPeeksStatements) InsertOutboundPeek( func (s *outboundPeeksStatements) InsertOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) { ) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond) nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt) stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt)
@ -105,7 +105,7 @@ func (s *outboundPeeksStatements) InsertOutboundPeek(
} }
func (s *outboundPeeksStatements) RenewOutboundPeek( func (s *outboundPeeksStatements) RenewOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) { ) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond) nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
_, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID) _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)

View file

@ -69,10 +69,19 @@ type FederationSenderBlacklist interface {
} }
type FederationSenderOutboundPeeks interface { type FederationSenderOutboundPeeks interface {
InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error) InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error) RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error) SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error)
SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error) SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error)
DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
} }
type FederationSenderInboundPeeks interface {
InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error)
SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error)
DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
}

View file

@ -50,6 +50,7 @@ func (e EventIDMismatchError) Error() string {
) )
} }
// tracks peeks we're performing on another server over federation
type OutboundPeek struct { type OutboundPeek struct {
PeekID string PeekID string
RoomID string RoomID string
@ -58,3 +59,13 @@ type OutboundPeek struct {
RenewedTimestamp int64 RenewedTimestamp int64
RenewalInterval int64 RenewalInterval int64
} }
// tracks peeks other servers are performing on us over federation
type InboundPeek struct {
PeekID string
RoomID string
ServerName gomatrixserverlib.ServerName
CreationTimestamp int64
RenewedTimestamp int64
RenewalInterval int64
}

View file

@ -219,4 +219,6 @@ type OutputNewRemotePeek struct {
RoomID string RoomID string
PeekID string PeekID string
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
// how often we told the peeking server to renew the peek
RenewalInterval int64
} }

View file

@ -165,6 +165,7 @@ type PerformHandleRemotePeekRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
PeekID string `json:"peek_id"` PeekID string `json:"peek_id"`
ServerName gomatrixserverlib.ServerName `json:"server_name"` ServerName gomatrixserverlib.ServerName `json:"server_name"`
RenewalInterval int64 `json:"renewal_interval"`
} }
type PerformHandleRemotePeekResponse struct { type PerformHandleRemotePeekResponse struct {

View file

@ -110,6 +110,7 @@ func (r *HandleRemotePeeker) PerformHandleRemotePeek(
RoomID: request.RoomID, RoomID: request.RoomID,
PeekID: request.PeekID, PeekID: request.PeekID,
ServerName: request.ServerName, ServerName: request.ServerName,
RenewalInterval: request.RenewalInterval,
}, },
}, },
}) })