This commit is contained in:
Matthew Hodgson 2020-09-23 00:08:23 +01:00
parent 20e2cb4b7e
commit 3202c7e76f
13 changed files with 106 additions and 109 deletions

View file

@ -71,9 +71,9 @@ func Peek(
err := rsAPI.PerformInboundPeek( err := rsAPI.PerformInboundPeek(
httpReq.Context(), httpReq.Context(),
&api.PerformInboundPeekRequest{ &api.PerformInboundPeekRequest{
RoomID: roomID, RoomID: roomID,
PeekID: peekID, PeekID: peekID,
ServerName: request.Origin(), ServerName: request.Origin(),
RenewalInterval: renewalInterval, RenewalInterval: renewalInterval,
}, },
&response, &response,
@ -88,10 +88,10 @@ func Peek(
} }
respPeek := gomatrixserverlib.RespPeek{ respPeek := gomatrixserverlib.RespPeek{
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
RoomVersion: response.RoomVersion, RoomVersion: response.RoomVersion,
LatestEvent: response.LatestEvent.Unwrap(), LatestEvent: response.LatestEvent.Unwrap(),
RenewalInterval: renewalInterval, RenewalInterval: renewalInterval,
} }
@ -100,4 +100,3 @@ func Peek(
JSON: respPeek, JSON: respPeek,
} }
} }

View file

@ -119,7 +119,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// causing the federationsender to start sending messages to the peeking server // causing the federationsender to start sending messages to the peeking server
func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error { func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error {
// FIXME: do something with orp.LatestEventID to prevent races // FIXME: do something with orp.LatestEventID to prevent races
return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval) return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
} }
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room

View file

@ -304,7 +304,7 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer(
renewing := false renewing := false
if outboundPeek != nil { if outboundPeek != nil {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond) nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
if nowMilli > outboundPeek.RenewedTimestamp + outboundPeek.RenewalInterval { if nowMilli > outboundPeek.RenewedTimestamp+outboundPeek.RenewalInterval {
logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID) logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID)
renewing = true renewing = true
} else { } else {

View file

@ -20,7 +20,7 @@ const (
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest" FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest"
FederationSenderPerformOutboundPeekRequestPath = "/federationsender/performOutboundPeekRequest" FederationSenderPerformOutboundPeekRequestPath = "/federationsender/performOutboundPeekRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"

View file

@ -56,7 +56,7 @@ const deleteInboundPeeksSQL = "" +
"DELETE FROM federationsender_inbound_peeks WHERE room_id = $1" "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
type inboundPeeksStatements struct { type inboundPeeksStatements struct {
db *sql.DB db *sql.DB
insertInboundPeekStmt *sql.Stmt insertInboundPeekStmt *sql.Stmt
selectInboundPeekStmt *sql.Stmt selectInboundPeekStmt *sql.Stmt
selectInboundPeeksStmt *sql.Stmt selectInboundPeeksStmt *sql.Stmt

View file

@ -56,7 +56,7 @@ const deleteOutboundPeeksSQL = "" +
"DELETE FROM federationsender_outbound_peeks WHERE room_id = $1" "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1"
type outboundPeeksStatements struct { type outboundPeeksStatements struct {
db *sql.DB db *sql.DB
insertOutboundPeekStmt *sql.Stmt insertOutboundPeekStmt *sql.Stmt
selectOutboundPeekStmt *sql.Stmt selectOutboundPeekStmt *sql.Stmt
selectOutboundPeeksStmt *sql.Stmt selectOutboundPeeksStmt *sql.Stmt

View file

@ -78,10 +78,10 @@ type FederationSenderOutboundPeeks interface {
} }
type FederationSenderInboundPeeks interface { type FederationSenderInboundPeeks interface {
InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error) SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error)
SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error) SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error)
DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
} }

View file

@ -187,4 +187,4 @@ type RoomserverInternalAPI interface {
req *RemoveRoomAliasRequest, req *RemoveRoomAliasRequest,
response *RemoveRoomAliasResponse, response *RemoveRoomAliasResponse,
) error ) error
} }

