Merge branch 'master' into master

This commit is contained in:
Neil Alexander 2020-04-28 12:54:49 +01:00 committed by GitHub
commit 18020594d2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 500 additions and 224 deletions

View file

@ -127,18 +127,18 @@ func SendMembership(
returnData = struct { returnData = struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
}{roomID} }{roomID}
fallthrough
default: default:
} _, err = producer.SendEvents(
req.Context(),
_, err = producer.SendEvents( []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)},
req.Context(), cfg.Matrix.ServerName,
[]gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, nil,
cfg.Matrix.ServerName, )
nil, if err != nil {
) util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
if err != nil { return jsonerror.InternalServerError()
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") }
return jsonerror.InternalServerError()
} }
return util.JSONResponse{ return util.JSONResponse{

View file

@ -149,12 +149,12 @@ func main() {
federation := createFederationClient(base) federation := createFederationClient(base)
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
alias, input, query := roomserver.SetupRoomServerComponent(&base.Base) alias, input, query := roomserver.SetupRoomServerComponent(&base.Base, keyRing)
eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New()) eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
&base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query) fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
&base.Base, deviceDB, accountDB, &base.Base, deviceDB, accountDB,

View file

@ -26,10 +26,10 @@ func main() {
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
_, _, query := base.CreateHTTPRoomserverAPIs() _, input, query := base.CreateHTTPRoomserverAPIs()
federationsender.SetupFederationSenderComponent( federationsender.SetupFederationSenderComponent(
base, federation, query, base, federation, query, input,
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender)) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender))

View file

@ -57,12 +57,12 @@ func main() {
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
alias, input, query := roomserver.SetupRoomServerComponent(base) alias, input, query := roomserver.SetupRoomServerComponent(base, keyRing)
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query) fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, base, deviceDB, accountDB,

View file

@ -18,6 +18,7 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
) )
@ -25,8 +26,11 @@ func main() {
cfg := basecomponent.ParseFlags() cfg := basecomponent.ParseFlags()
base := basecomponent.NewBaseDendrite(cfg, "RoomServerAPI") base := basecomponent.NewBaseDendrite(cfg, "RoomServerAPI")
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
keyDB := base.CreateKeyDB()
federation := base.CreateFederationClient()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
roomserver.SetupRoomServerComponent(base) roomserver.SetupRoomServerComponent(base, keyRing)
base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer)) base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer))

View file

@ -24,7 +24,7 @@ import (
func URLDecodeMapValues(vmap map[string]string) (map[string]string, error) { func URLDecodeMapValues(vmap map[string]string) (map[string]string, error) {
decoded := make(map[string]string, len(vmap)) decoded := make(map[string]string, len(vmap))
for key, value := range vmap { for key, value := range vmap {
decodedVal, err := url.QueryUnescape(value) decodedVal, err := url.PathUnescape(value)
if err != nil { if err != nil {
return make(map[string]string), err return make(map[string]string), err
} }

View file

@ -69,6 +69,7 @@ func Backfill(
// Populate the request. // Populate the request.
req := api.QueryBackfillRequest{ req := api.QueryBackfillRequest{
RoomID: roomID,
EarliestEventsIDs: eIDs, EarliestEventsIDs: eIDs,
ServerName: request.Origin(), ServerName: request.Origin(),
} }
@ -97,7 +98,10 @@ func Backfill(
} }
var eventJSONs []json.RawMessage var eventJSONs []json.RawMessage
for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(evs) { for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(
evs,
gomatrixserverlib.TopologicalOrderByPrevEvents,
) {
eventJSONs = append(eventJSONs, e.JSON()) eventJSONs = append(eventJSONs, e.JSON())
} }

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/query" "github.com/matrix-org/dendrite/federationsender/query"
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
@ -34,13 +35,16 @@ func SetupFederationSenderComponent(
base *basecomponent.BaseDendrite, base *basecomponent.BaseDendrite,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
rsQueryAPI roomserverAPI.RoomserverQueryAPI, rsQueryAPI roomserverAPI.RoomserverQueryAPI,
rsInputAPI roomserverAPI.RoomserverInputAPI,
) api.FederationSenderQueryAPI { ) api.FederationSenderQueryAPI {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender), base.Cfg.DbProperties()) federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender), base.Cfg.DbProperties())
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db") logrus.WithError(err).Panic("failed to connect to federation sender db")
} }
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation) roomserverProducer := producers.NewRoomserverProducer(rsInputAPI, base.Cfg.Matrix.ServerName)
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation, roomserverProducer)
rsConsumer := consumers.NewOutputRoomEventConsumer( rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues, base.Cfg, base.KafkaConsumer, queues,

View file

@ -0,0 +1,66 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"context"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// RoomserverProducer produces events for the roomserver to consume.
type RoomserverProducer struct {
InputAPI api.RoomserverInputAPI
serverName gomatrixserverlib.ServerName
}
// NewRoomserverProducer creates a new RoomserverProducer
func NewRoomserverProducer(
inputAPI api.RoomserverInputAPI, serverName gomatrixserverlib.ServerName,
) *RoomserverProducer {
return &RoomserverProducer{
InputAPI: inputAPI,
serverName: serverName,
}
}
// SendInviteResponse drops an invite response back into the roomserver so that users
// already in the room will be notified of the new invite. The invite response is signed
// by the remote side.
func (c *RoomserverProducer) SendInviteResponse(
ctx context.Context, res gomatrixserverlib.RespInviteV2, roomVersion gomatrixserverlib.RoomVersion,
) (string, error) {
ev := res.Event.Headered(roomVersion)
ire := api.InputRoomEvent{
Kind: api.KindNew,
Event: ev,
AuthEventIDs: ev.AuthEventIDs(),
SendAsServer: string(c.serverName),
TransactionID: nil,
}
return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire})
}
// SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent,
) (eventID string, err error) {
request := api.InputRoomEventsRequest{InputRoomEvents: ires}
var response api.InputRoomEventsResponse
err = c.InputAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID
return
}

View file

@ -21,6 +21,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -32,6 +33,7 @@ import (
// ensures that only one request is in flight to a given destination // ensures that only one request is in flight to a given destination
// at a time. // at a time.
type destinationQueue struct { type destinationQueue struct {
rsProducer *producers.RoomserverProducer
client *gomatrixserverlib.FederationClient client *gomatrixserverlib.FederationClient
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName
@ -165,18 +167,38 @@ func (oq *destinationQueue) nextInvites() bool {
} }
for _, inviteReq := range oq.pendingInvites { for _, inviteReq := range oq.pendingInvites {
ev := inviteReq.Event() ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion()
if _, err := oq.client.SendInviteV2( log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_version": roomVersion,
"destination": oq.destination,
}).Info("sending invite")
inviteRes, err := oq.client.SendInviteV2(
context.TODO(), context.TODO(),
oq.destination, oq.destination,
*inviteReq, *inviteReq,
); err != nil { )
if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"state_key": ev.StateKey(), "state_key": ev.StateKey(),
"destination": oq.destination, "destination": oq.destination,
}).WithError(err).Error("failed to send invite") }).WithError(err).Error("failed to send invite")
continue
}
if _, err = oq.rsProducer.SendInviteResponse(
context.TODO(),
inviteRes,
roomVersion,
); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),
"destination": oq.destination,
}).WithError(err).Error("failed to return signed invite to roomserver")
} }
} }

