Peeking over federation via MSC2444 (#1391)

* a very very WIP first cut of peeking via MSC2753.

doesn't yet compile or work.
needs to actually add the peeking block into the sync response.
checking in now before it gets any bigger, and to gather any initial feedback on the vague shape of it.

* make PeekingDeviceSet private

* add server_name param

* blind stab at adding a `peek` section to /sync

* make it build

* make it launch

* add peeking to getResponseWithPDUsForCompleteSync

* cancel any peeks when we join a room

* spell out how to runoutside of docker if you want speed

* fix SQL

* remove unnecessary txn for SelectPeeks

* fix s/join/peek/ cargocult fail

* HACK: Track goroutine IDs to determine when we write by the wrong thread

To use: set `DENDRITE_TRACE_SQL=1` then grep for `unsafe`

* Track partition offsets and only log unsafe for non-selects

* Put redactions in the writer goroutine

* Update filters on writer goroutine

* wrap peek storage in goid hack

* use exclusive writer, and MarkPeeksAsOld more efficiently

* don't log ascii in binary at sql trace...

* strip out empty roomd deltas

* re-add txn to SelectPeeks

* re-add accidentally deleted field

* reject peeks for non-worldreadable rooms

* move perform_peek

* fix package

* correctly refactor perform_peek

* WIP of implementing MSC2444

* typo

* Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking"

This reverts commit 3cebd8dbfb, reversing
changes made to ed4b3a58a7.

* (almost) make it build

* clean up bad merge

* support SendEventWithState with optional event

* fix build & lint

* fix build & lint

* reinstate federated peeks in the roomserver (doh)

* fix sql thinko

* todo for authenticating state returned by /peek

* support returning current state from QueryStateAndAuthChain

* handle SS /peek

* reimplement SS /peek to prod the RS to tell the FS about the peek

* rename RemotePeeks as OutboundPeeks

* rename remote_peeks_table as outbound_peeks_table

* add perform_handle_remote_peek.go

* flesh out federation doc

* add inbound peeks table and hook it up

* rename ambiguous RemotePeek as InboundPeek

* rename FSAPI's PerformPeek as PerformOutboundPeek

* setup inbound peeks db correctly

* fix api.SendEventWithState with no event

* track latestevent on /peek

* go fmt

* document the peek send stream race better

* fix SendEventWithRewrite not to bail if handed a non-state event

* add fixme

* switch SS /peek to use SendEventWithRewrite

* fix comment

* use reverse topo ordering to find latest extrem

* support postgres for federated peeking

* go fmt

* back out bogus go.mod change

* Fix performOutboundPeekUsingServer

* Fix getAuthChain -> GetAuthChain

* Fix build issues

* Fix build again

* Fix getAuthChain -> GetAuthChain

* Don't repeat outbound peeks for the same room ID to the same servers

* Fix lint

* Don't omitempty to appease sytest

Co-authored-by: Kegan Dougal <kegan@matrix.org>
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
Matthew Hodgson 2021-01-22 14:55:08 +00:00 committed by GitHub
parent 5d8ec0ff1a
commit 0571d395b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 1501 additions and 66 deletions

View file

@ -1,19 +1,26 @@
## Peeking ## Peeking
Peeking is implemented as per [MSC2753](https://github.com/matrix-org/matrix-doc/pull/2753). Local peeking is implemented as per [MSC2753](https://github.com/matrix-org/matrix-doc/pull/2753).
Implementationwise, this means: Implementationwise, this means:
* Users call `/peek` and `/unpeek` on the clientapi from a given device. * Users call `/peek` and `/unpeek` on the clientapi from a given device.
* The clientapi delegates these via HTTP to the roomserver, which coordinates peeking in general for a given room * The clientapi delegates these via HTTP to the roomserver, which coordinates peeking in general for a given room
* The roomserver writes an NewPeek event into the kafka log headed to the syncserver * The roomserver writes an NewPeek event into the kafka log headed to the syncserver
* The syncserver tracks the existence of the local peek in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peek` section of the /sync response. * The syncserver tracks the existence of the local peek in the syncapi_peeks table in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peek` section of the /sync response.
Questions (given this is [my](https://github.com/ara4n) first time hacking on Dendrite): Peeking over federation is implemented as per [MSC2444](https://github.com/matrix-org/matrix-doc/pull/2444).
* The whole clientapi -> roomserver -> syncapi flow to initiate a peek seems very indirect. Is there a reason not to just let syncapi itself host the implementation of `/peek`?
In future, peeking over federation will be added as per [MSC2444](https://github.com/matrix-org/matrix-doc/pull/2444). For requests to peek our rooms ("inbound peeks"):
* The `roomserver` will kick the `federationsender` much as it does for a federated `/join` in order to trigger a federated `/peek` * Remote servers call `/peek` on federationapi
* The `federationsender` tracks the existence of the remote peek in question * The federationapi queries the federationsender to check if this is renewing an inbound peek or not.
* If not, it hits the PerformInboundPeek on the roomserver to ask it for the current state of the room.
* The roomserver atomically (in theory) adds a NewInboundPeek to its kafka stream to tell the federationserver to start peeking.
* The federationsender receives the event, tracks the inbound peek in the federationsender_inbound_peeks table, and starts sending events to the peeking server.
* The federationsender evicts stale inbound peeks which haven't been renewed.
For peeking into other server's rooms ("outbound peeks"):
* The `roomserver` will kick the `federationsender` much as it does for a federated `/join` in order to trigger a federated outbound `/peek`
* The `federationsender` tracks the existence of the outbound peek in in its federationsender_outbound_peeks table.
* The `federationsender` regularly renews the remote peek as long as there are still peeking devices syncing for it. * The `federationsender` regularly renews the remote peek as long as there are still peeking devices syncing for it.
* TBD: how do we tell if there are no devices currently syncing for a given peeked room? The syncserver needs to tell the roomserver * TBD: how do we tell if there are no devices currently syncing for a given peeked room? The syncserver needs to tell the roomserver
somehow who then needs to warn the federationsender. somehow who then needs to warn the federationsender.

View file

@ -0,0 +1,102 @@
// Copyright 2020 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// Peek implements the SS /peek API, handling inbound peeks
func Peek(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
cfg *config.FederationAPI,
rsAPI api.RoomserverInternalAPI,
roomID, peekID string,
remoteVersions []gomatrixserverlib.RoomVersion,
) util.JSONResponse {
// TODO: check if we're just refreshing an existing peek by querying the federationsender
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.InternalServerError(),
}
}
// Check that the room that the peeking server is trying to peek is actually
// one of the room versions that they listed in their supported ?ver= in
// the peek URL.
remoteSupportsVersion := false
for _, v := range remoteVersions {
if v == verRes.RoomVersion {
remoteSupportsVersion = true
break
}
}
// If it isn't, stop trying to peek the room.
if !remoteSupportsVersion {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.IncompatibleRoomVersion(verRes.RoomVersion),
}
}
// TODO: Check history visibility
// tell the peeking server to renew every hour
renewalInterval := int64(60 * 60 * 1000 * 1000)
var response api.PerformInboundPeekResponse
err := rsAPI.PerformInboundPeek(
httpReq.Context(),
&api.PerformInboundPeekRequest{
RoomID: roomID,
PeekID: peekID,
ServerName: request.Origin(),
RenewalInterval: renewalInterval,
},
&response,
)
if err != nil {
resErr := util.ErrorResponse(err)
return resErr
}
if !response.RoomExists {
return util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
}
respPeek := gomatrixserverlib.RespPeek{
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
RoomVersion: response.RoomVersion,
LatestEvent: response.LatestEvent.Unwrap(),
RenewalInterval: renewalInterval,
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: respPeek,
}
}

View file

@ -229,7 +229,37 @@ func Setup(
}, },
)).Methods(http.MethodGet) )).Methods(http.MethodGet)
v1fedmux.Handle("/make_join/{roomID}/{eventID}", httputil.MakeFedAPI( v1fedmux.Handle("/peek/{roomID}/{peekID}", httputil.MakeFedAPI(
"federation_peek", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
if roomserverAPI.IsServerBannedFromRoom(httpReq.Context(), rsAPI, vars["roomID"], request.Origin()) {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("Forbidden by server ACLs"),
}
}
roomID := vars["roomID"]
peekID := vars["peekID"]
queryVars := httpReq.URL.Query()
remoteVersions := []gomatrixserverlib.RoomVersion{}
if vers, ok := queryVars["ver"]; ok {
// The remote side supplied a ?ver= so use that to build up the list
// of supported room versions
for _, v := range vers {
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersion(v))
}
} else {
// The remote side didn't supply a ?ver= so just assume that they only
// support room version 1
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersionV1)
}
return Peek(
httpReq, request, cfg, rsAPI, roomID, peekID, remoteVersions,
)
},
)).Methods(http.MethodPut, http.MethodDelete)
v1fedmux.Handle("/make_join/{roomID}/{userID}", httputil.MakeFedAPI(
"federation_make_join", cfg.Matrix.ServerName, keys, wakeup, "federation_make_join", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
if roomserverAPI.IsServerBannedFromRoom(httpReq.Context(), rsAPI, vars["roomID"], request.Origin()) { if roomserverAPI.IsServerBannedFromRoom(httpReq.Context(), rsAPI, vars["roomID"], request.Origin()) {
@ -239,11 +269,11 @@ func Setup(
} }
} }
roomID := vars["roomID"] roomID := vars["roomID"]
eventID := vars["eventID"] userID := vars["userID"]
queryVars := httpReq.URL.Query() queryVars := httpReq.URL.Query()
remoteVersions := []gomatrixserverlib.RoomVersion{} remoteVersions := []gomatrixserverlib.RoomVersion{}
if vers, ok := queryVars["ver"]; ok { if vers, ok := queryVars["ver"]; ok {
// The remote side supplied a ?=ver so use that to build up the list // The remote side supplied a ?ver= so use that to build up the list
// of supported room versions // of supported room versions
for _, v := range vers { for _, v := range vers {
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersion(v)) remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersion(v))
@ -255,7 +285,7 @@ func Setup(
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersionV1) remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersionV1)
} }
return MakeJoin( return MakeJoin(
httpReq, request, cfg, rsAPI, roomID, eventID, remoteVersions, httpReq, request, cfg, rsAPI, roomID, userID, remoteVersions,
) )
}, },
)).Methods(http.MethodGet) )).Methods(http.MethodGet)

