diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index f280c7483..f5fe2de5b 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -148,7 +148,7 @@ func main() { federation := createFederationClient(base) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) - alias, input, query := roomserver.SetupRoomServerComponent(&base.Base) + alias, input, query := roomserver.SetupRoomServerComponent(&base.Base, keyRing) eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New()) asQuery := appservice.SetupAppServiceAPIComponent( &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 6b0d83ae1..d63e3fd08 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -57,7 +57,7 @@ func main() { federation := base.CreateFederationClient() keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) - alias, input, query := roomserver.SetupRoomServerComponent(base) + alias, input, query := roomserver.SetupRoomServerComponent(base, keyRing) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), diff --git a/cmd/dendrite-room-server/main.go b/cmd/dendrite-room-server/main.go index 41b705755..f33a2b88f 100644 --- a/cmd/dendrite-room-server/main.go +++ b/cmd/dendrite-room-server/main.go @@ -18,6 +18,7 @@ import ( _ "net/http/pprof" "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/roomserver" ) @@ -25,8 +26,11 @@ func main() { cfg := basecomponent.ParseFlags() base := basecomponent.NewBaseDendrite(cfg, "RoomServerAPI") defer base.Close() // nolint: errcheck + keyDB := base.CreateKeyDB() + federation := base.CreateFederationClient() + keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) - roomserver.SetupRoomServerComponent(base) + roomserver.SetupRoomServerComponent(base, keyRing) base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer)) diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index 62471b8a9..eee590eca 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -69,6 +69,7 @@ func Backfill( // Populate the request. req := api.QueryBackfillRequest{ + RoomID: roomID, EarliestEventsIDs: eIDs, ServerName: request.Origin(), } diff --git a/go.mod b/go.mod index daf232781..6ec675ed4 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index c8f7e43fe..016b6c128 100644 --- a/go.sum +++ b/go.sum @@ -369,6 +369,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:km github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3 h1:xis1ojN99vjygwqudzB9VQq3cM2SJ7aCAMlXj/YN+88= github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799 h1:OsoUMTirIpeuZJdYkKKiYe6jm0E5viZR7aOS9K465QI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= diff --git a/roomserver/api/query.go b/roomserver/api/query.go index b272b1ebd..11fa5c9ca 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -229,6 +229,8 @@ type QueryStateAndAuthChainResponse struct { // QueryBackfillRequest is a request to QueryBackfill. type QueryBackfillRequest struct { + // The room to backfill + RoomID string `json:"room_id"` // Events to start paginating from. EarliestEventsIDs []string `json:"earliest_event_ids"` // The maximum number of events to retrieve. @@ -243,21 +245,7 @@ type QueryBackfillResponse struct { Events []gomatrixserverlib.HeaderedEvent `json:"events"` } -// QueryServersInRoomAtEventRequest is a request to QueryServersInRoomAtEvent -type QueryServersInRoomAtEventRequest struct { - // ID of the room to retrieve member servers for. - RoomID string `json:"room_id"` - // ID of the event for which to retrieve member servers. - EventID string `json:"event_id"` -} - -// QueryServersInRoomAtEventResponse is a response to QueryServersInRoomAtEvent -type QueryServersInRoomAtEventResponse struct { - // Servers present in the room for these events. - Servers []gomatrixserverlib.ServerName `json:"servers"` -} - -// QueryRoomVersionCapabilities asks for the default room version +// QueryRoomVersionCapabilitiesRequest asks for the default room version type QueryRoomVersionCapabilitiesRequest struct{} // QueryRoomVersionCapabilitiesResponse is a response to QueryRoomVersionCapabilitiesRequest @@ -266,12 +254,12 @@ type QueryRoomVersionCapabilitiesResponse struct { AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"` } -// QueryRoomVersionForRoom asks for the room version for a given room. +// QueryRoomVersionForRoomRequest asks for the room version for a given room. type QueryRoomVersionForRoomRequest struct { RoomID string `json:"room_id"` } -// QueryRoomVersionCapabilitiesResponse is a response to QueryServersInRoomAtEventResponse +// QueryRoomVersionForRoomResponse is a response to QueryRoomVersionForRoomRequest type QueryRoomVersionForRoomResponse struct { RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` } @@ -350,12 +338,6 @@ type RoomserverQueryAPI interface { response *QueryBackfillResponse, ) error - QueryServersInRoomAtEvent( - ctx context.Context, - request *QueryServersInRoomAtEventRequest, - response *QueryServersInRoomAtEventResponse, - ) error - // Asks for the default room version as preferred by the server. QueryRoomVersionCapabilities( ctx context.Context, @@ -401,13 +383,10 @@ const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthC // RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill" -// RoomserverQueryServersInRoomAtEventPath is the HTTP path for the QueryServersInRoomAtEvent API -const RoomserverQueryServersInRoomAtEventPath = "/api/roomserver/queryServersInRoomAtEvents" - // RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities" -// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API +// RoomserverQueryRoomVersionForRoomPath is the HTTP path for the QueryRoomVersionForRoom API const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom" // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. @@ -555,19 +534,6 @@ func (h *httpRoomserverQueryAPI) QueryBackfill( return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } -// QueryServersInRoomAtEvent implements RoomServerQueryAPI -func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent( - ctx context.Context, - request *QueryServersInRoomAtEventRequest, - response *QueryServersInRoomAtEventResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "QueryServersInRoomAtEvent") - defer span.Finish() - - apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEventPath - return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} - // QueryRoomVersionCapabilities implements RoomServerQueryAPI func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities( ctx context.Context, diff --git a/roomserver/input/events.go b/roomserver/input/events.go index 2bb0d0a05..21c000cca 100644 --- a/roomserver/input/events.go +++ b/roomserver/input/events.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/state/database" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -104,6 +105,7 @@ func processRoomEvent( // Check that the event passes authentication checks and work out the numeric IDs for the auth events. authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) if err != nil { + logrus.WithError(err).WithField("event_id", event.EventID()).Error("processRoomEvent.checkAuthEvents failed for event") return } @@ -128,6 +130,7 @@ func processRoomEvent( // For outliers we can stop after we've stored the event itself as it // doesn't have any associated state to store and we don't need to // notify anyone about it. + logrus.WithField("event_id", event.EventID()).WithField("type", event.Type()).WithField("room", event.RoomID()).Info("Stored outlier") return event.EventID(), nil } diff --git a/roomserver/query/backfill.go b/roomserver/query/backfill.go new file mode 100644 index 000000000..668ffbc8b --- /dev/null +++ b/roomserver/query/backfill.go @@ -0,0 +1,86 @@ +package query + +import ( + "context" + + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +// backfillRequester implements gomatrixserverlib.BackfillRequester +type backfillRequester struct { + db RoomserverQueryAPIDatabase + fedClient *gomatrixserverlib.FederationClient + thisServer gomatrixserverlib.ServerName +} + +// ServersAtEvent is called when trying to determine which server to request from. +// It returns a list of servers which can be queried for backfill requests. These servers +// will be servers that are in the room already. The entries at the beginning are preferred servers +// and will be tried first. An empty list will fail the request. +func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) (servers []gomatrixserverlib.ServerName) { + // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for + // the event is necessary. + NIDs, err := b.db.EventNIDs(ctx, []string{eventID}) + if err != nil { + logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event") + return + } + + // Retrieve all "m.room.member" state events of "join" membership, which + // contains the list of users in the room before the event, therefore all + // the servers in it at that moment. + events, err := getMembershipsBeforeEventNID(ctx, b.db, NIDs[eventID], true) + if err != nil { + logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event") + return + } + + // Store the server names in a temporary map to avoid duplicates. + serverSet := make(map[gomatrixserverlib.ServerName]bool) + for _, event := range events { + serverSet[event.Origin()] = true + } + for server := range serverSet { + if server == b.thisServer { + continue + } + servers = append(servers, server) + } + return +} + +// Backfill performs a backfill request to the given server. +// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid +func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) { + tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs) + return &tx, err +} + +func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) { + logrus.Info("backfillRequester.ProvideEvents ", eventIDs) + ctx := context.Background() + nidMap, err := b.db.EventNIDs(ctx, eventIDs) + if err != nil { + logrus.WithError(err).WithField("event_ids", eventIDs).Error("Failed to find events") + return nil, err + } + eventNIDs := make([]types.EventNID, len(nidMap)) + i := 0 + for _, nid := range nidMap { + eventNIDs[i] = nid + i++ + } + eventsWithNids, err := b.db.Events(ctx, eventNIDs) + if err != nil { + logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events") + return nil, err + } + events := make([]gomatrixserverlib.Event, len(eventsWithNids)) + for i := range eventsWithNids { + events[i] = eventsWithNids[i].Event + } + logrus.Infof("backfillRequester.ProvideEvents Returning %+v", events) + return events, nil +} diff --git a/roomserver/query/query.go b/roomserver/query/query.go index 224d9fa22..e80bc4a1e 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -19,6 +19,7 @@ package query import ( "context" "encoding/json" + "fmt" "net/http" "github.com/matrix-org/dendrite/common" @@ -31,6 +32,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // RoomserverQueryAPIEventDB has a convenience API to fetch events directly by @@ -100,6 +102,9 @@ type RoomserverQueryAPIDatabase interface { type RoomserverQueryAPI struct { DB RoomserverQueryAPIDatabase ImmutableCache caching.ImmutableCache + ServerName gomatrixserverlib.ServerName + KeyRing gomatrixserverlib.JSONVerifier + FedClient *gomatrixserverlib.FederationClient } // QueryLatestEventsAndState implements api.RoomserverQueryAPI @@ -344,7 +349,7 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom( events, err = r.DB.Events(ctx, eventNIDs) } else { - events, err = r.getMembershipsBeforeEventNID(ctx, membershipEventNID, request.JoinedOnly) + events, err = getMembershipsBeforeEventNID(ctx, r.DB, membershipEventNID, request.JoinedOnly) } if err != nil { @@ -363,19 +368,19 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom( // of the event's room as it was when this event was fired, then filters the state events to // only keep the "m.room.member" events with a "join" membership. These events are returned. // Returns an error if there was an issue fetching the events. -func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID( - ctx context.Context, eventNID types.EventNID, joinedOnly bool, +func getMembershipsBeforeEventNID( + ctx context.Context, db RoomserverQueryAPIDatabase, eventNID types.EventNID, joinedOnly bool, ) ([]types.Event, error) { - roomState := state.NewStateResolution(r.DB) + roomState := state.NewStateResolution(db) events := []types.Event{} // Lookup the event NID - eIDs, err := r.DB.EventIDs(ctx, []types.EventNID{eventNID}) + eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID}) if err != nil { return nil, err } eventIDs := []string{eIDs[eventNID]} - prevState, err := r.DB.StateAtEventIDs(ctx, eventIDs) + prevState, err := db.StateAtEventIDs(ctx, eventIDs) if err != nil { return nil, err } @@ -395,7 +400,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID( } // Get all of the events in this state - stateEvents, err := r.DB.Events(ctx, eventNIDs) + stateEvents, err := db.Events(ctx, eventNIDs) if err != nil { return nil, err } @@ -547,6 +552,13 @@ func (r *RoomserverQueryAPI) QueryBackfill( request *api.QueryBackfillRequest, response *api.QueryBackfillResponse, ) error { + logrus.WithField("req", request).Info("QueryBackfill - I am " + r.ServerName) + // if we are requesting the backfill then we need to do a federation hit + // TODO: we could be more sensible and fetch as many events we already have then request the rest + // which is what the syncapi does already. + if request.ServerName == r.ServerName { + return r.backfillViaFederation(ctx, request, response) + } var err error var front []string @@ -588,6 +600,26 @@ func (r *RoomserverQueryAPI) QueryBackfill( return err } +func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error { + roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID) + if err != nil { + return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err) + } + events, err := gomatrixserverlib.RequestBackfill(ctx, &backfillRequester{ + db: r.DB, + fedClient: r.FedClient, + thisServer: r.ServerName, + }, r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit) + if err != nil { + return err + } + // TODO: persist these new events and update the state db so we can get state snapshots at these new backfilled events + // this will be important if we want to backfill multiple times as we get the join memberships from state snapshots. + + res.Events = events + return nil +} + func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) { roomNID, err := r.DB.RoomNID(ctx, roomID) if err != nil { @@ -839,41 +871,6 @@ func getAuthChain( return authEvents, nil } -// QueryServersInRoomAtEvent implements api.RoomserverQueryAPI -func (r *RoomserverQueryAPI) QueryServersInRoomAtEvent( - ctx context.Context, - request *api.QueryServersInRoomAtEventRequest, - response *api.QueryServersInRoomAtEventResponse, -) error { - // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for - // the event is necessary. - NIDs, err := r.DB.EventNIDs(ctx, []string{request.EventID}) - if err != nil { - return err - } - - // Retrieve all "m.room.member" state events of "join" membership, which - // contains the list of users in the room before the event, therefore all - // the servers in it at that moment. - events, err := r.getMembershipsBeforeEventNID(ctx, NIDs[request.EventID], true) - if err != nil { - return err - } - - // Store the server names in a temporary map to avoid duplicates. - servers := make(map[gomatrixserverlib.ServerName]bool) - for _, event := range events { - servers[event.Origin()] = true - } - - // Populate the response. - for server := range servers { - response.Servers = append(response.Servers, server) - } - - return nil -} - // QueryRoomVersionCapabilities implements api.RoomserverQueryAPI func (r *RoomserverQueryAPI) QueryRoomVersionCapabilities( ctx context.Context, @@ -1055,20 +1052,6 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) - servMux.Handle( - api.RoomserverQueryServersInRoomAtEventPath, - common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse { - var request api.QueryServersInRoomAtEventRequest - var response api.QueryServersInRoomAtEventResponse - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.ErrorResponse(err) - } - if err := r.QueryServersInRoomAtEvent(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) servMux.Handle( api.RoomserverQueryRoomVersionCapabilitiesPath, common.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse { diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index fa4f20626..ea1c5c4c3 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -18,6 +18,7 @@ import ( "net/http" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" asQuery "github.com/matrix-org/dendrite/appservice/query" "github.com/matrix-org/dendrite/common/basecomponent" @@ -33,7 +34,7 @@ import ( // allowing other components running in the same process to hit the query the // APIs directly instead of having to use HTTP. func SetupRoomServerComponent( - base *basecomponent.BaseDendrite, + base *basecomponent.BaseDendrite, keyRing gomatrixserverlib.JSONVerifier, ) (api.RoomserverAliasAPI, api.RoomserverInputAPI, api.RoomserverQueryAPI) { roomserverDB, err := storage.Open(string(base.Cfg.Database.RoomServer)) if err != nil { @@ -51,6 +52,11 @@ func SetupRoomServerComponent( queryAPI := query.RoomserverQueryAPI{ DB: roomserverDB, ImmutableCache: base.ImmutableCache, + ServerName: base.Cfg.Matrix.ServerName, + FedClient: base.CreateFederationClient(), + // TODO: We should have a key server so we don't keep adding components + // which talk to the same DB. + KeyRing: keyRing, } queryAPI.SetupHTTP(http.DefaultServeMux) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 873ee9366..d32a73c7a 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -282,7 +282,7 @@ func (r *messagesReq) handleEmptyEventsSlice() ( // Check if we have backward extremities for this room. if len(backwardExtremities) > 0 { // If so, retrieve as much events as needed through backfilling. - events, err = r.backfill(backwardExtremities, r.limit) + events, err = r.backfill(r.roomID, backwardExtremities, r.limit) if err != nil { return } @@ -331,7 +331,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering { var pdus []gomatrixserverlib.HeaderedEvent // Only ask the remote server for enough events to reach the limit. - pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents)) + pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents)) if err != nil { return } @@ -355,45 +355,29 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent // event, or if there is no remote homeserver to contact. // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. -func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { - verReq := api.QueryRoomVersionForRoomRequest{RoomID: r.roomID} - verRes := api.QueryRoomVersionForRoomResponse{} - if err := r.queryAPI.QueryRoomVersionForRoom(r.ctx, &verReq, &verRes); err != nil { - return nil, err - } - - srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs) +func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { + var res api.QueryBackfillResponse + err := r.queryAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{ + RoomID: roomID, + EarliestEventsIDs: fromEventIDs, + Limit: limit, + ServerName: r.cfg.Matrix.ServerName, + }, &res) if err != nil { - return nil, fmt.Errorf("Cannot find server to backfill from: %w", err) + return nil, fmt.Errorf("QueryBackfill failed: %w", err) } + util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill") - headered := make([]gomatrixserverlib.HeaderedEvent, 0) - - // If the roomserver responded with at least one server that isn't us, - // send it a request for backfill. - util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server") - txn, err := r.federation.Backfill( - r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs, - ) - if err != nil { - return nil, err - } - - for _, p := range txn.PDUs { - event, e := gomatrixserverlib.NewEventFromUntrustedJSON(p, verRes.RoomVersion) - if e != nil { - continue - } - headered = append(headered, event.Headered(verRes.RoomVersion)) - } - util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(headered)).Info("Storing new events from backfill") + // TODO: we should only be inserting events into the database from the roomserver's kafka output stream. + // Currently, this can race with live events for the room and cause problems. It's also just a bit unclear + // when you have multiple entry points to write events. // Store the events in the database, while marking them as unfit to show // up in responses to sync requests. - for i := range headered { + for i := range res.Events { if _, err = r.db.WriteEvent( r.ctx, - &headered[i], + &res.Events[i], []gomatrixserverlib.HeaderedEvent{}, []string{}, []string{}, @@ -403,63 +387,7 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv } } - return headered, nil -} - -func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) { - // Query the list of servers in the room when one of the backward extremities - // was sent. - var serversResponse api.QueryServersInRoomAtEventResponse - serversRequest := api.QueryServersInRoomAtEventRequest{ - RoomID: r.roomID, - EventID: fromEventIDs[0], - } - if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil { - util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender") - // FIXME: We shouldn't be doing this but in situations where we have already backfilled once - // the query API doesn't work as backfilled events do not make it to the room server. - // This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question. - // We need to inject backfilled events into the room server and store them appropriately. - events, err := r.db.Events(r.ctx, fromEventIDs) - if err != nil { - return "", err - } - if len(events) == 0 { - // should be impossible as these event IDs are backwards extremities - return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs) - } - // The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it. - // We shouldn't be doing this really, but as a heuristic it should work pretty well for now. - for _, e := range events { - _, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender()) - if srverr != nil { - util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender") - continue - } - if srv != r.cfg.Matrix.ServerName { - return srv, nil - } - } - // no valid events which have a remote server, fail. - return "", err - } - - // Use the first server from the response, except if that server is us. - // In that case, use the second one if the roomserver responded with - // enough servers. If not, use an empty string to prevent the backfill - // from happening as there's no server to direct the request towards. - // TODO: Be smarter at selecting the server to direct the request - // towards. - srvToBackfillFrom := serversResponse.Servers[0] - if srvToBackfillFrom == r.cfg.Matrix.ServerName { - if len(serversResponse.Servers) > 1 { - srvToBackfillFrom = serversResponse.Servers[1] - } else { - util.GetLogger(r.ctx).Info("Not enough servers to backfill from") - return "", nil - } - } - return srvToBackfillFrom, nil + return res.Events, nil } // setToDefault returns the default value for the "to" query parameter of a