rename RemotePeeks as OutboundPeeks

This commit is contained in:
Matthew Hodgson 2020-09-12 22:59:25 +01:00
parent 0ae0d11446
commit 4e96e62923
8 changed files with 47 additions and 223 deletions

View file

@ -167,7 +167,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// TODO: track what hosts are peeking (federationsender_received_peeks)
// TODO: rename federationsender_remote_peeks as federationsender_sent_peeks
// TOOD: add peeking hosts to the joinedHosts list
// TODO: add peeking hosts to the joinedHosts list
// TODO: do housekeeping to evict unrenewed peeking hosts

View file

@ -294,18 +294,18 @@ func (r *FederationSenderInternalAPI) performPeekUsingServer(
// check whether we're peeking already to try to avoid needlessly
// re-peeking on the server. we don't need a transaction for this,
// given this is a nice-to-have.
remotePeek, err := r.db.GetRemotePeek(ctx, serverName, roomID, peekID)
outboundPeek, err := r.db.GetOutboundPeek(ctx, serverName, roomID, peekID)
if err != nil {
return err
}
renewing := false
if remotePeek != nil {
if outboundPeek != nil {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
if nowMilli > remotePeek.RenewedTimestamp+remotePeek.RenewalInterval {
logrus.Infof("stale remote peek to %s for %s already exists; renewing", serverName, roomID)
if nowMilli > outboundPeek.RenewedTimestamp + outboundPeek.RenewalInterval {
logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID)
renewing = true
} else {
logrus.Infof("live remote peek to %s for %s already exists", serverName, roomID)
logrus.Infof("live outbound peek to %s for %s already exists", serverName, roomID)
return nil
}
}
@ -339,11 +339,11 @@ func (r *FederationSenderInternalAPI) performPeekUsingServer(
// If we've got this far, the remote server is peeking.
if renewing {
if err = r.db.RenewRemotePeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
if err = r.db.RenewOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
} else {
if err = r.db.AddRemotePeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
if err = r.db.AddOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
}

View file

@ -55,8 +55,8 @@ type Database interface {
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)
AddRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
RenewRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
GetRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.RemotePeek, error)
GetRemotePeeks(ctx context.Context, roomID string) ([]types.RemotePeek, error)
AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error)
GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error)
}

View file

@ -27,15 +27,15 @@ import (
)
type Database struct {
DB *sql.DB
Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
FederationSenderQueueJSON tables.FederationSenderQueueJSON
FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
FederationSenderRooms tables.FederationSenderRooms
FederationSenderBlacklist tables.FederationSenderBlacklist
FederationSenderRemotePeeks tables.FederationSenderRemotePeeks
DB *sql.DB
Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
FederationSenderQueueJSON tables.FederationSenderQueueJSON
FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
FederationSenderRooms tables.FederationSenderRooms
FederationSenderBlacklist tables.FederationSenderBlacklist
FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks
}
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
@ -165,22 +165,22 @@ func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName)
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
}
func (d *Database) AddRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderRemotePeeks.InsertRemotePeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}
func (d *Database) RenewRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderRemotePeeks.RenewRemotePeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}
func (d *Database) GetRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.RemotePeek, error) {
return d.FederationSenderRemotePeeks.SelectRemotePeek(ctx, nil, serverName, roomID, peekID)
func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
return d.FederationSenderOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID)
}
func (d *Database) GetRemotePeeks(ctx context.Context, roomID string) ([]types.RemotePeek, error) {
return d.FederationSenderRemotePeeks.SelectRemotePeeks(ctx, nil, roomID)
func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
}

View file