View file

@ -62,6 +62,12 @@ type FederationSenderInternalAPI interface {
request *PerformJoinRequest, request *PerformJoinRequest,
response *PerformJoinResponse, response *PerformJoinResponse,
) )
// Handle an instruction to peek a room on a remote server.
PerformOutboundPeek(
ctx context.Context,
request *PerformOutboundPeekRequest,
response *PerformOutboundPeekResponse,
) error
// Handle an instruction to make_leave & send_leave with a remote server. // Handle an instruction to make_leave & send_leave with a remote server.
PerformLeave( PerformLeave(
ctx context.Context, ctx context.Context,
@ -111,6 +117,16 @@ type PerformJoinResponse struct {
LastError *gomatrix.HTTPError LastError *gomatrix.HTTPError
} }
type PerformOutboundPeekRequest struct {
RoomID string `json:"room_id"`
// The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
ServerNames types.ServerNames `json:"server_names"`
}
type PerformOutboundPeekResponse struct {
LastError *gomatrix.HTTPError
}
type PerformLeaveRequest struct { type PerformLeaveRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
UserID string `json:"user_id"` UserID string `json:"user_id"`

View file

@ -111,6 +111,14 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
} }
return nil return nil
} }
case api.OutputTypeNewInboundPeek:
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{
"event": output.NewInboundPeek,
log.ErrorKey: err,
}).Panicf("roomserver output log: remote peek event failure")
return nil
}
default: default:
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",
@ -121,6 +129,23 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)
// causing the federationsender to start sending messages to the peeking server
func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error {
// FIXME: there's a race here - we should start /sending new peeked events
// atomically after the orp.LatestEventID to ensure there are no gaps between
// the peek beginning and the send stream beginning.
//
// We probably need to track orp.LatestEventID on the inbound peek, but it's
// unclear how we then use that to prevent the race when we start the send
// stream.
//
// This is making the tests flakey.
return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
}
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
@ -164,6 +189,10 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
return err return err
} }
// TODO: do housekeeping to evict unrenewed peeking hosts
// TODO: implement query to let the fedapi check whether a given peek is live or not
// Send the event. // Send the event.
return s.queues.SendEvent( return s.queues.SendEvent(
ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
@ -171,7 +200,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
} }
// joinedHostsAtEvent works out a list of matrix servers that were joined to // joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event. // the room at the event (including peeking ones)
// It is important to use the state at the event for sending messages because: // It is important to use the state at the event for sending messages because:
// 1) We shouldn't send messages to servers that weren't in the room. // 1) We shouldn't send messages to servers that weren't in the room.
// 2) If a server is kicked from the rooms it should still be told about the // 2) If a server is kicked from the rooms it should still be told about the
@ -222,6 +251,15 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
joined[joinedHost.ServerName] = true joined[joinedHost.ServerName] = true
} }
// handle peeking hosts
inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.Event.RoomID())
if err != nil {
return nil, err
}
for _, inboundPeek := range inboundPeeks {
joined[inboundPeek.ServerName] = true
}
var result []gomatrixserverlib.ServerName var result []gomatrixserverlib.ServerName
for serverName, include := range joined { for serverName, include := range joined {
if include { if include {

View file

@ -234,7 +234,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// Check that the send_join response was valid. // Check that the send_join response was valid.
joinCtx := perform.JoinContext(r.federation, r.keyRing) joinCtx := perform.JoinContext(r.federation, r.keyRing)
respState, err := joinCtx.CheckSendJoinResponse( respState, err := joinCtx.CheckSendJoinResponse(
ctx, event, serverName, respMakeJoin, respSendJoin, ctx, event, serverName, respSendJoin,
) )
if err != nil { if err != nil {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
@ -266,6 +266,172 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
return nil return nil
} }
// PerformOutboundPeekRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformOutboundPeek(
ctx context.Context,
request *api.PerformOutboundPeekRequest,
response *api.PerformOutboundPeekResponse,
) error {
// Look up the supported room versions.
var supportedVersions []gomatrixserverlib.RoomVersion
for version := range version.SupportedRoomVersions() {
supportedVersions = append(supportedVersions, version)
}
// Deduplicate the server names we were provided but keep the ordering
// as this encodes useful information about which servers are most likely
// to respond.
seenSet := make(map[gomatrixserverlib.ServerName]bool)
var uniqueList []gomatrixserverlib.ServerName
for _, srv := range request.ServerNames {
if seenSet[srv] {
continue
}
seenSet[srv] = true
uniqueList = append(uniqueList, srv)
}
request.ServerNames = uniqueList
// See if there's an existing outbound peek for this room ID with
// one of the specified servers.
if peeks, err := r.db.GetOutboundPeeks(ctx, request.RoomID); err == nil {
for _, peek := range peeks {
if _, ok := seenSet[peek.ServerName]; ok {
return nil
}
}
}
// Try each server that we were provided until we land on one that
// successfully completes the peek
var lastErr error
for _, serverName := range request.ServerNames {
if err := r.performOutboundPeekUsingServer(
ctx,
request.RoomID,
serverName,
supportedVersions,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": request.RoomID,
}).Warnf("Failed to peek room through server")
lastErr = err
continue
}
// We're all good.
return nil
}
// If we reach here then we didn't complete a peek for some reason.
var httpErr gomatrix.HTTPError
if ok := errors.As(lastErr, &httpErr); ok {
httpErr.Message = string(httpErr.Contents)
// Clear the wrapped error, else serialising to JSON (in polylith mode) will fail
httpErr.WrappedError = nil
response.LastError = &httpErr
} else {
response.LastError = &gomatrix.HTTPError{
Code: 0,
WrappedError: nil,
Message: lastErr.Error(),
}
}
logrus.Errorf(
"failed to peek room %q through %d server(s): last error %s",
request.RoomID, len(request.ServerNames), lastErr,
)
return lastErr
}
func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer(
ctx context.Context,
roomID string,
serverName gomatrixserverlib.ServerName,
supportedVersions []gomatrixserverlib.RoomVersion,
) error {
// create a unique ID for this peek.
// for now we just use the room ID again. In future, if we ever
// support concurrent peeks to the same room with different filters
// then we would need to disambiguate further.
peekID := roomID
// check whether we're peeking already to try to avoid needlessly
// re-peeking on the server. we don't need a transaction for this,
// given this is a nice-to-have.
outboundPeek, err := r.db.GetOutboundPeek(ctx, serverName, roomID, peekID)
if err != nil {
return err
}
renewing := false
if outboundPeek != nil {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
if nowMilli > outboundPeek.RenewedTimestamp+outboundPeek.RenewalInterval {
logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID)
renewing = true
} else {
logrus.Infof("live outbound peek to %s for %s already exists", serverName, roomID)
return nil
}
}
// Try to perform an outbound /peek using the information supplied in the
// request.
respPeek, err := r.federation.Peek(
ctx,
serverName,
roomID,
peekID,
supportedVersions,
)
if err != nil {
r.statistics.ForServer(serverName).Failure()
return fmt.Errorf("r.federation.Peek: %w", err)
}
r.statistics.ForServer(serverName).Success()
// Work out if we support the room version that has been supplied in
// the peek response.
if respPeek.RoomVersion == "" {
respPeek.RoomVersion = gomatrixserverlib.RoomVersionV1
}
if _, err = respPeek.RoomVersion.EventFormat(); err != nil {
return fmt.Errorf("respPeek.RoomVersion.EventFormat: %w", err)
}
// TODO: authenticate the state returned (check its auth events etc)
// the equivalent of CheckSendJoinResponse()
// If we've got this far, the remote server is peeking.
if renewing {
if err = r.db.RenewOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
} else {
if err = r.db.AddOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
}
respState := respPeek.ToRespState()
// logrus.Warnf("got respPeek %#v", respPeek)
// Send the newly returned state to the roomserver to update our local view.
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
roomserverAPI.KindNew,
&respState,
respPeek.LatestEvent.Headered(respPeek.RoomVersion),
nil,
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}
return nil
}
// PerformLeaveRequest implements api.FederationSenderInternalAPI // PerformLeaveRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformLeave( func (r *FederationSenderInternalAPI) PerformLeave(
ctx context.Context, ctx context.Context,

