(almost) make it build

This commit is contained in:
Matthew Hodgson 2020-09-11 21:04:33 +01:00
parent f236e8290d
commit baee97bff7
8 changed files with 60 additions and 36 deletions

View file

@ -62,7 +62,7 @@ type FederationSenderInternalAPI interface {
ctx context.Context, ctx context.Context,
request *PerformPeekRequest, request *PerformPeekRequest,
response *PerformPeekResponse, response *PerformPeekResponse,
) ) error
// Handle an instruction to make_leave & send_leave with a remote server. // Handle an instruction to make_leave & send_leave with a remote server.
PerformLeave( PerformLeave(
ctx context.Context, ctx context.Context,

View file

@ -212,7 +212,7 @@ func (r *FederationSenderInternalAPI) PerformPeek(
ctx context.Context, ctx context.Context,
request *api.PerformPeekRequest, request *api.PerformPeekRequest,
response *api.PerformPeekResponse, response *api.PerformPeekResponse,
) (err error) { ) error {
// Look up the supported room versions. // Look up the supported room versions.
var supportedVersions []gomatrixserverlib.RoomVersion var supportedVersions []gomatrixserverlib.RoomVersion
for version := range version.SupportedRoomVersions() { for version := range version.SupportedRoomVersions() {
@ -252,7 +252,7 @@ func (r *FederationSenderInternalAPI) PerformPeek(
} }
// We're all good. // We're all good.
return return nil
} }
// If we reach here then we didn't complete a peek for some reason. // If we reach here then we didn't complete a peek for some reason.
@ -274,6 +274,8 @@ func (r *FederationSenderInternalAPI) PerformPeek(
"failed to peek room %q through %d server(s): last error %s", "failed to peek room %q through %d server(s): last error %s",
request.RoomID, len(request.ServerNames), lastErr, request.RoomID, len(request.ServerNames), lastErr,
) )
return lastErr
} }
func (r *FederationSenderInternalAPI) performPeekUsingServer( func (r *FederationSenderInternalAPI) performPeekUsingServer(
@ -282,10 +284,16 @@ func (r *FederationSenderInternalAPI) performPeekUsingServer(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
supportedVersions []gomatrixserverlib.RoomVersion, supportedVersions []gomatrixserverlib.RoomVersion,
) error { ) error {
// create a unique ID for this peek.
// for now we just use the room ID again. In future, if we ever
// support concurrent peeks to the same room with different filters
// then we would need to disambiguate further.
peekID := roomID
// check whether we're peeking already to try to avoid needlessly // check whether we're peeking already to try to avoid needlessly
// re-peeking on the server. we don't need a transaction for this, // re-peeking on the server. we don't need a transaction for this,
// given this is a nice-to-have. // given this is a nice-to-have.
remotePeek, err := r.db.GetRemotePeek(ctx, serverName, roomID) remotePeek, err := r.db.GetRemotePeek(ctx, roomID, serverName, peekID)
if err != nil { if err != nil {
return err return err
} }
@ -302,12 +310,6 @@ func (r *FederationSenderInternalAPI) performPeekUsingServer(
} }
} }
// create a unique ID for this peek.
// for now we just use the room ID again. In future, if we ever
// support concurrent peeks to the same room with different filters
// then we would need to disambiguate further.
peekID := roomID
// Try to perform a /peek using the information supplied in the // Try to perform a /peek using the information supplied in the
// request. // request.
respPeek, err := r.federation.Peek( respPeek, err := r.federation.Peek(

View file

@ -20,6 +20,7 @@ const (
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest" FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest"
FederationSenderPerformPeekRequestPath = "/federationsender/performPeekRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"
@ -72,6 +73,19 @@ func (h *httpFederationSenderInternalAPI) PerformInvite(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// Handle starting a peek on a remote server.
func (h *httpFederationSenderInternalAPI) PerformPeek(
ctx context.Context,
request *api.PerformPeekRequest,
response *api.PerformPeekResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPeekRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformPeekRequestPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func (h *httpFederationSenderInternalAPI) PerformServersAlive( func (h *httpFederationSenderInternalAPI) PerformServersAlive(
ctx context.Context, ctx context.Context,
request *api.PerformServersAliveRequest, request *api.PerformServersAliveRequest,

View file

@ -54,8 +54,8 @@ type Database interface {
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)
AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID string) (types.RemotePeek, error) GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string) (types.RemotePeek, error)
GetRemotePeeks(roomID string) ([]types.RemotePeek, error) GetRemotePeeks(roomID string) ([]types.RemotePeek, error)
} }

View file

@ -35,6 +35,7 @@ type Database struct {
FederationSenderJoinedHosts tables.FederationSenderJoinedHosts FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
FederationSenderRooms tables.FederationSenderRooms FederationSenderRooms tables.FederationSenderRooms
FederationSenderBlacklist tables.FederationSenderBlacklist FederationSenderBlacklist tables.FederationSenderBlacklist
FederationSenderRemotePeeks tables.FederationSenderRemotePeeks
} }
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
@ -164,22 +165,22 @@ func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName)
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName) return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
} }
func (d *Database) AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error { func (d *Database) AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.RemotePeeks.InsertRemotePeek(context.TODO(), txn, serverName, roomID, renewalInterval) return d.FederationSenderRemotePeeks.InsertRemotePeek(context.TODO(), txn, serverName, roomID, peekID, renewalInterval)
}) })
} }
func (d *Database) RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error { func (d *Database) RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.RemotePeeks.RenewRemotePeek(context.TODO(), txn, serverName, roomID, renewalInterval) return d.FederationSenderRemotePeeks.RenewRemotePeek(context.TODO(), txn, serverName, roomID, peekID, renewalInterval)
}) })
} }
func (d *Database) GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID string) (types.RemotePeek, error) { func (d *Database) GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID, peekID string) (types.RemotePeek, error) {
return d.RemotePeeks.SelectRemotePeek(context.TODO(), serverName, roomID) return d.FederationSenderRemotePeeks.SelectRemotePeek(context.TODO(), serverName, roomID, peekID)
} }
func (d *Database) GetRemotePeeks(roomID string) ([]types.RemotePeek, error) { func (d *Database) GetRemotePeeks(roomID string) ([]types.RemotePeek, error) {
return d.RemotePeeks.SelectRemotePeeks(context.TODO(), roomID) return d.FederationSenderRemotePeeks.SelectRemotePeeks(context.TODO(), roomID)
} }