@ -1,176 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const remotePeeksSchema = `
CREATE TABLE IF NOT EXISTS federationsender_remote_peeks (
room_id TEXT NOT NULL,
server_name TEXT NOT NULL,
peek_id TEXT NOT NULL,
creation_ts INTEGER NOT NULL,
renewed_ts INTEGER NOT NULL,
renewal_interval INTEGER NOT NULL,
UNIQUE (room_id, server_name, peek_id)
);
`
const insertRemotePeekSQL = "" +
"INSERT INTO federationsender_remote_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
const selectRemotePeekSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
const selectRemotePeeksSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1"
const renewRemotePeekSQL = "" +
"UPDATE federationsender_remote_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
const deleteRemotePeekSQL = "" +
"DELETE FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2"
const deleteRemotePeeksSQL = "" +
"DELETE FROM federationsender_remote_peeks WHERE room_id = $1"
type remotePeeksStatements struct {
db *sql.DB
insertRemotePeekStmt *sql.Stmt
selectRemotePeekStmt *sql.Stmt
selectRemotePeeksStmt *sql.Stmt
renewRemotePeekStmt *sql.Stmt
deleteRemotePeekStmt *sql.Stmt
deleteRemotePeeksStmt *sql.Stmt
}
func NewSQLiteRemotePeeksTable(db *sql.DB) (s *remotePeeksStatements, err error) {
s = &remotePeeksStatements{
db: db,
}
_, err = db.Exec(remotePeeksSchema)
if err != nil {
return
}
if s.insertRemotePeekStmt, err = db.Prepare(insertRemotePeekSQL); err != nil {
return
}
if s.selectRemotePeekStmt, err = db.Prepare(selectRemotePeekSQL); err != nil {
return
}
if s.selectRemotePeeksStmt, err = db.Prepare(selectRemotePeeksSQL); err != nil {
return
}
if s.renewRemotePeekStmt, err = db.Prepare(renewRemotePeekSQL); err != nil {
return
}
if s.deleteRemotePeeksStmt, err = db.Prepare(deleteRemotePeeksSQL); err != nil {
return
}
if s.deleteRemotePeekStmt, err = db.Prepare(deleteRemotePeekSQL); err != nil {
return
}
return
}
func (s *remotePeeksStatements) InsertRemotePeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, s.insertRemotePeekStmt)
_, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
return
}
func (s *remotePeeksStatements) RenewRemotePeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
_, err = sqlutil.TxStmt(txn, s.renewRemotePeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
return
}
func (s *remotePeeksStatements) SelectRemotePeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (*types.RemotePeek, error) {
row := sqlutil.TxStmt(txn, s.selectRemotePeeksStmt).QueryRowContext(ctx, roomID)
remotePeek := types.RemotePeek{}
err := row.Scan(
&remotePeek.RoomID,
&remotePeek.ServerName,
&remotePeek.PeekID,
&remotePeek.CreationTimestamp,
&remotePeek.RenewedTimestamp,
&remotePeek.RenewalInterval,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &remotePeek, nil
}
func (s *remotePeeksStatements) SelectRemotePeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (remotePeeks []types.RemotePeek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectRemotePeeksStmt).QueryContext(ctx, roomID)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectRemotePeeks: rows.close() failed")
for rows.Next() {
remotePeek := types.RemotePeek{}
if err = rows.Scan(
&remotePeek.RoomID,
&remotePeek.ServerName,
&remotePeek.PeekID,
&remotePeek.CreationTimestamp,
&remotePeek.RenewedTimestamp,
&remotePeek.RenewalInterval,
); err != nil {
return
}
remotePeeks = append(remotePeeks, remotePeek)
}
return remotePeeks, rows.Err()
}
func (s *remotePeeksStatements) DeleteRemotePeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteRemotePeekStmt).ExecContext(ctx, roomID, serverName, peekID)
return
}
func (s *remotePeeksStatements) DeleteRemotePeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteRemotePeeksStmt).ExecContext(ctx, roomID)
return
}

View file

@ -65,20 +65,20 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if err != nil {
return nil, err
}
remotePeeks, err := NewSQLiteRemotePeeksTable(d.db)
outboundPeeks, err := NewSQLiteOutboundPeeksTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs,
FederationSenderQueueJSON: queueJSON,
FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist,
FederationSenderRemotePeeks: remotePeeks,
DB: d.db,
Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs,
FederationSenderQueueJSON: queueJSON,
FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist,
FederationSenderOutboundPeeks: outboundPeeks,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err

View file

@ -68,11 +68,11 @@ type FederationSenderBlacklist interface {
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
}
type FederationSenderRemotePeeks interface {
InsertRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
RenewRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
SelectRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (remotePeek *types.RemotePeek, err error)
SelectRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (remotePeeks []types.RemotePeek, err error)
DeleteRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
type FederationSenderOutboundPeeks interface {
InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error)
SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error)
DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
}

View file

@ -50,7 +50,7 @@ func (e EventIDMismatchError) Error() string {
)
}
type RemotePeek struct {
type OutboundPeek struct {
PeekID string
RoomID string
ServerName gomatrixserverlib.ServerName