View file

@ -1,3 +1,17 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package perform package perform
import ( import (
@ -28,7 +42,6 @@ func (r joinContext) CheckSendJoinResponse(
ctx context.Context, ctx context.Context,
event *gomatrixserverlib.Event, event *gomatrixserverlib.Event,
server gomatrixserverlib.ServerName, server gomatrixserverlib.ServerName,
respMakeJoin gomatrixserverlib.RespMakeJoin,
respSendJoin gomatrixserverlib.RespSendJoin, respSendJoin gomatrixserverlib.RespSendJoin,
) (*gomatrixserverlib.RespState, error) { ) (*gomatrixserverlib.RespState, error) {
// A list of events that we have retried, if they were not included in // A list of events that we have retried, if they were not included in

View file

@ -20,6 +20,7 @@ const (
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest" FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest"
FederationSenderPerformOutboundPeekRequestPath = "/federationsender/performOutboundPeekRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"
@ -76,6 +77,19 @@ func (h *httpFederationSenderInternalAPI) PerformInvite(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// Handle starting a peek on a remote server.
func (h *httpFederationSenderInternalAPI) PerformOutboundPeek(
ctx context.Context,
request *api.PerformOutboundPeekRequest,
response *api.PerformOutboundPeekResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformOutboundPeekRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformOutboundPeekRequestPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func (h *httpFederationSenderInternalAPI) PerformServersAlive( func (h *httpFederationSenderInternalAPI) PerformServersAlive(
ctx context.Context, ctx context.Context,
request *api.PerformServersAliveRequest, request *api.PerformServersAliveRequest,

View file

@ -51,7 +51,18 @@ type Database interface {
GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
// these don't have contexts passed in as we want things to happen regardless of the request context
AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)
AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error)
GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error)
AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error)
GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error)
} }

View file

@ -0,0 +1,176 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const inboundPeeksSchema = `
CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks (
room_id TEXT NOT NULL,
server_name TEXT NOT NULL,
peek_id TEXT NOT NULL,
creation_ts BIGINT NOT NULL,
renewed_ts BIGINT NOT NULL,
renewal_interval BIGINT NOT NULL,
UNIQUE (room_id, server_name, peek_id)
);
`
const insertInboundPeekSQL = "" +
"INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
const selectInboundPeekSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
const selectInboundPeeksSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1"
const renewInboundPeekSQL = "" +
"UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
const deleteInboundPeekSQL = "" +
"DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2"
const deleteInboundPeeksSQL = "" +
"DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
type inboundPeeksStatements struct {
db *sql.DB
insertInboundPeekStmt *sql.Stmt
selectInboundPeekStmt *sql.Stmt
selectInboundPeeksStmt *sql.Stmt
renewInboundPeekStmt *sql.Stmt
deleteInboundPeekStmt *sql.Stmt
deleteInboundPeeksStmt *sql.Stmt
}
func NewPostgresInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) {
s = &inboundPeeksStatements{
db: db,
}
_, err = db.Exec(inboundPeeksSchema)
if err != nil {
return
}
if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil {
return
}
if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil {
return
}
if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil {
return
}
if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil {
return
}
if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil {
return
}
if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil {
return
}
return
}
func (s *inboundPeeksStatements) InsertInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt)
_, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
return
}
func (s *inboundPeeksStatements) RenewInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
_, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
return
}
func (s *inboundPeeksStatements) SelectInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (*types.InboundPeek, error) {
row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID)
inboundPeek := types.InboundPeek{}
err := row.Scan(
&inboundPeek.RoomID,
&inboundPeek.ServerName,
&inboundPeek.PeekID,
&inboundPeek.CreationTimestamp,
&inboundPeek.RenewedTimestamp,
&inboundPeek.RenewalInterval,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &inboundPeek, nil
}
func (s *inboundPeeksStatements) SelectInboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (inboundPeeks []types.InboundPeek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed")
for rows.Next() {
inboundPeek := types.InboundPeek{}
if err = rows.Scan(
&inboundPeek.RoomID,
&inboundPeek.ServerName,
&inboundPeek.PeekID,
&inboundPeek.CreationTimestamp,
&inboundPeek.RenewedTimestamp,
&inboundPeek.RenewalInterval,
); err != nil {
return
}
inboundPeeks = append(inboundPeeks, inboundPeek)
}
return inboundPeeks, rows.Err()
}
func (s *inboundPeeksStatements) DeleteInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
return
}
func (s *inboundPeeksStatements) DeleteInboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID)
return
}