View file

@ -26,24 +26,25 @@ const remotePeeksSchema = `
CREATE TABLE IF NOT EXISTS federationsender_remote_peeks ( CREATE TABLE IF NOT EXISTS federationsender_remote_peeks (
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
server_name TEXT NOT NULL, server_name TEXT NOT NULL,
peek_id TEXT NOT NULL,
creation_ts INTEGER NOT NULL, creation_ts INTEGER NOT NULL,
renewed_ts INTEGER NOT NULL, renewed_ts INTEGER NOT NULL,
renewal_interval INTEGER NOT NULL, renewal_interval INTEGER NOT NULL,
UNIQUE (room_id, server_name) UNIQUE (room_id, server_name, peek_id)
); );
` `
const insertRemotePeekSQL = "" + const insertRemotePeekSQL = "" +
"INSERT INTO federationsender_remote_peeks (room_id, server_name, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5)" "INSERT INTO federationsender_remote_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
const selectRemotePeekSQL = "" + const selectRemotePeekSQL = "" +
"SELECT room_id, server_name, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2" "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
const selectRemotePeeksSQL = "" + const selectRemotePeeksSQL = "" +
"SELECT room_id, server_name, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1" "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1"
const renewRemotePeekSQL = "" + const renewRemotePeekSQL = "" +
"UPDATE federationsender_remote_peeks SET renewed_ts=$3, renewal_interval=$4 WHERE room_id = $1 and server_name = $2" "UPDATE federationsender_remote_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
const deleteRemotePeekSQL = "" + const deleteRemotePeekSQL = "" +
"DELETE FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2" "DELETE FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2"
@ -92,25 +93,25 @@ func NewSQLiteRemotePeeksTable(db *sql.DB) (s *remotePeeksStatements, err error)
} }
func (s *remotePeeksStatements) InsertRemotePeek( func (s *remotePeeksStatements) InsertRemotePeek(
ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int,
) (err error) { ) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond) nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, s.insertRemotePeekStmt) stmt := sqlutil.TxStmt(txn, s.insertRemotePeekStmt)
_, err := stmt.ExecContext(ctx, roomID, serverName, nowMilli, nowMilli, renewalInterval) _, err := stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
return return
} }
func (s *remotePeeksStatements) RenewRemotePeek( func (s *remotePeeksStatements) RenewRemotePeek(
ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int,
) (err error) { ) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond) nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
_, err := sqlutil.TxStmt(txn, s.renewRemotePeekStmt).ExecContext(ctx, roomID, serverName, nowMilli, renewalInterval) _, err := sqlutil.TxStmt(txn, s.renewRemotePeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
return return
} }
func (s *remotePeeksStatements) SelectRemotePeek( func (s *remotePeeksStatements) SelectRemotePeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID string, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (remotePeek types.RemotePeek, err error) { ) (remotePeek types.RemotePeek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectRemotePeeksStmt).QueryContext(ctx, roomID) rows, err := sqlutil.TxStmt(txn, s.selectRemotePeeksStmt).QueryContext(ctx, roomID)
if err != nil { if err != nil {
@ -121,6 +122,7 @@ func (s *remotePeeksStatements) SelectRemotePeek(
if err = rows.Scan( if err = rows.Scan(
&remotePeek.RoomID, &remotePeek.RoomID,
&remotePeek.ServerName, &remotePeek.ServerName,
&remotePeek.PeekID,
&remotePeek.CreationTimestamp, &remotePeek.CreationTimestamp,
&remotePeek.RenewTimestamp, &remotePeek.RenewTimestamp,
&remotePeek.RenewalInterval, &remotePeek.RenewalInterval,
@ -144,6 +146,7 @@ func (s *remotePeeksStatements) SelectRemotePeeks(
if err = rows.Scan( if err = rows.Scan(
&remotePeek.RoomID, &remotePeek.RoomID,
&remotePeek.ServerName, &remotePeek.ServerName,
&remotePeek.PeekID,
&remotePeek.CreationTimestamp, &remotePeek.CreationTimestamp,
&remotePeek.RenewTimestamp, &remotePeek.RenewTimestamp,
&remotePeek.RenewalInterval, &remotePeek.RenewalInterval,
@ -157,9 +160,9 @@ func (s *remotePeeksStatements) SelectRemotePeeks(
} }
func (s *remotePeeksStatements) DeleteRemotePeek( func (s *remotePeeksStatements) DeleteRemotePeek(
ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (err error) { ) (err error) {
_, err := sqlutil.TxStmt(txn, s.deleteRemotePeekStmt).ExecContext(ctx, roomID, serverName) _, err := sqlutil.TxStmt(txn, s.deleteRemotePeekStmt).ExecContext(ctx, roomID, serverName, peekID)
return return
} }

View file

@ -69,9 +69,10 @@ type FederationSenderBlacklist interface {
} }
type FederationSenderRemotePeeks interface { type FederationSenderRemotePeeks interface {
InsertRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int) (err error) InsertRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
RenewRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int) (err error) RenewRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
SelectRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (remotePeek types.RemotePeek, err error)
SelectRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (remotePeeks []types.RemotePeek, err error) SelectRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (remotePeeks []types.RemotePeek, err error)
DeleteRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName) (err error) DeleteRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) DeleteRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
} }

View file

@ -50,7 +50,10 @@ func (e EventIDMismatchError) Error() string {
) )
} }
type RemotePeek { // UnixMs is the milliseconds since the Unix epoch
type UnixMs int64
type RemotePeek struct {
PeekID string PeekID string
RoomID string RoomID string
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName