WIP of implementing MSC2444

This commit is contained in:
Matthew Hodgson 2020-09-04 01:38:22 +01:00
parent 994cc18b9c
commit c1f1fcdc67
10 changed files with 418 additions and 5 deletions

View file

@ -57,6 +57,12 @@ type FederationSenderInternalAPI interface {
request *PerformJoinRequest,
response *PerformJoinResponse,
)
// Handle an instruction to peek a room on a remote server.
PerformJoin(
ctx context.Context,
request *PerformPeekRequest,
response *PerformPeekResponse,
)
// Handle an instruction to make_leave & send_leave with a remote server.
PerformLeave(
ctx context.Context,
@ -105,6 +111,16 @@ type PerformJoinResponse struct {
LastError *gomatrix.HTTPError
}
type PerformPeekRequest struct {
RoomID string `json:"room_id"`
// The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
ServerNames types.ServerNames `json:"server_names"`
}
type PerformPeekResponse struct {
LastError *gomatrix.HTTPError
}
type PerformLeaveRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`

View file

@ -186,7 +186,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// Check that the send_join response was valid.
joinCtx := perform.JoinContext(r.federation, r.keyRing)
if err = joinCtx.CheckSendJoinResponse(
ctx, event, serverName, respMakeJoin, respSendJoin,
ctx, event, serverName, respSendJoin,
); err != nil {
return fmt.Errorf("joinCtx.CheckSendJoinResponse: %w", err)
}
@ -206,6 +206,156 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
return nil
}
// PerformPeekRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformPeek(
ctx context.Context,
request *api.PerformPeekRequest,
response *api.PerformPeekResponse,
) (err error) {
// Look up the supported room versions.
var supportedVersions []gomatrixserverlib.RoomVersion
for version := range version.SupportedRoomVersions() {
supportedVersions = append(supportedVersions, version)
}
// Deduplicate the server names we were provided but keep the ordering
// as this encodes useful information about which servers are most likely
// to respond.
seenSet := make(map[gomatrixserverlib.ServerName]bool)
var uniqueList []gomatrixserverlib.ServerName
for _, srv := range request.ServerNames {
if seenSet[srv] {
continue
}
seenSet[srv] = true
uniqueList = append(uniqueList, srv)
}
request.ServerNames = uniqueList
// Try each server that we were provided until we land on one that
// successfully completes the peek
var lastErr error
for _, serverName := range request.ServerNames {
if err := r.performPeekUsingServer(
ctx,
request.RoomID,
serverName,
supportedVersions,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": request.RoomID,
}).Warnf("Failed to peek room through server")
lastErr = err
continue
}
// We're all good.
return
}
// If we reach here then we didn't complete a peek for some reason.
var httpErr gomatrix.HTTPError
if ok := errors.As(lastErr, &httpErr); ok {
httpErr.Message = string(httpErr.Contents)
// Clear the wrapped error, else serialising to JSON (in polylith mode) will fail
httpErr.WrappedError = nil
response.LastError = &httpErr
} else {
response.LastError = &gomatrix.HTTPError{
Code: 0,
WrappedError: nil,
Message: lastErr.Error(),
}
}
logrus.Errorf(
"failed to peek room %q through %d server(s): last error %s",
request.RoomID, len(request.ServerNames), lastErr,
)
}
func (r *FederationSenderInternalAPI) performPeekUsingServer(
ctx context.Context,
roomID string,
serverName gomatrixserverlib.ServerName,
supportedVersions []gomatrixserverlib.RoomVersion,
) error {
// check whether we're peeking already to try to avoid needlessly
// re-peeking on the server. we don't need a transaction for this,
// given this is a nice-to-have.
remotePeek, err := r.db.GetRemotePeek(ctx, serverName, roomID)
if err != nil {
return err
}
renewing := false
if remotePeek != nil {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
if (nowMilli > remotePeek.RenewedTimestamp + remotePeek.RenewalInterval) {
logrus.Infof("stale remote peek to %s for %s already exists; renewing", serverName, roomID)
renewing = true
}
else {
logrus.Infof("live remote peek to %s for %s already exists", serverName, roomID)
return nil
}
}
// create a unique ID for this peek.
// for now we just use the room ID again. In future, if we ever
// support concurrent peeks to the same room with different filters
// then we would need to disambiguate further.
peekID := roomID
// Try to perform a /peek using the information supplied in the
// request.
respPeek, err := r.federation.Peek(
ctx,
serverName,
roomID,
peekID,
supportedVersions,
)
if err != nil {
r.statistics.ForServer(serverName).Failure()
return fmt.Errorf("r.federation.MakePeek: %w", err)
}
r.statistics.ForServer(serverName).Success()
// Work out if we support the room version that has been supplied in
// the make_peek response.
if respPeek.RoomVersion == "" {
respPeek.RoomVersion = gomatrixserverlib.RoomVersionV1
}
if _, err = respPeek.RoomVersion.EventFormat(); err != nil {
return fmt.Errorf("respPeek.RoomVersion.EventFormat: %w", err)
}
// If we've got this far, the remote server is peeking.
if renewing {
if err = r.db.RenewRemotePeek(ctx, serverName, roomID, respPeek.RenewalInterval); err != nil {
return err
}
}
else {
if err = r.db.AddRemotePeek(ctx, serverName, roomID, respPeek.RenewalInterval); err != nil {
return err
}
}
respState := respPeek.ToRespState()
// Send the newly returned state to the roomserver to update our local view.
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
&respState,
event.Headered(respPeek.RoomVersion), nil,
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}
return nil
}
// PerformLeaveRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformLeave(
ctx context.Context,

View file

@ -1,3 +1,17 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package perform
import (
@ -28,7 +42,6 @@ func (r joinContext) CheckSendJoinResponse(
ctx context.Context,
event gomatrixserverlib.Event,
server gomatrixserverlib.ServerName,
respMakeJoin gomatrixserverlib.RespMakeJoin,
respSendJoin gomatrixserverlib.RespSendJoin,
) error {
// A list of events that we have retried, if they were not included in

View file

@ -53,4 +53,9 @@ type Database interface {
AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)
AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error
RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error
GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID string) (types.RemotePeek, error)
GetRemotePeeks(roomID string) ([]types.RemotePeek, error)
}

View file

@ -163,3 +163,23 @@ func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.Server
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
}
func (d *Database) AddRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.RemotePeeks.InsertRemotePeek(context.TODO(), txn, serverName, roomID, renewalInterval)
})
}
func (d *Database) RenewRemotePeek(serverName gomatrixserverlib.ServerName, roomID string, renewalInterval int) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.RemotePeeks.RenewRemotePeek(context.TODO(), txn, serverName, roomID, renewalInterval)
})
}
func (d *Database) GetRemotePeek(serverName gomatrixserverlib.ServerName, roomID string) (types.RemotePeek, error) {
return d.RemotePeeks.SelectRemotePeek(context.TODO(), serverName, roomID)
}
func (d *Database) GetRemotePeeks(roomID string) ([]types.RemotePeek, error) {
return d.RemotePeeks.SelectRemotePeeks(context.TODO(), roomID)
}

View file

@ -0,0 +1,171 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const remotePeeksSchema = `
CREATE TABLE IF NOT EXISTS federationsender_remote_peeks (
room_id TEXT NOT NULL,
server_name TEXT NOT NULL,
creation_ts INTEGER NOT NULL,
renewed_ts INTEGER NOT NULL,
renewal_interval INTEGER NOT NULL,
UNIQUE (room_id, server_name)
);
`
const insertRemotePeekSQL = "" +
"INSERT INTO federationsender_remote_peeks (room_id, server_name, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5)"
const selectRemotePeekSQL = "" +
"SELECT room_id, server_name, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2"
const selectRemotePeeksSQL = "" +
"SELECT room_id, server_name, creation_ts, renewed_ts, renewal_interval FROM federationsender_remote_peeks WHERE room_id = $1"
const renewRemotePeekSQL = "" +
"UPDATE federationsender_remote_peeks SET renewed_ts=$3, renewal_interval=$4 WHERE room_id = $1 and server_name = $2"
const deleteRemotePeekSQL = "" +
"DELETE FROM federationsender_remote_peeks WHERE room_id = $1 and server_name = $2"
const deleteRemotePeeksSQL = "" +
"DELETE FROM federationsender_remote_peeks WHERE room_id = $1"
type remotePeeksStatements struct {
db *sql.DB
insertRemotePeekStmt *sql.Stmt
selectRemotePeekStmt *sql.Stmt
selectRemotePeeksStmt *sql.Stmt
renewRemotePeekStmt *sql.Stmt
deleteRemotePeekStmt *sql.Stmt
deleteRemotePeeksStmt *sql.Stmt
}
func NewSQLiteRemotePeeksTable(db *sql.DB) (s *remotePeeksStatements, err error) {
s = &remotePeeksStatements{
db: db,
}
_, err = db.Exec(remotePeeksSchema)
if err != nil {
return
}
if s.insertRemotePeekStmt, err = db.Prepare(insertRemotePeekSQL); err != nil {
return
}
if s.selectRemotePeekStmt, err = db.Prepare(selectRemotePeekSQL); err != nil {
return
}
if s.selectRemotePeeksStmt, err = db.Prepare(selectRemotePeeksSQL); err != nil {
return
}
if s.renewRemotePeekStmt, err = db.Prepare(renewRemotePeekSQL); err != nil {
return
}
if s.deleteRemotePeeksStmt, err = db.Prepare(deleteRemotePeeksSQL); err != nil {
return
}
if s.deleteRemotePeekStmt, err = db.Prepare(deleteRemotePeekSQL); err != nil {
return
}
return
}
func (s *remotePeeksStatements) InsertRemotePeek(
ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, s.insertRemotePeekStmt)
_, err := stmt.ExecContext(ctx, roomID, serverName, nowMilli, nowMilli, renewalInterval)
return
}
func (s *remotePeeksStatements) RenewRemotePeek(
ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
_, err := sqlutil.TxStmt(txn, s.renewRemotePeekStmt).ExecContext(ctx, roomID, serverName, nowMilli, renewalInterval)
return
}
func (s *remotePeeksStatements) SelectRemotePeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID string,
) (remotePeek types.RemotePeek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectRemotePeeksStmt).QueryContext(ctx, roomID)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectRemotePeek: rows.close() failed")
remotePeek := types.RemotePeek{}
if err = rows.Scan(
&remotePeek.RoomID,
&remotePeek.ServerName,
&remotePeek.CreationTimestamp,
&remotePeek.RenewTimestamp,
&remotePeek.RenewalInterval,
); err != nil {
return
}
return remotePeek, rows.Err()
}
func (s *remotePeeksStatements) SelectRemotePeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (remotePeeks []types.RemotePeek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectRemotePeeksStmt).QueryContext(ctx, roomID)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectRemotePeeks: rows.close() failed")
for rows.Next() {
remotePeek := types.RemotePeek{}
if err = rows.Scan(
&remotePeek.RoomID,
&remotePeek.ServerName,
&remotePeek.CreationTimestamp,
&remotePeek.RenewTimestamp,
&remotePeek.RenewalInterval,
); err != nil {
return
}
remotePeeks = append(remotePeeks, remotePeek)
}
return remotePeeks, rows.Err()
}
func (s *remotePeeksStatements) DeleteRemotePeek(
ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName,
) (err error) {
_, err := sqlutil.TxStmt(txn, s.deleteRemotePeekStmt).ExecContext(ctx, roomID, serverName)
return
}
func (s *remotePeeksStatements) DeleteRemotePeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (err error) {
_, err := sqlutil.TxStmt(txn, s.deleteRemotePeeksStmt).ExecContext(ctx, roomID)
return
}

View file

@ -65,6 +65,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if err != nil {
return nil, err
}
remotePeeks, err := NewSQLiteRemotePeeksTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
@ -74,6 +78,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
FederationSenderQueueJSON: queueJSON,
FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist,
FederationSenderRemotePeeks: remotePeeks,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err

View file

@ -67,3 +67,11 @@ type FederationSenderBlacklist interface {
SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error)
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
}
type FederationSenderRemotePeeks interface {
InsertRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int) (err error)
RenewRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName, renewalInterval int) (err error)
SelectRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (remotePeeks []types.RemotePeek, err error)
DeleteRemotePeek(ctx context.Context, txn *sql.Tx, roomID string, serverName gomatrixserverlib.ServerName) (err error)
DeleteRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
}

View file

@ -49,3 +49,12 @@ func (e EventIDMismatchError) Error() string {
e.DatabaseID, e.RoomServerID,
)
}
type RemotePeek {
PeekID string
RoomID string
ServerName gomatrixserverlib.ServerName
CreatedTimestamp UnixMs
RenewedTimestamp UnixMs
RenewalInterval UnixMs
}

View file

@ -152,11 +152,27 @@ func (r *Peeker) performPeekRoomByID(
}
}
// If the server name in the room ID isn't ours then it's a
// possible candidate for finding the room via federation. Add
// it to the list of servers to try.
// handle federated peeks
if domain != r.Cfg.Matrix.ServerName {
// If the server name in the room ID isn't ours then it's a
// possible candidate for finding the room via federation. Add
// it to the list of servers to try.
req.ServerNames = append(req.ServerNames, domain)
// Try peeking by all of the supplied server names.
fedReq := fsAPI.PerformPeekRequest{
RoomID: req.RoomIDOrAlias, // the room ID to try and peek
ServerNames: req.ServerNames, // the servers to try peeking via
}
fedRes := fsAPI.PerformPeekResponse{}
r.FSAPI.PerformPeek(ctx, &fedReq, &fedRes)
if fedRes.LastError != nil {
return &api.PerformError{
Code: api.PerformErrRemote,
Msg: fedRes.LastError.Message,
RemoteCode: fedRes.LastError.Code,
}
}
}
// If this room isn't world_readable, we reject.