View file

@ -0,0 +1,176 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const outboundPeeksSchema = `
CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks (
room_id TEXT NOT NULL,
server_name TEXT NOT NULL,
peek_id TEXT NOT NULL,
creation_ts BIGINT NOT NULL,
renewed_ts BIGINT NOT NULL,
renewal_interval BIGINT NOT NULL,
UNIQUE (room_id, server_name, peek_id)
);
`
const insertOutboundPeekSQL = "" +
"INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
const selectOutboundPeekSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
const selectOutboundPeeksSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1"
const renewOutboundPeekSQL = "" +
"UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
const deleteOutboundPeekSQL = "" +
"DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2"
const deleteOutboundPeeksSQL = "" +
"DELETE FROM federationsender_outbound_peeks WHERE room_id = $1"
type outboundPeeksStatements struct {
db *sql.DB
insertOutboundPeekStmt *sql.Stmt
selectOutboundPeekStmt *sql.Stmt
selectOutboundPeeksStmt *sql.Stmt
renewOutboundPeekStmt *sql.Stmt
deleteOutboundPeekStmt *sql.Stmt
deleteOutboundPeeksStmt *sql.Stmt
}
func NewPostgresOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) {
s = &outboundPeeksStatements{
db: db,
}
_, err = db.Exec(outboundPeeksSchema)
if err != nil {
return
}
if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil {
return
}
if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil {
return
}
if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil {
return
}
if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil {
return
}
if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil {
return
}
if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil {
return
}
return
}
func (s *outboundPeeksStatements) InsertOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt)
_, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
return
}
func (s *outboundPeeksStatements) RenewOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
_, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
return
}
func (s *outboundPeeksStatements) SelectOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (*types.OutboundPeek, error) {
row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID)
outboundPeek := types.OutboundPeek{}
err := row.Scan(
&outboundPeek.RoomID,
&outboundPeek.ServerName,
&outboundPeek.PeekID,
&outboundPeek.CreationTimestamp,
&outboundPeek.RenewedTimestamp,
&outboundPeek.RenewalInterval,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &outboundPeek, nil
}
func (s *outboundPeeksStatements) SelectOutboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (outboundPeeks []types.OutboundPeek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed")
for rows.Next() {
outboundPeek := types.OutboundPeek{}
if err = rows.Scan(
&outboundPeek.RoomID,
&outboundPeek.ServerName,
&outboundPeek.PeekID,
&outboundPeek.CreationTimestamp,
&outboundPeek.RenewedTimestamp,
&outboundPeek.RenewalInterval,
); err != nil {
return
}
outboundPeeks = append(outboundPeeks, outboundPeek)
}
return outboundPeeks, rows.Err()
}
func (s *outboundPeeksStatements) DeleteOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
return
}
func (s *outboundPeeksStatements) DeleteOutboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID)
return
}

View file

@ -64,16 +64,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil { if err != nil {
return nil, err return nil, err
} }
inboundPeeks, err := NewPostgresInboundPeeksTable(d.db)
if err != nil {
return nil, err
}
outboundPeeks, err := NewPostgresOutboundPeeksTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
Cache: cache, Cache: cache,
Writer: d.writer, Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts, FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs, FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueEDUs: queueEDUs,
FederationSenderQueueJSON: queueJSON, FederationSenderQueueJSON: queueJSON,
FederationSenderRooms: rooms, FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist, FederationSenderBlacklist: blacklist,
FederationSenderInboundPeeks: inboundPeeks,
FederationSenderOutboundPeeks: outboundPeeks,
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err return nil, err

