Initial cut for backfilling

The syncserver now asks the roomserver via QueryBackfill (which already
existed to *handle* backfill requests) which then makes federation requests
via gomatrixserverlib.RequestBackfill.

Currently, tests fail on subsequent /messages requests because we don't know
which servers are in the room, because we are unable to get state snapshots
from a backfilled event because that code doesn't exist yet.
This commit is contained in:
Kegan Dougal 2020-04-23 18:05:55 +01:00
parent c30b12b5a1
commit 3ab24c3afd
12 changed files with 170 additions and 191 deletions

View file

@ -148,7 +148,7 @@ func main() {
federation := createFederationClient(base) federation := createFederationClient(base)
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
alias, input, query := roomserver.SetupRoomServerComponent(&base.Base) alias, input, query := roomserver.SetupRoomServerComponent(&base.Base, keyRing)
eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New()) eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
&base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),

View file

@ -57,7 +57,7 @@ func main() {
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
alias, input, query := roomserver.SetupRoomServerComponent(base) alias, input, query := roomserver.SetupRoomServerComponent(base, keyRing)
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),

View file

@ -18,6 +18,7 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
) )
@ -25,8 +26,11 @@ func main() {
cfg := basecomponent.ParseFlags() cfg := basecomponent.ParseFlags()
base := basecomponent.NewBaseDendrite(cfg, "RoomServerAPI") base := basecomponent.NewBaseDendrite(cfg, "RoomServerAPI")
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
keyDB := base.CreateKeyDB()
federation := base.CreateFederationClient()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
roomserver.SetupRoomServerComponent(base) roomserver.SetupRoomServerComponent(base, keyRing)
base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer)) base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer))

View file

@ -69,6 +69,7 @@ func Backfill(
// Populate the request. // Populate the request.
req := api.QueryBackfillRequest{ req := api.QueryBackfillRequest{
RoomID: roomID,
EarliestEventsIDs: eIDs, EarliestEventsIDs: eIDs,
ServerName: request.Origin(), ServerName: request.Origin(),
} }

2
go.mod
View file

@ -17,7 +17,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3 github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible

2
go.sum
View file

@ -369,6 +369,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:km
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3 h1:xis1ojN99vjygwqudzB9VQq3cM2SJ7aCAMlXj/YN+88= github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3 h1:xis1ojN99vjygwqudzB9VQq3cM2SJ7aCAMlXj/YN+88=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799 h1:OsoUMTirIpeuZJdYkKKiYe6jm0E5viZR7aOS9K465QI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=

View file

@ -229,6 +229,8 @@ type QueryStateAndAuthChainResponse struct {
// QueryBackfillRequest is a request to QueryBackfill. // QueryBackfillRequest is a request to QueryBackfill.
type QueryBackfillRequest struct { type QueryBackfillRequest struct {
// The room to backfill
RoomID string `json:"room_id"`
// Events to start paginating from. // Events to start paginating from.
EarliestEventsIDs []string `json:"earliest_event_ids"` EarliestEventsIDs []string `json:"earliest_event_ids"`
// The maximum number of events to retrieve. // The maximum number of events to retrieve.
@ -243,21 +245,7 @@ type QueryBackfillResponse struct {
Events []gomatrixserverlib.HeaderedEvent `json:"events"` Events []gomatrixserverlib.HeaderedEvent `json:"events"`
} }
// QueryServersInRoomAtEventRequest is a request to QueryServersInRoomAtEvent // QueryRoomVersionCapabilitiesRequest asks for the default room version
type QueryServersInRoomAtEventRequest struct {
// ID of the room to retrieve member servers for.
RoomID string `json:"room_id"`
// ID of the event for which to retrieve member servers.
EventID string `json:"event_id"`
}
// QueryServersInRoomAtEventResponse is a response to QueryServersInRoomAtEvent
type QueryServersInRoomAtEventResponse struct {
// Servers present in the room for these events.
Servers []gomatrixserverlib.ServerName `json:"servers"`
}
// QueryRoomVersionCapabilities asks for the default room version
type QueryRoomVersionCapabilitiesRequest struct{} type QueryRoomVersionCapabilitiesRequest struct{}
// QueryRoomVersionCapabilitiesResponse is a response to QueryRoomVersionCapabilitiesRequest // QueryRoomVersionCapabilitiesResponse is a response to QueryRoomVersionCapabilitiesRequest
@ -266,12 +254,12 @@ type QueryRoomVersionCapabilitiesResponse struct {
AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"` AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"`
} }
// QueryRoomVersionForRoom asks for the room version for a given room. // QueryRoomVersionForRoomRequest asks for the room version for a given room.
type QueryRoomVersionForRoomRequest struct { type QueryRoomVersionForRoomRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
} }
// QueryRoomVersionCapabilitiesResponse is a response to QueryServersInRoomAtEventResponse // QueryRoomVersionForRoomResponse is a response to QueryRoomVersionForRoomRequest
type QueryRoomVersionForRoomResponse struct { type QueryRoomVersionForRoomResponse struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
} }
@ -350,12 +338,6 @@ type RoomserverQueryAPI interface {
response *QueryBackfillResponse, response *QueryBackfillResponse,
) error ) error
QueryServersInRoomAtEvent(
ctx context.Context,
request *QueryServersInRoomAtEventRequest,
response *QueryServersInRoomAtEventResponse,
) error
// Asks for the default room version as preferred by the server. // Asks for the default room version as preferred by the server.
QueryRoomVersionCapabilities( QueryRoomVersionCapabilities(
ctx context.Context, ctx context.Context,
@ -401,13 +383,10 @@ const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthC
// RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API // RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API
const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill" const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill"
// RoomserverQueryServersInRoomAtEventPath is the HTTP path for the QueryServersInRoomAtEvent API
const RoomserverQueryServersInRoomAtEventPath = "/api/roomserver/queryServersInRoomAtEvents"
// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API // RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API
const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities" const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities"
// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API // RoomserverQueryRoomVersionForRoomPath is the HTTP path for the QueryRoomVersionForRoom API
const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom" const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
@ -555,19 +534,6 @@ func (h *httpRoomserverQueryAPI) QueryBackfill(
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// QueryServersInRoomAtEvent implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent(
ctx context.Context,
request *QueryServersInRoomAtEventRequest,
response *QueryServersInRoomAtEventResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryServersInRoomAtEvent")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEventPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryRoomVersionCapabilities implements RoomServerQueryAPI // QueryRoomVersionCapabilities implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities( func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities(
ctx context.Context, ctx context.Context,

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/state/database" "github.com/matrix-org/dendrite/roomserver/state/database"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -104,6 +105,7 @@ func processRoomEvent(
// Check that the event passes authentication checks and work out the numeric IDs for the auth events. // Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
if err != nil { if err != nil {
logrus.WithError(err).WithField("event_id", event.EventID()).Error("processRoomEvent.checkAuthEvents failed for event")
return return
} }
@ -128,6 +130,7 @@ func processRoomEvent(
// For outliers we can stop after we've stored the event itself as it // For outliers we can stop after we've stored the event itself as it
// doesn't have any associated state to store and we don't need to // doesn't have any associated state to store and we don't need to
// notify anyone about it. // notify anyone about it.
logrus.WithField("event_id", event.EventID()).WithField("type", event.Type()).WithField("room", event.RoomID()).Info("Stored outlier")
return event.EventID(), nil return event.EventID(), nil
} }

View file

@ -0,0 +1,86 @@
package query
import (
"context"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
// backfillRequester implements gomatrixserverlib.BackfillRequester
type backfillRequester struct {
db RoomserverQueryAPIDatabase
fedClient *gomatrixserverlib.FederationClient
thisServer gomatrixserverlib.ServerName
}
// ServersAtEvent is called when trying to determine which server to request from.
// It returns a list of servers which can be queried for backfill requests. These servers
// will be servers that are in the room already. The entries at the beginning are preferred servers
// and will be tried first. An empty list will fail the request.
func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) (servers []gomatrixserverlib.ServerName) {
// getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
// the event is necessary.
NIDs, err := b.db.EventNIDs(ctx, []string{eventID})
if err != nil {
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event")
return
}
// Retrieve all "m.room.member" state events of "join" membership, which
// contains the list of users in the room before the event, therefore all
// the servers in it at that moment.
events, err := getMembershipsBeforeEventNID(ctx, b.db, NIDs[eventID], true)
if err != nil {
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event")
return
}
// Store the server names in a temporary map to avoid duplicates.
serverSet := make(map[gomatrixserverlib.ServerName]bool)
for _, event := range events {
serverSet[event.Origin()] = true
}
for server := range serverSet {
if server == b.thisServer {
continue
}
servers = append(servers, server)
}
return
}
// Backfill performs a backfill request to the given server.
// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) {
tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs)
return &tx, err
}
func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) {
logrus.Info("backfillRequester.ProvideEvents ", eventIDs)
ctx := context.Background()
nidMap, err := b.db.EventNIDs(ctx, eventIDs)
if err != nil {
logrus.WithError(err).WithField("event_ids", eventIDs).Error("Failed to find events")
return nil, err
}
eventNIDs := make([]types.EventNID, len(nidMap))
i := 0
for _, nid := range nidMap {
eventNIDs[i] = nid
i++
}
eventsWithNids, err := b.db.Events(ctx, eventNIDs)
if err != nil {
logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events")
return nil, err
}
events := make([]gomatrixserverlib.Event, len(eventsWithNids))
for i := range eventsWithNids {
events[i] = eventsWithNids[i].Event
}
logrus.Infof("backfillRequester.ProvideEvents Returning %+v", events)
return events, nil
}

View file

@ -19,6 +19,7 @@ package query
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -31,6 +32,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// RoomserverQueryAPIEventDB has a convenience API to fetch events directly by // RoomserverQueryAPIEventDB has a convenience API to fetch events directly by
@ -100,6 +102,9 @@ type RoomserverQueryAPIDatabase interface {
type RoomserverQueryAPI struct { type RoomserverQueryAPI struct {
DB RoomserverQueryAPIDatabase DB RoomserverQueryAPIDatabase
ImmutableCache caching.ImmutableCache ImmutableCache caching.ImmutableCache
ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier
FedClient *gomatrixserverlib.FederationClient
} }
// QueryLatestEventsAndState implements api.RoomserverQueryAPI // QueryLatestEventsAndState implements api.RoomserverQueryAPI
@ -344,7 +349,7 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
events, err = r.DB.Events(ctx, eventNIDs) events, err = r.DB.Events(ctx, eventNIDs)
} else { } else {
events, err = r.getMembershipsBeforeEventNID(ctx, membershipEventNID, request.JoinedOnly) events, err = getMembershipsBeforeEventNID(ctx, r.DB, membershipEventNID, request.JoinedOnly)
} }
if err != nil { if err != nil {
@ -363,19 +368,19 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
// of the event's room as it was when this event was fired, then filters the state events to // of the event's room as it was when this event was fired, then filters the state events to
// only keep the "m.room.member" events with a "join" membership. These events are returned. // only keep the "m.room.member" events with a "join" membership. These events are returned.
// Returns an error if there was an issue fetching the events. // Returns an error if there was an issue fetching the events.
func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID( func getMembershipsBeforeEventNID(
ctx context.Context, eventNID types.EventNID, joinedOnly bool, ctx context.Context, db RoomserverQueryAPIDatabase, eventNID types.EventNID, joinedOnly bool,
) ([]types.Event, error) { ) ([]types.Event, error) {
roomState := state.NewStateResolution(r.DB) roomState := state.NewStateResolution(db)
events := []types.Event{} events := []types.Event{}
// Lookup the event NID // Lookup the event NID
eIDs, err := r.DB.EventIDs(ctx, []types.EventNID{eventNID}) eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID})
if err != nil { if err != nil {
return nil, err return nil, err
} }
eventIDs := []string{eIDs[eventNID]} eventIDs := []string{eIDs[eventNID]}
prevState, err := r.DB.StateAtEventIDs(ctx, eventIDs) prevState, err := db.StateAtEventIDs(ctx, eventIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -395,7 +400,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID(
} }
// Get all of the events in this state // Get all of the events in this state
stateEvents, err := r.DB.Events(ctx, eventNIDs) stateEvents, err := db.Events(ctx, eventNIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -547,6 +552,13 @@ func (r *RoomserverQueryAPI) QueryBackfill(
request *api.QueryBackfillRequest, request *api.QueryBackfillRequest,
response *api.QueryBackfillResponse, response *api.QueryBackfillResponse,
) error { ) error {
logrus.WithField("req", request).Info("QueryBackfill - I am " + r.ServerName)
// if we are requesting the backfill then we need to do a federation hit
// TODO: we could be more sensible and fetch as many events we already have then request the rest
// which is what the syncapi does already.
if request.ServerName == r.ServerName {
return r.backfillViaFederation(ctx, request, response)
}
var err error var err error
var front []string var front []string
@ -588,6 +600,26 @@ func (r *RoomserverQueryAPI) QueryBackfill(
return err return err
} }
func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error {
roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID)
if err != nil {
return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
}
events, err := gomatrixserverlib.RequestBackfill(ctx, &backfillRequester{
db: r.DB,
fedClient: r.FedClient,
thisServer: r.ServerName,
}, r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit)
if err != nil {
return err
}
// TODO: persist these new events and update the state db so we can get state snapshots at these new backfilled events
// this will be important if we want to backfill multiple times as we get the join memberships from state snapshots.
res.Events = events
return nil
}
func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) { func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) {
roomNID, err := r.DB.RoomNID(ctx, roomID) roomNID, err := r.DB.RoomNID(ctx, roomID)
if err != nil { if err != nil {
@ -839,41 +871,6 @@ func getAuthChain(
return authEvents, nil return authEvents, nil
} }
// QueryServersInRoomAtEvent implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryServersInRoomAtEvent(
ctx context.Context,
request *api.QueryServersInRoomAtEventRequest,
response *api.QueryServersInRoomAtEventResponse,
) error {
// getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
// the event is necessary.
NIDs, err := r.DB.EventNIDs(ctx, []string{request.EventID})
if err != nil {
return err
}
// Retrieve all "m.room.member" state events of "join" membership, which
// contains the list of users in the room before the event, therefore all
// the servers in it at that moment.
events, err := r.getMembershipsBeforeEventNID(ctx, NIDs[request.EventID], true)
if err != nil {
return err
}
// Store the server names in a temporary map to avoid duplicates.
servers := make(map[gomatrixserverlib.ServerName]bool)
for _, event := range events {
servers[event.Origin()] = true
}
// Populate the response.
for server := range servers {
response.Servers = append(response.Servers, server)
}
return nil
}
// QueryRoomVersionCapabilities implements api.RoomserverQueryAPI // QueryRoomVersionCapabilities implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryRoomVersionCapabilities( func (r *RoomserverQueryAPI) QueryRoomVersionCapabilities(
ctx context.Context, ctx context.Context,
@ -1055,20 +1052,6 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
servMux.Handle(
api.RoomserverQueryServersInRoomAtEventPath,
common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse {
var request api.QueryServersInRoomAtEventRequest
var response api.QueryServersInRoomAtEventResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := r.QueryServersInRoomAtEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle( servMux.Handle(
api.RoomserverQueryRoomVersionCapabilitiesPath, api.RoomserverQueryRoomVersionCapabilitiesPath,
common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse { common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse {

View file

@ -18,6 +18,7 @@ import (
"net/http" "net/http"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
asQuery "github.com/matrix-org/dendrite/appservice/query" asQuery "github.com/matrix-org/dendrite/appservice/query"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
@ -33,7 +34,7 @@ import (
// allowing other components running in the same process to hit the query the // allowing other components running in the same process to hit the query the
// APIs directly instead of having to use HTTP. // APIs directly instead of having to use HTTP.
func SetupRoomServerComponent( func SetupRoomServerComponent(
base *basecomponent.BaseDendrite, base *basecomponent.BaseDendrite, keyRing gomatrixserverlib.JSONVerifier,
) (api.RoomserverAliasAPI, api.RoomserverInputAPI, api.RoomserverQueryAPI) { ) (api.RoomserverAliasAPI, api.RoomserverInputAPI, api.RoomserverQueryAPI) {
roomserverDB, err := storage.Open(string(base.Cfg.Database.RoomServer)) roomserverDB, err := storage.Open(string(base.Cfg.Database.RoomServer))
if err != nil { if err != nil {
@ -51,6 +52,11 @@ func SetupRoomServerComponent(
queryAPI := query.RoomserverQueryAPI{ queryAPI := query.RoomserverQueryAPI{
DB: roomserverDB, DB: roomserverDB,
ImmutableCache: base.ImmutableCache, ImmutableCache: base.ImmutableCache,
ServerName: base.Cfg.Matrix.ServerName,
FedClient: base.CreateFederationClient(),
// TODO: We should have a key server so we don't keep adding components
// which talk to the same DB.
KeyRing: keyRing,
} }
queryAPI.SetupHTTP(http.DefaultServeMux) queryAPI.SetupHTTP(http.DefaultServeMux)

View file

@ -282,7 +282,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// Check if we have backward extremities for this room. // Check if we have backward extremities for this room.
if len(backwardExtremities) > 0 { if len(backwardExtremities) > 0 {
// If so, retrieve as much events as needed through backfilling. // If so, retrieve as much events as needed through backfilling.
events, err = r.backfill(backwardExtremities, r.limit) events, err = r.backfill(r.roomID, backwardExtremities, r.limit)
if err != nil { if err != nil {
return return
} }
@ -331,7 +331,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering { if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []gomatrixserverlib.HeaderedEvent var pdus []gomatrixserverlib.HeaderedEvent
// Only ask the remote server for enough events to reach the limit. // Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents)) pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents))
if err != nil { if err != nil {
return return
} }
@ -355,45 +355,29 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// event, or if there is no remote homeserver to contact. // event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in // Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request. // the room or sending the request.
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: r.roomID} var res api.QueryBackfillResponse
verRes := api.QueryRoomVersionForRoomResponse{} err := r.queryAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
if err := r.queryAPI.QueryRoomVersionForRoom(r.ctx, &verReq, &verRes); err != nil { RoomID: roomID,
return nil, err EarliestEventsIDs: fromEventIDs,
} Limit: limit,
ServerName: r.cfg.Matrix.ServerName,
srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs) }, &res)
if err != nil { if err != nil {
return nil, fmt.Errorf("Cannot find server to backfill from: %w", err) return nil, fmt.Errorf("QueryBackfill failed: %w", err)
} }
util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill")
headered := make([]gomatrixserverlib.HeaderedEvent, 0) // TODO: we should only be inserting events into the database from the roomserver's kafka output stream.
// Currently, this can race with live events for the room and cause problems. It's also just a bit unclear
// If the roomserver responded with at least one server that isn't us, // when you have multiple entry points to write events.
// send it a request for backfill.
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server")
txn, err := r.federation.Backfill(
r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
)
if err != nil {
return nil, err
}
for _, p := range txn.PDUs {
event, e := gomatrixserverlib.NewEventFromUntrustedJSON(p, verRes.RoomVersion)
if e != nil {
continue
}
headered = append(headered, event.Headered(verRes.RoomVersion))
}
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(headered)).Info("Storing new events from backfill")
// Store the events in the database, while marking them as unfit to show // Store the events in the database, while marking them as unfit to show
// up in responses to sync requests. // up in responses to sync requests.
for i := range headered { for i := range res.Events {
if _, err = r.db.WriteEvent( if _, err = r.db.WriteEvent(
r.ctx, r.ctx,
&headered[i], &res.Events[i],
[]gomatrixserverlib.HeaderedEvent{}, []gomatrixserverlib.HeaderedEvent{},
[]string{}, []string{},
[]string{}, []string{},
@ -403,63 +387,7 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
} }
} }
return headered, nil return res.Events, nil
}
func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) {
// Query the list of servers in the room when one of the backward extremities
// was sent.
var serversResponse api.QueryServersInRoomAtEventResponse
serversRequest := api.QueryServersInRoomAtEventRequest{
RoomID: r.roomID,
EventID: fromEventIDs[0],
}
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender")
// FIXME: We shouldn't be doing this but in situations where we have already backfilled once
// the query API doesn't work as backfilled events do not make it to the room server.
// This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question.
// We need to inject backfilled events into the room server and store them appropriately.
events, err := r.db.Events(r.ctx, fromEventIDs)
if err != nil {
return "", err
}
if len(events) == 0 {
// should be impossible as these event IDs are backwards extremities
return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs)
}
// The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it.
// We shouldn't be doing this really, but as a heuristic it should work pretty well for now.
for _, e := range events {
_, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender())
if srverr != nil {
util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender")
continue
}
if srv != r.cfg.Matrix.ServerName {
return srv, nil
}
}
// no valid events which have a remote server, fail.
return "", err
}
// Use the first server from the response, except if that server is us.
// In that case, use the second one if the roomserver responded with
// enough servers. If not, use an empty string to prevent the backfill
// from happening as there's no server to direct the request towards.
// TODO: Be smarter at selecting the server to direct the request
// towards.
srvToBackfillFrom := serversResponse.Servers[0]
if srvToBackfillFrom == r.cfg.Matrix.ServerName {
if len(serversResponse.Servers) > 1 {
srvToBackfillFrom = serversResponse.Servers[1]
} else {
util.GetLogger(r.ctx).Info("Not enough servers to backfill from")
return "", nil
}
}
return srvToBackfillFrom, nil
} }
// setToDefault returns the default value for the "to" query parameter of a // setToDefault returns the default value for the "to" query parameter of a