View file

@ -230,13 +230,13 @@ type OutputNewPeek struct {
// An OutputNewInboundPeek is written whenever a server starts peeking into a room // An OutputNewInboundPeek is written whenever a server starts peeking into a room
type OutputNewInboundPeek struct { type OutputNewInboundPeek struct {
RoomID string RoomID string
PeekID string PeekID string
// the event ID at which the peek begins (so we can avoid // the event ID at which the peek begins (so we can avoid
// a race between tracking the state returned by /peek and emitting subsequent // a race between tracking the state returned by /peek and emitting subsequent
// peeked events) // peeked events)
LatestEventID string LatestEventID string
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
// how often we told the peeking server to renew the peek // how often we told the peeking server to renew the peek
RenewalInterval int64 RenewalInterval int64
} }

View file

@ -161,11 +161,11 @@ type PerformPublishResponse struct {
} }
type PerformInboundPeekRequest struct { type PerformInboundPeekRequest struct {
UserID string `json:"user_id"` UserID string `json:"user_id"`
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
PeekID string `json:"peek_id"` PeekID string `json:"peek_id"`
ServerName gomatrixserverlib.ServerName `json:"server_name"` ServerName gomatrixserverlib.ServerName `json:"server_name"`
RenewalInterval int64 `json:"renewal_interval"` RenewalInterval int64 `json:"renewal_interval"`
} }
type PerformInboundPeekResponse struct { type PerformInboundPeekResponse struct {
@ -180,4 +180,4 @@ type PerformInboundPeekResponse struct {
AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"` AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"`
// The event at which this state was captured // The event at which this state was captured
LatestEvent gomatrixserverlib.HeaderedEvent `json:"latest_event"` LatestEvent gomatrixserverlib.HeaderedEvent `json:"latest_event"`
} }

View file

@ -93,8 +93,8 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
Inputer: r.Inputer, Inputer: r.Inputer,
} }
r.InboundPeeker = &perform.InboundPeeker{ r.InboundPeeker = &perform.InboundPeeker{
DB: r.DB, DB: r.DB,
Inputer: r.Inputer, Inputer: r.Inputer,
} }
r.Leaver = &perform.Leaver{ r.Leaver = &perform.Leaver{
Cfg: r.Cfg, Cfg: r.Cfg,
@ -142,4 +142,3 @@ func (r *RoomserverInternalAPI) PerformLeave(
} }
return r.WriteOutputEvents(req.RoomID, outputEvents) return r.WriteOutputEvents(req.RoomID, outputEvents)
} }

View file

@ -18,18 +18,18 @@ import (
"context" "context"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
type InboundPeeker struct { type InboundPeeker struct {
DB storage.Database DB storage.Database
Inputer *input.Inputer Inputer *input.Inputer
} }
@ -42,86 +42,85 @@ type InboundPeeker struct {
// fed sender can start sending peeked events without a race between the state // fed sender can start sending peeked events without a race between the state
// snapshot and the stream of peeked events. // snapshot and the stream of peeked events.
func (r *InboundPeeker) PerformInboundPeek( func (r *InboundPeeker) PerformInboundPeek(
ctx context.Context, ctx context.Context,
request *api.PerformInboundPeekRequest, request *api.PerformInboundPeekRequest,
response *api.PerformInboundPeekResponse, response *api.PerformInboundPeekResponse,
) error { ) error {
info, err := r.DB.RoomInfo(ctx, request.RoomID) info, err := r.DB.RoomInfo(ctx, request.RoomID)
if err != nil { if err != nil {
return err return err
} }
if info == nil || info.IsStub { if info == nil || info.IsStub {
return nil return nil
} }
response.RoomExists = true response.RoomExists = true
response.RoomVersion = info.RoomVersion response.RoomVersion = info.RoomVersion
var stateEvents []gomatrixserverlib.Event var stateEvents []gomatrixserverlib.Event
var currentStateSnapshotNID types.StateSnapshotNID var currentStateSnapshotNID types.StateSnapshotNID
latestEventRefs, currentStateSnapshotNID, _, err := latestEventRefs, currentStateSnapshotNID, _, err :=
r.DB.LatestEventIDs(ctx, info.RoomNID) r.DB.LatestEventIDs(ctx, info.RoomNID)
if err != nil { if err != nil {
return err return err
} }
// XXX: is this actually the latest of the latest events? // XXX: is this actually the latest of the latest events?
latestEvents, err := r.DB.EventsFromIDs(ctx, []string{ latestEventRefs[0].EventID }) latestEvents, err := r.DB.EventsFromIDs(ctx, []string{latestEventRefs[0].EventID})
if err != nil { if err != nil {
return err return err
} }
response.LatestEvent = latestEvents[0].Headered(info.RoomVersion) response.LatestEvent = latestEvents[0].Headered(info.RoomVersion)
// XXX: do we actually need to do a state resolution here? // XXX: do we actually need to do a state resolution here?
roomState := state.NewStateResolution(r.DB, *info) roomState := state.NewStateResolution(r.DB, *info)
var stateEntries []types.StateEntry var stateEntries []types.StateEntry
stateEntries, err = roomState.LoadStateAtSnapshot( stateEntries, err = roomState.LoadStateAtSnapshot(
ctx, currentStateSnapshotNID, ctx, currentStateSnapshotNID,
) )
if err != nil { if err != nil {
return err return err
} }
stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries) stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries)
if err != nil { if err != nil {
return err return err
} }
// get the auth event IDs for the current state events // get the auth event IDs for the current state events
var authEventIDs []string var authEventIDs []string
for _, se := range stateEvents { for _, se := range stateEvents {
authEventIDs = append(authEventIDs, se.AuthEventIDs()...) authEventIDs = append(authEventIDs, se.AuthEventIDs()...)
} }
authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
authEvents, err := query.GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs) authEvents, err := query.GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil { if err != nil {
return err return err
} }
for _, event := range stateEvents { for _, event := range stateEvents {
response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion)) response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion))
} }
for _, event := range authEvents { for _, event := range authEvents {
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion)) response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
} }
// FIXME: there's a race here - we really should be atomically telling the // FIXME: there's a race here - we really should be atomically telling the
// federationsender to start sending peek events alongside having captured // federationsender to start sending peek events alongside having captured
// the current state, but it's unclear if/how we can do that. // the current state, but it's unclear if/how we can do that.
err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{ err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
{ {
Type: api.OutputTypeNewInboundPeek, Type: api.OutputTypeNewInboundPeek,
NewInboundPeek: &api.OutputNewInboundPeek{ NewInboundPeek: &api.OutputNewInboundPeek{
RoomID: request.RoomID, RoomID: request.RoomID,
PeekID: request.PeekID, PeekID: request.PeekID,
LatestEventID: latestEvents[0].EventID(), LatestEventID: latestEvents[0].EventID(),
ServerName: request.ServerName, ServerName: request.ServerName,
RenewalInterval: request.RenewalInterval, RenewalInterval: request.RenewalInterval,
}, },
}, },
}) })
return err return err
} }

View file

@ -25,13 +25,13 @@ const (
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
// Perform operations // Perform operations
RoomserverPerformInvitePath = "/roomserver/performInvite" RoomserverPerformInvitePath = "/roomserver/performInvite"
RoomserverPerformPeekPath = "/roomserver/performPeek" RoomserverPerformPeekPath = "/roomserver/performPeek"
RoomserverPerformJoinPath = "/roomserver/performJoin" RoomserverPerformJoinPath = "/roomserver/performJoin"
RoomserverPerformLeavePath = "/roomserver/performLeave" RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill" RoomserverPerformBackfillPath = "/roomserver/performBackfill"
RoomserverPerformPublishPath = "/roomserver/performPublish" RoomserverPerformPublishPath = "/roomserver/performPublish"
RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek" RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
// Query operations // Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"