View file

@ -27,15 +27,17 @@ import (
) )
type Database struct { type Database struct {
DB *sql.DB DB *sql.DB
Cache caching.FederationSenderCache Cache caching.FederationSenderCache
Writer sqlutil.Writer Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
FederationSenderQueueJSON tables.FederationSenderQueueJSON FederationSenderQueueJSON tables.FederationSenderQueueJSON
FederationSenderJoinedHosts tables.FederationSenderJoinedHosts FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
FederationSenderRooms tables.FederationSenderRooms FederationSenderRooms tables.FederationSenderRooms
FederationSenderBlacklist tables.FederationSenderBlacklist FederationSenderBlacklist tables.FederationSenderBlacklist
FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks
FederationSenderInboundPeeks tables.FederationSenderInboundPeeks
} }
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
@ -173,3 +175,43 @@ func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.Server
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) { func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName) return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
} }
func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}
func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}
func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
return d.FederationSenderOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID)
}
func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
}
func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}
func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}
func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) {
return d.FederationSenderInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID)
}
func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) {
return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID)
}

View file

@ -0,0 +1,176 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const inboundPeeksSchema = `
CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks (
room_id TEXT NOT NULL,
server_name TEXT NOT NULL,
peek_id TEXT NOT NULL,
creation_ts INTEGER NOT NULL,
renewed_ts INTEGER NOT NULL,
renewal_interval INTEGER NOT NULL,
UNIQUE (room_id, server_name, peek_id)
);
`
const insertInboundPeekSQL = "" +
"INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
const selectInboundPeekSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
const selectInboundPeeksSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1"
const renewInboundPeekSQL = "" +
"UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
const deleteInboundPeekSQL = "" +
"DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2"
const deleteInboundPeeksSQL = "" +
"DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
type inboundPeeksStatements struct {
db *sql.DB
insertInboundPeekStmt *sql.Stmt
selectInboundPeekStmt *sql.Stmt
selectInboundPeeksStmt *sql.Stmt
renewInboundPeekStmt *sql.Stmt
deleteInboundPeekStmt *sql.Stmt
deleteInboundPeeksStmt *sql.Stmt
}
func NewSQLiteInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) {
s = &inboundPeeksStatements{
db: db,
}
_, err = db.Exec(inboundPeeksSchema)
if err != nil {
return
}
if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil {
return
}
if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil {
return
}
if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil {
return
}
if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil {
return
}
if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil {
return
}
if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil {
return
}
return
}
func (s *inboundPeeksStatements) InsertInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt)
_, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
return
}
func (s *inboundPeeksStatements) RenewInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
_, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
return
}
func (s *inboundPeeksStatements) SelectInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (*types.InboundPeek, error) {
row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID)
inboundPeek := types.InboundPeek{}
err := row.Scan(
&inboundPeek.RoomID,
&inboundPeek.ServerName,
&inboundPeek.PeekID,
&inboundPeek.CreationTimestamp,
&inboundPeek.RenewedTimestamp,
&inboundPeek.RenewalInterval,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &inboundPeek, nil
}
func (s *inboundPeeksStatements) SelectInboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (inboundPeeks []types.InboundPeek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed")
for rows.Next() {
inboundPeek := types.InboundPeek{}
if err = rows.Scan(
&inboundPeek.RoomID,
&inboundPeek.ServerName,
&inboundPeek.PeekID,
&inboundPeek.CreationTimestamp,
&inboundPeek.RenewedTimestamp,
&inboundPeek.RenewalInterval,
); err != nil {
return
}
inboundPeeks = append(inboundPeeks, inboundPeek)
}
return inboundPeeks, rows.Err()
}
func (s *inboundPeeksStatements) DeleteInboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
return
}
func (s *inboundPeeksStatements) DeleteInboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID)
return
}

View file

@ -0,0 +1,176 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const outboundPeeksSchema = `
CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks (
room_id TEXT NOT NULL,
server_name TEXT NOT NULL,
peek_id TEXT NOT NULL,
creation_ts INTEGER NOT NULL,
renewed_ts INTEGER NOT NULL,
renewal_interval INTEGER NOT NULL,
UNIQUE (room_id, server_name, peek_id)
);
`
const insertOutboundPeekSQL = "" +
"INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
const selectOutboundPeekSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
const selectOutboundPeeksSQL = "" +
"SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1"
const renewOutboundPeekSQL = "" +
"UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
const deleteOutboundPeekSQL = "" +
"DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2"
const deleteOutboundPeeksSQL = "" +
"DELETE FROM federationsender_outbound_peeks WHERE room_id = $1"
type outboundPeeksStatements struct {
db *sql.DB
insertOutboundPeekStmt *sql.Stmt
selectOutboundPeekStmt *sql.Stmt
selectOutboundPeeksStmt *sql.Stmt
renewOutboundPeekStmt *sql.Stmt
deleteOutboundPeekStmt *sql.Stmt
deleteOutboundPeeksStmt *sql.Stmt
}
func NewSQLiteOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) {
s = &outboundPeeksStatements{
db: db,
}
_, err = db.Exec(outboundPeeksSchema)
if err != nil {
return
}
if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil {
return
}
if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil {
return
}
if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil {
return
}
if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil {
return
}
if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil {
return
}
if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil {
return
}
return
}
func (s *outboundPeeksStatements) InsertOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt)
_, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
return
}
func (s *outboundPeeksStatements) RenewOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
) (err error) {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
_, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
return
}
func (s *outboundPeeksStatements) SelectOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (*types.OutboundPeek, error) {
row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID)
outboundPeek := types.OutboundPeek{}
err := row.Scan(
&outboundPeek.RoomID,
&outboundPeek.ServerName,
&outboundPeek.PeekID,
&outboundPeek.CreationTimestamp,
&outboundPeek.RenewedTimestamp,
&outboundPeek.RenewalInterval,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &outboundPeek, nil
}
func (s *outboundPeeksStatements) SelectOutboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (outboundPeeks []types.OutboundPeek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed")
for rows.Next() {
outboundPeek := types.OutboundPeek{}
if err = rows.Scan(
&outboundPeek.RoomID,
&outboundPeek.ServerName,
&outboundPeek.PeekID,
&outboundPeek.CreationTimestamp,
&outboundPeek.RenewedTimestamp,
&outboundPeek.RenewalInterval,
); err != nil {
return
}
outboundPeeks = append(outboundPeeks, outboundPeek)
}
return outboundPeeks, rows.Err()
}
func (s *outboundPeeksStatements) DeleteOutboundPeek(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
return
}
func (s *outboundPeeksStatements) DeleteOutboundPeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) (err error) {
_, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID)
return
}