View file

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -25,19 +26,25 @@ import (
// OutgoingQueues is a collection of queues for sending transactions to other // OutgoingQueues is a collection of queues for sending transactions to other
// matrix servers // matrix servers
type OutgoingQueues struct { type OutgoingQueues struct {
origin gomatrixserverlib.ServerName rsProducer *producers.RoomserverProducer
client *gomatrixserverlib.FederationClient origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient
// The queuesMutex protects queues // The queuesMutex protects queues
queuesMutex sync.Mutex queuesMutex sync.Mutex
queues map[gomatrixserverlib.ServerName]*destinationQueue queues map[gomatrixserverlib.ServerName]*destinationQueue
} }
// NewOutgoingQueues makes a new OutgoingQueues // NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues { func NewOutgoingQueues(
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
rsProducer *producers.RoomserverProducer,
) *OutgoingQueues {
return &OutgoingQueues{ return &OutgoingQueues{
origin: origin, rsProducer: rsProducer,
client: client, origin: origin,
queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, client: client,
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
} }
} }
@ -67,6 +74,7 @@ func (oqs *OutgoingQueues) SendEvent(
oq := oqs.queues[destination] oq := oqs.queues[destination]
if oq == nil { if oq == nil {
oq = &destinationQueue{ oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin, origin: oqs.origin,
destination: destination, destination: destination,
client: oqs.client, client: oqs.client,
@ -111,6 +119,7 @@ func (oqs *OutgoingQueues) SendInvite(
oq := oqs.queues[destination] oq := oqs.queues[destination]
if oq == nil { if oq == nil {
oq = &destinationQueue{ oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin, origin: oqs.origin,
destination: destination, destination: destination,
client: oqs.client, client: oqs.client,
@ -151,6 +160,7 @@ func (oqs *OutgoingQueues) SendEDU(
oq := oqs.queues[destination] oq := oqs.queues[destination]
if oq == nil { if oq == nil {
oq = &destinationQueue{ oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin, origin: oqs.origin,
destination: destination, destination: destination,
client: oqs.client, client: oqs.client,

2
go.mod
View file

@ -17,7 +17,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200424101831-2f10e8068538 github.com/matrix-org/gomatrixserverlib v0.0.0-20200428112024-9f47f9bfa4b2
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible

5
go.sum
View file

@ -367,8 +367,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200424101831-2f10e8068538 h1:kj2LdNOdg2+vydS9HrPdbECEVeusRg9VTSOkYm61reA= github.com/matrix-org/gomatrixserverlib v0.0.0-20200428112024-9f47f9bfa4b2 h1:sy2QOqJhb4WXzq8bJhsCntAUYb64Dl6txsFtXWtxxSg=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200424101831-2f10e8068538/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200428112024-9f47f9bfa4b2/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=
@ -677,6 +677,7 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/Shopify/sarama.v1 v1.20.1 h1:Gi09A3fJXm0Jgt8kuKZ8YK+r60GfYn7MQuEmI3oq6hE= gopkg.in/Shopify/sarama.v1 v1.20.1 h1:Gi09A3fJXm0Jgt8kuKZ8YK+r60GfYn7MQuEmI3oq6hE=
gopkg.in/Shopify/sarama.v1 v1.20.1/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc= gopkg.in/Shopify/sarama.v1 v1.20.1/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View file

@ -229,6 +229,8 @@ type QueryStateAndAuthChainResponse struct {
// QueryBackfillRequest is a request to QueryBackfill. // QueryBackfillRequest is a request to QueryBackfill.
type QueryBackfillRequest struct { type QueryBackfillRequest struct {
// The room to backfill
RoomID string `json:"room_id"`
// Events to start paginating from. // Events to start paginating from.
EarliestEventsIDs []string `json:"earliest_event_ids"` EarliestEventsIDs []string `json:"earliest_event_ids"`
// The maximum number of events to retrieve. // The maximum number of events to retrieve.
@ -243,21 +245,7 @@ type QueryBackfillResponse struct {
Events []gomatrixserverlib.HeaderedEvent `json:"events"` Events []gomatrixserverlib.HeaderedEvent `json:"events"`
} }
// QueryServersInRoomAtEventRequest is a request to QueryServersInRoomAtEvent // QueryRoomVersionCapabilitiesRequest asks for the default room version
type QueryServersInRoomAtEventRequest struct {
// ID of the room to retrieve member servers for.
RoomID string `json:"room_id"`
// ID of the event for which to retrieve member servers.
EventID string `json:"event_id"`
}
// QueryServersInRoomAtEventResponse is a response to QueryServersInRoomAtEvent
type QueryServersInRoomAtEventResponse struct {
// Servers present in the room for these events.
Servers []gomatrixserverlib.ServerName `json:"servers"`
}
// QueryRoomVersionCapabilities asks for the default room version
type QueryRoomVersionCapabilitiesRequest struct{} type QueryRoomVersionCapabilitiesRequest struct{}
// QueryRoomVersionCapabilitiesResponse is a response to QueryRoomVersionCapabilitiesRequest // QueryRoomVersionCapabilitiesResponse is a response to QueryRoomVersionCapabilitiesRequest
@ -266,12 +254,12 @@ type QueryRoomVersionCapabilitiesResponse struct {
AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"` AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"`
} }
// QueryRoomVersionForRoom asks for the room version for a given room. // QueryRoomVersionForRoomRequest asks for the room version for a given room.
type QueryRoomVersionForRoomRequest struct { type QueryRoomVersionForRoomRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
} }
// QueryRoomVersionCapabilitiesResponse is a response to QueryServersInRoomAtEventResponse // QueryRoomVersionForRoomResponse is a response to QueryRoomVersionForRoomRequest
type QueryRoomVersionForRoomResponse struct { type QueryRoomVersionForRoomResponse struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
} }
@ -350,12 +338,6 @@ type RoomserverQueryAPI interface {
response *QueryBackfillResponse, response *QueryBackfillResponse,
) error ) error
QueryServersInRoomAtEvent(
ctx context.Context,
request *QueryServersInRoomAtEventRequest,
response *QueryServersInRoomAtEventResponse,
) error
// Asks for the default room version as preferred by the server. // Asks for the default room version as preferred by the server.
QueryRoomVersionCapabilities( QueryRoomVersionCapabilities(
ctx context.Context, ctx context.Context,
@ -401,13 +383,10 @@ const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthC
// RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API // RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API
const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill" const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill"
// RoomserverQueryServersInRoomAtEventPath is the HTTP path for the QueryServersInRoomAtEvent API
const RoomserverQueryServersInRoomAtEventPath = "/api/roomserver/queryServersInRoomAtEvents"
// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API // RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API
const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities" const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities"
// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API // RoomserverQueryRoomVersionForRoomPath is the HTTP path for the QueryRoomVersionForRoom API
const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom" const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
@ -555,19 +534,6 @@ func (h *httpRoomserverQueryAPI) QueryBackfill(
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// QueryServersInRoomAtEvent implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent(
ctx context.Context,
request *QueryServersInRoomAtEventRequest,
response *QueryServersInRoomAtEventResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryServersInRoomAtEvent")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEventPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryRoomVersionCapabilities implements RoomServerQueryAPI // QueryRoomVersionCapabilities implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities( func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities(
ctx context.Context, ctx context.Context,

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -53,6 +54,7 @@ func processRoomEvent(
// Check that the event passes authentication checks and work out the numeric IDs for the auth events. // Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
if err != nil { if err != nil {
logrus.WithError(err).WithField("event_id", event.EventID()).Error("processRoomEvent.checkAuthEvents failed for event")
return return
} }
@ -77,6 +79,7 @@ func processRoomEvent(
// For outliers we can stop after we've stored the event itself as it // For outliers we can stop after we've stored the event itself as it
// doesn't have any associated state to store and we don't need to // doesn't have any associated state to store and we don't need to
// notify anyone about it. // notify anyone about it.
logrus.WithField("event_id", event.EventID()).WithField("type", event.Type()).WithField("room", event.RoomID()).Info("Stored outlier")
return event.EventID(), nil return event.EventID(), nil
} }
@ -89,11 +92,6 @@ func processRoomEvent(
} }
} }
if input.Kind == api.KindBackfill {
// Backfill is not implemented.
panic("Not implemented")
}
// Update the extremities of the event graph for the room // Update the extremities of the event graph for the room
return event.EventID(), updateLatestEvents( return event.EventID(), updateLatestEvents(
ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID, ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,

View file

@ -0,0 +1,217 @@
package query
import (
"context"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
// backfillRequester implements gomatrixserverlib.BackfillRequester
type backfillRequester struct {
db storage.Database
fedClient *gomatrixserverlib.FederationClient
thisServer gomatrixserverlib.ServerName
// per-request state
servers []gomatrixserverlib.ServerName
eventIDToBeforeStateIDs map[string][]string
eventIDMap map[string]gomatrixserverlib.Event
}
func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName) *backfillRequester {
return &backfillRequester{
db: db,
fedClient: fedClient,
thisServer: thisServer,
eventIDToBeforeStateIDs: make(map[string][]string),
eventIDMap: make(map[string]gomatrixserverlib.Event),
}
}
func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent gomatrixserverlib.HeaderedEvent) ([]string, error) {
b.eventIDMap[targetEvent.EventID()] = targetEvent.Unwrap()
if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok {
return ids, nil
}
// if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event.
// Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or
// we don't know the result of state res to merge forks (2 or more prev_events)
if len(targetEvent.PrevEventIDs()) == 1 {
prevEventID := targetEvent.PrevEventIDs()[0]
prevEvent, ok := b.eventIDMap[prevEventID]
if !ok {
goto FederationHit
}
prevEventStateIDs, ok := b.eventIDToBeforeStateIDs[prevEventID]
if !ok {
goto FederationHit
}
newStateIDs := b.calculateNewStateIDs(targetEvent.Unwrap(), prevEvent, prevEventStateIDs)
if newStateIDs != nil {
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
return newStateIDs, nil
}
// else we failed to calculate the new state, so fallthrough
}
FederationHit:
var lastErr error
logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event")
for _, srv := range b.servers { // hit any valid server
c := gomatrixserverlib.FederatedStateProvider{
FedClient: b.fedClient,
AuthEventsOnly: false,
Server: srv,
}
res, err := c.StateIDsBeforeEvent(ctx, targetEvent)
if err != nil {
lastErr = err
continue
}
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
return res, nil
}
return nil, lastErr
}
func (b *backfillRequester) calculateNewStateIDs(targetEvent, prevEvent gomatrixserverlib.Event, prevEventStateIDs []string) []string {
newStateIDs := prevEventStateIDs[:]
if prevEvent.StateKey() == nil {
// state is the same as the previous event
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
return newStateIDs
}
missingState := false // true if we are missing the info for a state event ID
foundEvent := false // true if we found a (type, state_key) match
// find which state ID to replace, if any
for i, id := range newStateIDs {
ev, ok := b.eventIDMap[id]
if !ok {
missingState = true
continue
}
// The state IDs BEFORE the target event are the state IDs BEFORE the prev_event PLUS the prev_event itself
if ev.Type() == prevEvent.Type() && ev.StateKey() != nil && *ev.StateKey() == *prevEvent.StateKey() {
newStateIDs[i] = prevEvent.EventID()
foundEvent = true
break
}
}
if !foundEvent && !missingState {
// we can be certain that this is new state
newStateIDs = append(newStateIDs, prevEvent.EventID())
foundEvent = true
}
if foundEvent {
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
return newStateIDs
}
return nil
}
func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, event gomatrixserverlib.HeaderedEvent, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) {
// try to fetch the events from the database first
events, err := b.ProvideEvents(roomVer, eventIDs)
if err != nil {
// non-fatal, fallthrough
logrus.WithError(err).Info("Failed to fetch events")
} else {
logrus.Infof("Fetched %d/%d events from the database", len(events), len(eventIDs))
if len(events) == len(eventIDs) {
result := make(map[string]*gomatrixserverlib.Event)
for i := range events {
result[events[i].EventID()] = &events[i]
b.eventIDMap[events[i].EventID()] = events[i]
}
return result, nil
}
}
c := gomatrixserverlib.FederatedStateProvider{
FedClient: b.fedClient,
AuthEventsOnly: false,
Server: b.servers[0],
}
result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
if err != nil {
return nil, err
}
for eventID, ev := range result {
b.eventIDMap[eventID] = *ev
}
return result, nil
}
// ServersAtEvent is called when trying to determine which server to request from.
// It returns a list of servers which can be queried for backfill requests. These servers
// will be servers that are in the room already. The entries at the beginning are preferred servers
// and will be tried first. An empty list will fail the request.
func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) (servers []gomatrixserverlib.ServerName) {
// getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
// the event is necessary.
NIDs, err := b.db.EventNIDs(ctx, []string{eventID})
if err != nil {
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event")
return
}
// Retrieve all "m.room.member" state events of "join" membership, which
// contains the list of users in the room before the event, therefore all
// the servers in it at that moment.
events, err := getMembershipsBeforeEventNID(ctx, b.db, NIDs[eventID], true)
if err != nil {
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event")
return
}
// Store the server names in a temporary map to avoid duplicates.
serverSet := make(map[gomatrixserverlib.ServerName]bool)
for _, event := range events {
serverSet[event.Origin()] = true
}
for server := range serverSet {
if server == b.thisServer {
continue
}
servers = append(servers, server)
}
b.servers = servers
return
}
// Backfill performs a backfill request to the given server.
// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) {
tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs)
return &tx, err
}
func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) {
ctx := context.Background()
nidMap, err := b.db.EventNIDs(ctx, eventIDs)
if err != nil {
logrus.WithError(err).WithField("event_ids", eventIDs).Error("Failed to find events")
return nil, err
}
eventNIDs := make([]types.EventNID, len(nidMap))
i := 0
for _, nid := range nidMap {
eventNIDs[i] = nid
i++
}
eventsWithNids, err := b.db.Events(ctx, eventNIDs)
if err != nil {
logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events")
return nil, err
}
events := make([]gomatrixserverlib.Event, len(eventsWithNids))
for i := range eventsWithNids {
events[i] = eventsWithNids[i].Event
}
return events, nil
}

View file

@ -19,6 +19,7 @@ package query
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -31,12 +32,16 @@ import (
"github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI // RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI
type RoomserverQueryAPI struct { type RoomserverQueryAPI struct {
DB storage.Database DB storage.Database
ImmutableCache caching.ImmutableCache ImmutableCache caching.ImmutableCache
ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier
FedClient *gomatrixserverlib.FederationClient
} }
// QueryLatestEventsAndState implements api.RoomserverQueryAPI // QueryLatestEventsAndState implements api.RoomserverQueryAPI
@ -281,7 +286,7 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
events, err = r.DB.Events(ctx, eventNIDs) events, err = r.DB.Events(ctx, eventNIDs)
} else { } else {
events, err = r.getMembershipsBeforeEventNID(ctx, membershipEventNID, request.JoinedOnly) events, err = getMembershipsBeforeEventNID(ctx, r.DB, membershipEventNID, request.JoinedOnly)
} }
if err != nil { if err != nil {
@ -300,19 +305,19 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
// of the event's room as it was when this event was fired, then filters the state events to // of the event's room as it was when this event was fired, then filters the state events to
// only keep the "m.room.member" events with a "join" membership. These events are returned. // only keep the "m.room.member" events with a "join" membership. These events are returned.
// Returns an error if there was an issue fetching the events. // Returns an error if there was an issue fetching the events.
func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID( func getMembershipsBeforeEventNID(
ctx context.Context, eventNID types.EventNID, joinedOnly bool, ctx context.Context, db storage.Database, eventNID types.EventNID, joinedOnly bool,
) ([]types.Event, error) { ) ([]types.Event, error) {
roomState := state.NewStateResolution(r.DB) roomState := state.NewStateResolution(db)
events := []types.Event{} events := []types.Event{}
// Lookup the event NID // Lookup the event NID
eIDs, err := r.DB.EventIDs(ctx, []types.EventNID{eventNID}) eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID})
if err != nil { if err != nil {
return nil, err return nil, err
} }
eventIDs := []string{eIDs[eventNID]} eventIDs := []string{eIDs[eventNID]}
prevState, err := r.DB.StateAtEventIDs(ctx, eventIDs) prevState, err := db.StateAtEventIDs(ctx, eventIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -332,7 +337,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID(
} }
// Get all of the events in this state // Get all of the events in this state
stateEvents, err := r.DB.Events(ctx, eventNIDs) stateEvents, err := db.Events(ctx, eventNIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -484,6 +489,13 @@ func (r *RoomserverQueryAPI) QueryBackfill(
request *api.QueryBackfillRequest, request *api.QueryBackfillRequest,
response *api.QueryBackfillResponse, response *api.QueryBackfillResponse,
) error { ) error {
// if we are requesting the backfill then we need to do a federation hit
// TODO: we could be more sensible and fetch as many events we already have then request the rest
// which is what the syncapi does already.
if request.ServerName == r.ServerName {
return r.backfillViaFederation(ctx, request, response)
}
// someone else is requesting the backfill, try to service their request.
var err error var err error
var front []string var front []string
@ -525,6 +537,55 @@ func (r *RoomserverQueryAPI) QueryBackfill(
return err return err
} }
func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error {
roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID)
if err != nil {
return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
}
requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName)
events, err := gomatrixserverlib.RequestBackfill(
ctx, requester,
r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit)
if err != nil {
return err
}
logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events))
// persist these new events - auth checks have already been done
roomNID, backfilledEventMap := persistEvents(ctx, r.DB, events)
if err != nil {
return err
}
for _, ev := range backfilledEventMap {
// now add state for these events
stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()]
if !ok {
// this should be impossible as all events returned must have pass Step 5 of the PDU checks
// which requires a list of state IDs.
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to find state IDs for event which passed auth checks")
continue
}
var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
return err
}
var beforeStateSnapshotNID types.StateSnapshotNID
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
return err
}
if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set state before event")
}
}
// TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
res.Events = events
return nil
}
func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) { func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) {
roomNID, err := r.DB.RoomNID(ctx, roomID) roomNID, err := r.DB.RoomNID(ctx, roomID)
if err != nil { if err != nil {
@ -778,39 +839,33 @@ func getAuthChain(
return authEvents, nil return authEvents, nil
} }
// QueryServersInRoomAtEvent implements api.RoomserverQueryAPI func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
func (r *RoomserverQueryAPI) QueryServersInRoomAtEvent( var roomNID types.RoomNID
ctx context.Context, backfilledEventMap := make(map[string]types.Event)
request *api.QueryServersInRoomAtEventRequest, for _, ev := range events {
response *api.QueryServersInRoomAtEventResponse, nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
) error { if err != nil { // this shouldn't happen as RequestBackfill already found them
// getMembershipsBeforeEventNID requires a NID, so retrieving the NID for logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events")
// the event is necessary. continue
NIDs, err := r.DB.EventNIDs(ctx, []string{request.EventID}) }
if err != nil { authNids := make([]types.EventNID, len(nidMap))
return err i := 0
for _, nid := range nidMap {
authNids[i] = nid
i++
}
var stateAtEvent types.StateAtEvent
roomNID, stateAtEvent, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to store backfilled event")
continue
}
backfilledEventMap[ev.EventID()] = types.Event{
EventNID: stateAtEvent.StateEntry.EventNID,
Event: ev.Unwrap(),
}
} }
return roomNID, backfilledEventMap
// Retrieve all "m.room.member" state events of "join" membership, which
// contains the list of users in the room before the event, therefore all
// the servers in it at that moment.
events, err := r.getMembershipsBeforeEventNID(ctx, NIDs[request.EventID], true)
if err != nil {
return err
}
// Store the server names in a temporary map to avoid duplicates.
servers := make(map[gomatrixserverlib.ServerName]bool)
for _, event := range events {
servers[event.Origin()] = true
}
// Populate the response.
for server := range servers {
response.Servers = append(response.Servers, server)
}
return nil
} }
// QueryRoomVersionCapabilities implements api.RoomserverQueryAPI // QueryRoomVersionCapabilities implements api.RoomserverQueryAPI
@ -994,20 +1049,6 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
servMux.Handle(
api.RoomserverQueryServersInRoomAtEventPath,
common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse {
var request api.QueryServersInRoomAtEventRequest
var response api.QueryServersInRoomAtEventResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := r.QueryServersInRoomAtEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle( servMux.Handle(
api.RoomserverQueryRoomVersionCapabilitiesPath, api.RoomserverQueryRoomVersionCapabilitiesPath,
common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse { common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse {

View file

@ -18,6 +18,7 @@ import (
"net/http" "net/http"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
asQuery "github.com/matrix-org/dendrite/appservice/query" asQuery "github.com/matrix-org/dendrite/appservice/query"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
@ -33,7 +34,7 @@ import (
// allowing other components running in the same process to hit the query the // allowing other components running in the same process to hit the query the
// APIs directly instead of having to use HTTP. // APIs directly instead of having to use HTTP.
func SetupRoomServerComponent( func SetupRoomServerComponent(
base *basecomponent.BaseDendrite, base *basecomponent.BaseDendrite, keyRing gomatrixserverlib.JSONVerifier,
) (api.RoomserverAliasAPI, api.RoomserverInputAPI, api.RoomserverQueryAPI) { ) (api.RoomserverAliasAPI, api.RoomserverInputAPI, api.RoomserverQueryAPI) {
roomserverDB, err := storage.Open(string(base.Cfg.Database.RoomServer), base.Cfg.DbProperties()) roomserverDB, err := storage.Open(string(base.Cfg.Database.RoomServer), base.Cfg.DbProperties())
if err != nil { if err != nil {
@ -51,6 +52,11 @@ func SetupRoomServerComponent(
queryAPI := query.RoomserverQueryAPI{ queryAPI := query.RoomserverQueryAPI{
DB: roomserverDB, DB: roomserverDB,
ImmutableCache: base.ImmutableCache, ImmutableCache: base.ImmutableCache,
ServerName: base.Cfg.Matrix.ServerName,
FedClient: base.CreateFederationClient(),
// TODO: We should have a key server so we don't keep adding components
// which talk to the same DB.
KeyRing: keyRing,
} }
queryAPI.SetupHTTP(http.DefaultServeMux) queryAPI.SetupHTTP(http.DefaultServeMux)

View file

@ -31,7 +31,8 @@ type Database interface {
state []types.StateEntry, state []types.StateEntry,
) (types.StateSnapshotNID, error) ) (types.StateSnapshotNID, error)
// Look up the state of a room at each event for a list of string event IDs. // Look up the state of a room at each event for a list of string event IDs.
// Returns an error if there is an error talking to the database // Returns an error if there is an error talking to the database.
// The length of []types.StateAtEvent is guaranteed to equal the length of eventIDs if no error is returned.
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database // Returns a types.MissingEventError if the room state for the event IDs aren't in the database
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error) StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
// Look up the numeric IDs for a list of string event types. // Look up the numeric IDs for a list of string event types.

View file

@ -210,7 +210,10 @@ func (r *messagesReq) retrieveEvents() (
} }
// Sort the events to ensure we send them in the right order. // Sort the events to ensure we send them in the right order.
events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(events) events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(
events,
gomatrixserverlib.TopologicalOrderByPrevEvents,
)
if r.backwardOrdering { if r.backwardOrdering {
// This reverses the array from old->new to new->old // This reverses the array from old->new to new->old
sort.SliceStable(events, func(i, j int) bool { sort.SliceStable(events, func(i, j int) bool {
@ -282,7 +285,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// Check if we have backward extremities for this room. // Check if we have backward extremities for this room.
if len(backwardExtremities) > 0 { if len(backwardExtremities) > 0 {
// If so, retrieve as much events as needed through backfilling. // If so, retrieve as much events as needed through backfilling.
events, err = r.backfill(backwardExtremities, r.limit) events, err = r.backfill(r.roomID, backwardExtremities, r.limit)
if err != nil { if err != nil {
return return
} }
@ -331,7 +334,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering { if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []gomatrixserverlib.HeaderedEvent var pdus []gomatrixserverlib.HeaderedEvent
// Only ask the remote server for enough events to reach the limit. // Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents)) pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents))
if err != nil { if err != nil {
return return
} }
@ -355,45 +358,29 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// event, or if there is no remote homeserver to contact. // event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in // Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request. // the room or sending the request.
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: r.roomID} var res api.QueryBackfillResponse
verRes := api.QueryRoomVersionForRoomResponse{} err := r.queryAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
if err := r.queryAPI.QueryRoomVersionForRoom(r.ctx, &verReq, &verRes); err != nil { RoomID: roomID,
return nil, err EarliestEventsIDs: fromEventIDs,
} Limit: limit,
ServerName: r.cfg.Matrix.ServerName,
srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs) }, &res)
if err != nil { if err != nil {
return nil, fmt.Errorf("Cannot find server to backfill from: %w", err) return nil, fmt.Errorf("QueryBackfill failed: %w", err)
} }
util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill")
headered := make([]gomatrixserverlib.HeaderedEvent, 0) // TODO: we should only be inserting events into the database from the roomserver's kafka output stream.
// Currently, this can race with live events for the room and cause problems. It's also just a bit unclear
// If the roomserver responded with at least one server that isn't us, // when you have multiple entry points to write events.
// send it a request for backfill.
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server")
txn, err := r.federation.Backfill(
r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
)
if err != nil {
return nil, err
}
for _, p := range txn.PDUs {
event, e := gomatrixserverlib.NewEventFromUntrustedJSON(p, verRes.RoomVersion)
if e != nil {
continue
}
headered = append(headered, event.Headered(verRes.RoomVersion))
}
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(headered)).Info("Storing new events from backfill")
// Store the events in the database, while marking them as unfit to show // Store the events in the database, while marking them as unfit to show
// up in responses to sync requests. // up in responses to sync requests.
for i := range headered { for i := range res.Events {
if _, err = r.db.WriteEvent( if _, err = r.db.WriteEvent(
r.ctx, r.ctx,
&headered[i], &res.Events[i],
[]gomatrixserverlib.HeaderedEvent{}, []gomatrixserverlib.HeaderedEvent{},
[]string{}, []string{},
[]string{}, []string{},
@ -403,63 +390,7 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
} }
} }
return headered, nil return res.Events, nil
}
func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) {
// Query the list of servers in the room when one of the backward extremities
// was sent.
var serversResponse api.QueryServersInRoomAtEventResponse
serversRequest := api.QueryServersInRoomAtEventRequest{
RoomID: r.roomID,
EventID: fromEventIDs[0],
}
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender")
// FIXME: We shouldn't be doing this but in situations where we have already backfilled once
// the query API doesn't work as backfilled events do not make it to the room server.
// This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question.
// We need to inject backfilled events into the room server and store them appropriately.
events, err := r.db.Events(r.ctx, fromEventIDs)
if err != nil {
return "", err
}
if len(events) == 0 {
// should be impossible as these event IDs are backwards extremities
return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs)
}
// The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it.
// We shouldn't be doing this really, but as a heuristic it should work pretty well for now.
for _, e := range events {
_, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender())
if srverr != nil {
util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender")
continue
}
if srv != r.cfg.Matrix.ServerName {
return srv, nil
}
}
// no valid events which have a remote server, fail.
return "", err
}
// Use the first server from the response, except if that server is us.
// In that case, use the second one if the roomserver responded with
// enough servers. If not, use an empty string to prevent the backfill
// from happening as there's no server to direct the request towards.
// TODO: Be smarter at selecting the server to direct the request
// towards.
srvToBackfillFrom := serversResponse.Servers[0]
if srvToBackfillFrom == r.cfg.Matrix.ServerName {
if len(serversResponse.Servers) > 1 {
srvToBackfillFrom = serversResponse.Servers[1]
} else {
util.GetLogger(r.ctx).Info("Not enough servers to backfill from")
return "", nil
}
}
return srvToBackfillFrom, nil
} }
// setToDefault returns the default value for the "to" query parameter of a // setToDefault returns the default value for the "to" query parameter of a

View file

@ -253,3 +253,8 @@ User can invite local user to room with version 3
User can invite local user to room with version 4 User can invite local user to room with version 4
A pair of servers can establish a join in a v2 room A pair of servers can establish a join in a v2 room
Can logout all devices Can logout all devices
State from remote users is included in the timeline in an incremental sync
User can invite remote user to room with version 1
User can invite remote user to room with version 2
User can invite remote user to room with version 3
User can invite remote user to room with version 4