From baee97bff7b61cba075fa11e087731ffc0ad42de Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 11 Sep 2020 21:04:33 +0100 Subject: [PATCH] (almost) make it build --- federationsender/api/api.go | 2 +- federationsender/internal/perform.go | 20 +++++++------- federationsender/inthttp/client.go | 14 ++++++++++ federationsender/storage/interface.go | 6 ++--- federationsender/storage/shared/storage.go | 15 ++++++----- .../storage/sqlite3/remote_peeks_table.go | 27 ++++++++++--------- federationsender/storage/tables/interface.go | 7 ++--- federationsender/types/types.go | 5 +++- 8 files changed, 60 insertions(+), 36 deletions(-) diff --git a/federationsender/api/api.go b/federationsender/api/api.go index 343d8301b..dc0856723 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -62,7 +62,7 @@ type FederationSenderInternalAPI interface { ctx context.Context, request *PerformPeekRequest, response *PerformPeekResponse, - ) + ) error // Handle an instruction to make_leave & send_leave with a remote server. PerformLeave( ctx context.Context, diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 5d887beee..92dfa89a5 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -212,7 +212,7 @@ func (r *FederationSenderInternalAPI) PerformPeek( ctx context.Context, request *api.PerformPeekRequest, response *api.PerformPeekResponse, -) (err error) { +) error { // Look up the supported room versions. var supportedVersions []gomatrixserverlib.RoomVersion for version := range version.SupportedRoomVersions() { @@ -252,7 +252,7 @@ func (r *FederationSenderInternalAPI) PerformPeek( } // We're all good. - return + return nil } // If we reach here then we didn't complete a peek for some reason. @@ -274,6 +274,8 @@ func (r *FederationSenderInternalAPI) PerformPeek( "failed to peek room %q through %d server(s): last error %s", request.RoomID, len(request.ServerNames), lastErr, ) + + return lastErr } func (r *FederationSenderInternalAPI) performPeekUsingServer( @@ -282,10 +284,16 @@ func (r *FederationSenderInternalAPI) performPeekUsingServer( serverName gomatrixserverlib.ServerName, supportedVersions []gomatrixserverlib.RoomVersion, ) error { + // create a unique ID for this peek. + // for now we just use the room ID again. In future, if we ever + // support concurrent peeks to the same room with different filters + // then we would need to disambiguate further. + peekID := roomID + // check whether we're peeking already to try to avoid needlessly // re-peeking on the server. we don't need a transaction for this, // given this is a nice-to-have. - remotePeek, err := r.db.GetRemotePeek(ctx, serverName, roomID) + remotePeek, err := r.db.GetRemotePeek(ctx, roomID, serverName, peekID) if err != nil { return err } @@ -302,12 +310,6 @@ func (r *FederationSenderInternalAPI) performPeekUsingServer( } } - // create a unique ID for this peek. - // for now we just use the room ID again. In future, if we ever - // support concurrent peeks to the same room with different filters - // then we would need to disambiguate further. - peekID := roomID - // Try to perform a /peek using the information supplied in the // request. respPeek, err := r.federation.Peek( diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go index 5bfe6089d..88e9e3d92 100644 --- a/federationsender/inthttp/client.go +++ b/federationsender/inthttp/client.go @@ -20,6 +20,7 @@ const ( FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest" FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" + FederationSenderPerformPeekRequestPath = "/federationsender/performPeekRequest" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" @@ -72,6 +73,19 @@ func (h *httpFederationSenderInternalAPI) PerformInvite( return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } +// Handle starting a peek on a remote server. +func (h *httpFederationSenderInternalAPI) PerformPeek( + ctx context.Context, + request *api.PerformPeekRequest, + response *api.PerformPeekResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPeekRequest") + defer span.Finish() + + apiURL := h.federationSenderURL + FederationSenderPerformPeekRequestPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + func (h *httpFederationSenderInternalAPI) PerformServersAlive( ctx context.Context, request *api.PerformServersAliveRequest, diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index f95db13cc..75de07f4d 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -54,8 +54,8 @@ type Database interface { RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) - AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error - RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error - GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID string) (types.RemotePeek, error) + AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error + RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error + GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string) (types.RemotePeek, error) GetRemotePeeks(roomID string) ([]types.RemotePeek, error) } diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 8de66cc71..4fa66ab0a 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -35,6 +35,7 @@ type Database struct { FederationSenderJoinedHosts tables.FederationSenderJoinedHosts FederationSenderRooms tables.FederationSenderRooms FederationSenderBlacklist tables.FederationSenderBlacklist + FederationSenderRemotePeeks tables.FederationSenderRemotePeeks } // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. @@ -164,22 +165,22 @@ func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName) } -func (d *Database) AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error { +func (d *Database) AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.RemotePeeks.InsertRemotePeek(context.TODO(), txn, serverName, roomID, renewalInterval) + return d.FederationSenderRemotePeeks.InsertRemotePeek(context.TODO(), txn, serverName, roomID, peekID, renewalInterval) }) } -func (d *Database) RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error { +func (d *Database) RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.RemotePeeks.RenewRemotePeek(context.TODO(), txn, serverName, roomID, renewalInterval) + return d.FederationSenderRemotePeeks.RenewRemotePeek(context.TODO(), txn, serverName, roomID, peekID, renewalInterval) }) } -func (d *Database) GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID string) (types.RemotePeek, error) { - return d.RemotePeeks.SelectRemotePeek(context.TODO(), serverName, roomID) +func (d *Database) GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string) (types.RemotePeek, error) { + return d.FederationSenderRemotePeeks.SelectRemotePeek(context.TODO(), serverName, roomID, peekID) } func (d *Database) GetRemotePeeks(roomID string) ([]types.RemotePeek, error) { - return d.RemotePeeks.SelectRemotePeeks(context.TODO(), roomID) + return d.FederationSenderRemotePeeks.SelectRemotePeeks(context.TODO(), roomID) } diff --git a/federationsender/storage/sqlite3/remote_peeks_table.go b/federationsender/storage/sqlite3/remote_peeks_table.go index bcb952e45..5b05cded1 100644 --- a/federationsender/storage/sqlite3/remote_peeks_table.go +++ b/federationsender/storage/sqlite3/remote_peeks_table.go @@ -26,24 +26,25 @@ const remotePeeksSchema = ` CREATE TABLE IF NOT EXISTS federationsender_remote_peeks ( room_id TEXT NOT NULL, server_name TEXT NOT NULL, + peek_id TEXT NOT NULL, creation_ts INTEGER NOT NULL, renewed_ts INTEGER NOT NULL, renewal_interval INTEGER NOT NULL, - UNIQUE (room_id, server_name) + UNIQUE (room_id, server_name, peek_id) ); ` const insertRemotePeekSQL = "" + - "INSERT INTO federationsender_remote_peeks (room_id, server_name, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5)" + "INSERT INTO federationsender_remote_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)" const selectRemotePeekSQL = "" + - "SELECT room_id, server_name, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2" + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3" const selectRemotePeeksSQL = "" + - "SELECT room_id, server_name, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1" + "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1" const renewRemotePeekSQL = "" + - "UPDATE federationsender_remote_peeks SET renewed_ts=$3, renewal_interval=$4 WHERE room_id = $1 and server_name = $2" + "UPDATE federationsender_remote_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5" const deleteRemotePeekSQL = "" + "DELETE FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2" @@ -92,25 +93,25 @@ func NewSQLiteRemotePeeksTable(db *sql.DB) (s *remotePeeksStatements, err error) } func (s *remotePeeksStatements) InsertRemotePeek( - ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int, + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int, ) (err error) { nowMilli := time.Now().UnixNano() / int64(time.Millisecond) stmt := sqlutil.TxStmt(txn, s.insertRemotePeekStmt) - _, err := stmt.ExecContext(ctx, roomID, serverName, nowMilli, nowMilli, renewalInterval) + _, err := stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval) return } func (s *remotePeeksStatements) RenewRemotePeek( - ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int, + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int, ) (err error) { nowMilli := time.Now().UnixNano() / int64(time.Millisecond) - _, err := sqlutil.TxStmt(txn, s.renewRemotePeekStmt).ExecContext(ctx, roomID, serverName, nowMilli, renewalInterval) + _, err := sqlutil.TxStmt(txn, s.renewRemotePeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID) return } func (s *remotePeeksStatements) SelectRemotePeek( - ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID string, + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, ) (remotePeek types.RemotePeek, err error) { rows, err := sqlutil.TxStmt(txn, s.selectRemotePeeksStmt).QueryContext(ctx, roomID) if err != nil { @@ -121,6 +122,7 @@ func (s *remotePeeksStatements) SelectRemotePeek( if err = rows.Scan( &remotePeek.RoomID, &remotePeek.ServerName, + &remotePeek.PeekID, &remotePeek.CreationTimestamp, &remotePeek.RenewTimestamp, &remotePeek.RenewalInterval, @@ -144,6 +146,7 @@ func (s *remotePeeksStatements) SelectRemotePeeks( if err = rows.Scan( &remotePeek.RoomID, &remotePeek.ServerName, + &remotePeek.PeekID, &remotePeek.CreationTimestamp, &remotePeek.RenewTimestamp, &remotePeek.RenewalInterval, @@ -157,9 +160,9 @@ func (s *remotePeeksStatements) SelectRemotePeeks( } func (s *remotePeeksStatements) DeleteRemotePeek( - ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, ) (err error) { - _, err := sqlutil.TxStmt(txn, s.deleteRemotePeekStmt).ExecContext(ctx, roomID, serverName) + _, err := sqlutil.TxStmt(txn, s.deleteRemotePeekStmt).ExecContext(ctx, roomID, serverName, peekID) return } diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index 61c9208b4..85a6ba98b 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -69,9 +69,10 @@ type FederationSenderBlacklist interface { } type FederationSenderRemotePeeks interface { - InsertRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int) (err error) - RenewRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int) (err error) + InsertRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error) + RenewRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error) + SelectRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (remotePeek types.RemotePeek, err error) SelectRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (remotePeeks []types.RemotePeek, err error) - DeleteRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName) (err error) + DeleteRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) DeleteRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) } diff --git a/federationsender/types/types.go b/federationsender/types/types.go index e8825d778..08182262a 100644 --- a/federationsender/types/types.go +++ b/federationsender/types/types.go @@ -50,7 +50,10 @@ func (e EventIDMismatchError) Error() string { ) } -type RemotePeek { +// UnixMs is the milliseconds since the Unix epoch +type UnixMs int64 + +type RemotePeek struct { PeekID string RoomID string ServerName gomatrixserverlib.ServerName