View file

@ -66,16 +66,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil { if err != nil {
return nil, err return nil, err
} }
outboundPeeks, err := NewSQLiteOutboundPeeksTable(d.db)
if err != nil {
return nil, err
}
inboundPeeks, err := NewSQLiteInboundPeeksTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
Cache: cache, Cache: cache,
Writer: d.writer, Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts, FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs, FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueEDUs: queueEDUs,
FederationSenderQueueJSON: queueJSON, FederationSenderQueueJSON: queueJSON,
FederationSenderRooms: rooms, FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist, FederationSenderBlacklist: blacklist,
FederationSenderOutboundPeeks: outboundPeeks,
FederationSenderInboundPeeks: inboundPeeks,
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err return nil, err

View file

@ -67,3 +67,21 @@ type FederationSenderBlacklist interface {
SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error) SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error)
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
} }
type FederationSenderOutboundPeeks interface {
InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error)
SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error)
DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
}
type FederationSenderInboundPeeks interface {
InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error)
SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error)
DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
}

View file

@ -49,3 +49,23 @@ func (e EventIDMismatchError) Error() string {
e.DatabaseID, e.RoomServerID, e.DatabaseID, e.RoomServerID,
) )
} }
// tracks peeks we're performing on another server over federation
type OutboundPeek struct {
PeekID string
RoomID string
ServerName gomatrixserverlib.ServerName
CreationTimestamp int64
RenewedTimestamp int64
RenewalInterval int64
}
// tracks peeks other servers are performing on us over federation
type InboundPeek struct {
PeekID string
RoomID string
ServerName gomatrixserverlib.ServerName
CreationTimestamp int64
RenewedTimestamp int64
RenewalInterval int64
}

2
go.mod
View file

@ -41,7 +41,7 @@ require (
go.uber.org/atomic v1.6.0 go.uber.org/atomic v1.6.0
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b golang.org/x/net v0.0.0-20200528225125-3c3fba18258b
golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78 // indirect golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
gopkg.in/h2non/bimg.v1 v1.1.4 gopkg.in/h2non/bimg.v1 v1.1.4
gopkg.in/yaml.v2 v2.3.0 gopkg.in/yaml.v2 v2.3.0
) )

4
go.sum
View file

@ -995,8 +995,8 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78 h1:nVuTkr9L6Bq62qpUqKo/RnZCFfzDBL0bYo6w9OJUqZY= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View file

@ -56,6 +56,12 @@ type RoomserverInternalAPI interface {
res *PerformPublishResponse, res *PerformPublishResponse,
) )
PerformInboundPeek(
ctx context.Context,
req *PerformInboundPeekRequest,
res *PerformInboundPeekResponse,
) error
QueryPublishedRooms( QueryPublishedRooms(
ctx context.Context, ctx context.Context,
req *QueryPublishedRoomsRequest, req *QueryPublishedRoomsRequest,

View file

@ -88,6 +88,16 @@ func (t *RoomserverInternalAPITrace) PerformPublish(
util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res)) util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res))
} }
func (t *RoomserverInternalAPITrace) PerformInboundPeek(
ctx context.Context,
req *PerformInboundPeekRequest,
res *PerformInboundPeekResponse,
) error {
err := t.Impl.PerformInboundPeek(ctx, req, res)
util.GetLogger(ctx).Infof("PerformInboundPeek req=%+v res=%+v", js(req), js(res))
return err
}
func (t *RoomserverInternalAPITrace) QueryPublishedRooms( func (t *RoomserverInternalAPITrace) QueryPublishedRooms(
ctx context.Context, ctx context.Context,
req *QueryPublishedRoomsRequest, req *QueryPublishedRoomsRequest,

View file

@ -51,6 +51,8 @@ const (
// OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek // OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek
OutputTypeNewPeek OutputType = "new_peek" OutputTypeNewPeek OutputType = "new_peek"
// OutputTypeNewInboundPeek indicates that the kafka event is an OutputNewInboundPeek
OutputTypeNewInboundPeek OutputType = "new_inbound_peek"
// OutputTypeRetirePeek indicates that the kafka event is an OutputRetirePeek // OutputTypeRetirePeek indicates that the kafka event is an OutputRetirePeek
OutputTypeRetirePeek OutputType = "retire_peek" OutputTypeRetirePeek OutputType = "retire_peek"
) )
@ -72,6 +74,8 @@ type OutputEvent struct {
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"` RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
// The content of event with type OutputTypeNewPeek // The content of event with type OutputTypeNewPeek
NewPeek *OutputNewPeek `json:"new_peek,omitempty"` NewPeek *OutputNewPeek `json:"new_peek,omitempty"`
// The content of event with type OutputTypeNewInboundPeek
NewInboundPeek *OutputNewInboundPeek `json:"new_inbound_peek,omitempty"`
// The content of event with type OutputTypeRetirePeek // The content of event with type OutputTypeRetirePeek
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"` RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
} }
@ -245,6 +249,19 @@ type OutputNewPeek struct {
DeviceID string DeviceID string
} }
// An OutputNewInboundPeek is written whenever a server starts peeking into a room
type OutputNewInboundPeek struct {
RoomID string
PeekID string
// the event ID at which the peek begins (so we can avoid
// a race between tracking the state returned by /peek and emitting subsequent
// peeked events)
LatestEventID string
ServerName gomatrixserverlib.ServerName
// how often we told the peeking server to renew the peek
RenewalInterval int64
}
// An OutputRetirePeek is written whenever a user stops peeking into a room. // An OutputRetirePeek is written whenever a user stops peeking into a room.
type OutputRetirePeek struct { type OutputRetirePeek struct {
RoomID string RoomID string

View file

@ -172,6 +172,28 @@ type PerformPublishResponse struct {
Error *PerformError Error *PerformError
} }
type PerformInboundPeekRequest struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
PeekID string `json:"peek_id"`
ServerName gomatrixserverlib.ServerName `json:"server_name"`
RenewalInterval int64 `json:"renewal_interval"`
}
type PerformInboundPeekResponse struct {
// Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool `json:"room_exists"`
// The room version of the room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// The current state and auth chain events.
// The lists will be in an arbitrary order.
StateEvents []*gomatrixserverlib.HeaderedEvent `json:"state_events"`
AuthChainEvents []*gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"`
// The event at which this state was captured
LatestEvent *gomatrixserverlib.HeaderedEvent `json:"latest_event"`
}
// PerformForgetRequest is a request to PerformForget // PerformForgetRequest is a request to PerformForget
type PerformForgetRequest struct { type PerformForgetRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`

View file

@ -221,7 +221,7 @@ type QueryStateAndAuthChainRequest struct {
// The room ID to query the state in. // The room ID to query the state in.
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
// The list of prev events for the event. Used to calculate the state at // The list of prev events for the event. Used to calculate the state at
// the event // the event.
PrevEventIDs []string `json:"prev_event_ids"` PrevEventIDs []string `json:"prev_event_ids"`
// The list of auth events for the event. Used to calculate the auth chain // The list of auth events for the event. Used to calculate the auth chain
AuthEventIDs []string `json:"auth_event_ids"` AuthEventIDs []string `json:"auth_event_ids"`

View file

@ -43,7 +43,7 @@ func SendEvents(
// SendEventWithState writes an event with the specified kind to the roomserver // SendEventWithState writes an event with the specified kind to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is // with the state at the event as KindOutlier before it. Will not send any event that is
// marked as `true` in haveEventIDs // marked as `true` in haveEventIDs.
func SendEventWithState( func SendEventWithState(
ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent, state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,

View file

@ -24,6 +24,7 @@ type RoomserverInternalAPI struct {
*perform.Inviter *perform.Inviter
*perform.Joiner *perform.Joiner
*perform.Peeker *perform.Peeker
*perform.InboundPeeker
*perform.Unpeeker *perform.Unpeeker
*perform.Leaver *perform.Leaver
*perform.Publisher *perform.Publisher
@ -97,6 +98,10 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
Inputer: r.Inputer, Inputer: r.Inputer,
} }
r.InboundPeeker = &perform.InboundPeeker{
DB: r.DB,
Inputer: r.Inputer,
}
r.Unpeeker = &perform.Unpeeker{ r.Unpeeker = &perform.Unpeeker{
ServerName: r.Cfg.Matrix.ServerName, ServerName: r.Cfg.Matrix.ServerName,
Cfg: r.Cfg, Cfg: r.Cfg,

View file

@ -0,0 +1,129 @@
// Copyright 2020 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package perform
import (
"context"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
type InboundPeeker struct {
DB storage.Database
Inputer *input.Inputer
}
// PerformInboundPeek handles peeking into matrix rooms, including over
// federation by talking to the federationsender. called when a remote server
// initiates a /peek over federation.
//
// It should atomically figure out the current state of the room (for the
// response to /peek) while adding the new inbound peek to the kafka stream so the
// fed sender can start sending peeked events without a race between the state
// snapshot and the stream of peeked events.
func (r *InboundPeeker) PerformInboundPeek(
ctx context.Context,
request *api.PerformInboundPeekRequest,
response *api.PerformInboundPeekResponse,
) error {
info, err := r.DB.RoomInfo(ctx, request.RoomID)
if err != nil {
return err
}
if info == nil || info.IsStub {
return nil
}
response.RoomExists = true
response.RoomVersion = info.RoomVersion
var stateEvents []*gomatrixserverlib.Event
var currentStateSnapshotNID types.StateSnapshotNID
latestEventRefs, currentStateSnapshotNID, _, err :=
r.DB.LatestEventIDs(ctx, info.RoomNID)
if err != nil {
return err
}
latestEvents, err := r.DB.EventsFromIDs(ctx, []string{latestEventRefs[0].EventID})
if err != nil {
return err
}
var sortedLatestEvents []*gomatrixserverlib.Event
for _, ev := range latestEvents {
sortedLatestEvents = append(sortedLatestEvents, ev.Event)
}
sortedLatestEvents = gomatrixserverlib.ReverseTopologicalOrdering(
sortedLatestEvents,
gomatrixserverlib.TopologicalOrderByPrevEvents,
)
response.LatestEvent = sortedLatestEvents[0].Headered(info.RoomVersion)
// XXX: do we actually need to do a state resolution here?
roomState := state.NewStateResolution(r.DB, *info)
var stateEntries []types.StateEntry
stateEntries, err = roomState.LoadStateAtSnapshot(
ctx, currentStateSnapshotNID,
)
if err != nil {
return err
}
stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries)
if err != nil {
return err
}
// get the auth event IDs for the current state events
var authEventIDs []string
for _, se := range stateEvents {
authEventIDs = append(authEventIDs, se.AuthEventIDs()...)
}
authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
authEvents, err := query.GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil {
return err
}
for _, event := range stateEvents {
response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion))
}
for _, event := range authEvents {
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
}
err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
{
Type: api.OutputTypeNewInboundPeek,
NewInboundPeek: &api.OutputNewInboundPeek{
RoomID: request.RoomID,
PeekID: request.PeekID,
LatestEventID: latestEvents[0].EventID(),
ServerName: request.ServerName,
RenewalInterval: request.RenewalInterval,
},
},
})
return err
}

View file

@ -151,11 +151,28 @@ func (r *Peeker) performPeekRoomByID(
} }
} }
// If the server name in the room ID isn't ours then it's a // handle federated peeks
// possible candidate for finding the room via federation. Add // FIXME: don't create an outbound peek if we already have one going.
// it to the list of servers to try.
if domain != r.Cfg.Matrix.ServerName { if domain != r.Cfg.Matrix.ServerName {
// If the server name in the room ID isn't ours then it's a
// possible candidate for finding the room via federation. Add
// it to the list of servers to try.
req.ServerNames = append(req.ServerNames, domain) req.ServerNames = append(req.ServerNames, domain)
// Try peeking by all of the supplied server names.
fedReq := fsAPI.PerformOutboundPeekRequest{
RoomID: req.RoomIDOrAlias, // the room ID to try and peek
ServerNames: req.ServerNames, // the servers to try peeking via
}
fedRes := fsAPI.PerformOutboundPeekResponse{}
_ = r.FSAPI.PerformOutboundPeek(ctx, &fedReq, &fedRes)
if fedRes.LastError != nil {
return "", &api.PerformError{
Code: api.PerformErrRemote,
Msg: fedRes.LastError.Message,
RemoteCode: fedRes.LastError.Code,
}
}
} }
// If this room isn't world_readable, we reject. // If this room isn't world_readable, we reject.

View file

@ -107,7 +107,7 @@ func (r *Queryer) QueryStateAfterEvents(
} }
authEventIDs = util.UniqueStrings(authEventIDs) authEventIDs = util.UniqueStrings(authEventIDs)
authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs) authEvents, err := GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil { if err != nil {
return fmt.Errorf("getAuthChain: %w", err) return fmt.Errorf("getAuthChain: %w", err)
} }
@ -447,10 +447,12 @@ func (r *Queryer) QueryStateAndAuthChain(
response.RoomExists = true response.RoomExists = true
response.RoomVersion = info.RoomVersion response.RoomVersion = info.RoomVersion
stateEvents, err := r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs) var stateEvents []*gomatrixserverlib.Event
stateEvents, err = r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs)
if err != nil { if err != nil {
return err return err
} }
response.PrevEventsExist = true response.PrevEventsExist = true
// add the auth event IDs for the current state events too // add the auth event IDs for the current state events too
@ -461,7 +463,7 @@ func (r *Queryer) QueryStateAndAuthChain(
} }
authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs) authEvents, err := GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil { if err != nil {
return err return err
} }
@ -510,11 +512,11 @@ func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomIn
type eventsFromIDs func(context.Context, []string) ([]types.Event, error) type eventsFromIDs func(context.Context, []string) ([]types.Event, error)
// getAuthChain fetches the auth chain for the given auth events. An auth chain // GetAuthChain fetches the auth chain for the given auth events. An auth chain
// is the list of all events that are referenced in the auth_events section, and // is the list of all events that are referenced in the auth_events section, and
// all their auth_events, recursively. The returned set of events contain the // all their auth_events, recursively. The returned set of events contain the
// given events. Will *not* error if we don't have all auth events. // given events. Will *not* error if we don't have all auth events.
func getAuthChain( func GetAuthChain(
ctx context.Context, fn eventsFromIDs, authEventIDs []string, ctx context.Context, fn eventsFromIDs, authEventIDs []string,
) ([]*gomatrixserverlib.Event, error) { ) ([]*gomatrixserverlib.Event, error) {
// List of event IDs to fetch. On each pass, these events will be requested // List of event IDs to fetch. On each pass, these events will be requested
@ -718,7 +720,7 @@ func (r *Queryer) QueryServerBannedFromRoom(ctx context.Context, req *api.QueryS
} }
func (r *Queryer) QueryAuthChain(ctx context.Context, req *api.QueryAuthChainRequest, res *api.QueryAuthChainResponse) error { func (r *Queryer) QueryAuthChain(ctx context.Context, req *api.QueryAuthChainRequest, res *api.QueryAuthChainResponse) error {
chain, err := getAuthChain(ctx, r.DB.EventsFromIDs, req.EventIDs) chain, err := GetAuthChain(ctx, r.DB.EventsFromIDs, req.EventIDs)
if err != nil { if err != nil {
return err return err
} }

View file

@ -106,7 +106,7 @@ func TestGetAuthChainSingle(t *testing.T) {
t.Fatalf("Failed to add events to db: %v", err) t.Fatalf("Failed to add events to db: %v", err)
} }
result, err := getAuthChain(context.TODO(), db.EventsFromIDs, []string{"e"}) result, err := GetAuthChain(context.TODO(), db.EventsFromIDs, []string{"e"})
if err != nil { if err != nil {
t.Fatalf("getAuthChain failed: %v", err) t.Fatalf("getAuthChain failed: %v", err)
} }
@ -139,7 +139,7 @@ func TestGetAuthChainMultiple(t *testing.T) {
t.Fatalf("Failed to add events to db: %v", err) t.Fatalf("Failed to add events to db: %v", err)
} }
result, err := getAuthChain(context.TODO(), db.EventsFromIDs, []string{"e", "f"}) result, err := GetAuthChain(context.TODO(), db.EventsFromIDs, []string{"e", "f"})
if err != nil { if err != nil {
t.Fatalf("getAuthChain failed: %v", err) t.Fatalf("getAuthChain failed: %v", err)
} }

View file

@ -26,14 +26,15 @@ const (
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
// Perform operations // Perform operations
RoomserverPerformInvitePath = "/roomserver/performInvite" RoomserverPerformInvitePath = "/roomserver/performInvite"
RoomserverPerformPeekPath = "/roomserver/performPeek" RoomserverPerformPeekPath = "/roomserver/performPeek"
RoomserverPerformUnpeekPath = "/roomserver/performUnpeek" RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
RoomserverPerformJoinPath = "/roomserver/performJoin" RoomserverPerformJoinPath = "/roomserver/performJoin"
RoomserverPerformLeavePath = "/roomserver/performLeave" RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill" RoomserverPerformBackfillPath = "/roomserver/performBackfill"
RoomserverPerformPublishPath = "/roomserver/performPublish" RoomserverPerformPublishPath = "/roomserver/performPublish"
RoomserverPerformForgetPath = "/roomserver/performForget" RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
RoomserverPerformForgetPath = "/roomserver/performForget"
// Query operations // Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@ -216,6 +217,18 @@ func (h *httpRoomserverInternalAPI) PerformPeek(
} }
} }
func (h *httpRoomserverInternalAPI) PerformInboundPeek(
ctx context.Context,
request *api.PerformInboundPeekRequest,
response *api.PerformInboundPeekResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInboundPeek")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverPerformInboundPeekPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func (h *httpRoomserverInternalAPI) PerformUnpeek( func (h *httpRoomserverInternalAPI) PerformUnpeek(
ctx context.Context, ctx context.Context,
request *api.PerformUnpeekRequest, request *api.PerformUnpeekRequest,

View file

@ -72,6 +72,19 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(RoomserverPerformInboundPeekPath,
httputil.MakeInternalAPI("performInboundPeek", func(req *http.Request) util.JSONResponse {
var request api.PerformInboundPeekRequest
var response api.PerformInboundPeekResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := r.PerformInboundPeek(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(RoomserverPerformPeekPath, internalAPIMux.Handle(RoomserverPerformPeekPath,
httputil.MakeInternalAPI("performUnpeek", func(req *http.Request) util.JSONResponse { httputil.MakeInternalAPI("performUnpeek", func(req *http.Request) util.JSONResponse {
var request api.PerformUnpeekRequest var request api.PerformUnpeekRequest

View file

@ -372,7 +372,7 @@ type Response struct {
Leave map[string]LeaveResponse `json:"leave"` Leave map[string]LeaveResponse `json:"leave"`
} `json:"rooms"` } `json:"rooms"`
ToDevice struct { ToDevice struct {
Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"` Events []gomatrixserverlib.SendToDeviceEvent `json:"events"`
} `json:"to_device"` } `json:"to_device"`
DeviceLists struct { DeviceLists struct {
Changed []string `json:"changed,omitempty"` Changed []string `json:"changed,omitempty"`