From 6bac7e5efddea05aa68a56e44423d2bc157ec364 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 24 Mar 2020 12:20:10 +0000 Subject: [PATCH] Implement backfill over federation (#938) * Implement history visibility checks for /backfill Required for p2p to show history correctly. * Add sytest * Logging * Fix two backfill bugs which prevented backfill from working correctly - When receiving backfill requests, do not send the event that was in the original request. - When storing backfill results, correctly update the backwards extremity for the room. * hack: make backfill work multiple times * add sqlite impl and remove logging * Linting --- go.sum | 6 + roomserver/auth/auth.go | 70 ++++++- roomserver/query/query.go | 89 ++++++++- syncapi/routing/messages.go | 176 +++++++++--------- .../postgres/backward_extremities_table.go | 58 +++--- .../postgres/output_room_events_table.go | 1 - syncapi/storage/postgres/syncserver.go | 21 +-- .../sqlite3/backward_extremities_table.go | 72 +++---- syncapi/storage/sqlite3/syncserver.go | 17 +- syncapi/sync/requestpool.go | 13 +- sytest-whitelist | 1 + 11 files changed, 322 insertions(+), 202 deletions(-) diff --git a/go.sum b/go.sum index 0f4542b28..48e0efd3e 100644 --- a/go.sum +++ b/go.sum @@ -276,6 +276,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a h1:7+ github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203 h1:7HkL6bF7/M2cYteNFVtvGW5qjD4wHIiR0HsdCm2Rqao= github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200318180716-bc4ff56961e2 h1:y4DOMbhgPAnATHJ4lNxTWxIlJG0SlIPhvukx1sQkty4= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200318180716-bc4ff56961e2/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= @@ -470,6 +472,8 @@ go.uber.org/atomic v1.3.0 h1:vs7fgriifsPbGdK3bNuMWapNn3qnZhCRXc19NRdq010= go.uber.org/atomic v1.3.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -491,6 +495,7 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -559,6 +564,7 @@ golang.org/x/tools v0.0.0-20190910044552-dd2b5c81c578/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20190911230505-6bfd74cf029c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190912215617-3720d1ec3678 h1:rM1Udd0CgtYI3KUIhu9ROz0QCqjW+n/ODp/hH7c60Xc= golang.org/x/tools v0.0.0-20190912215617-3720d1ec3678/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/roomserver/auth/auth.go b/roomserver/auth/auth.go index 5ff1fadad..615a94b3c 100644 --- a/roomserver/auth/auth.go +++ b/roomserver/auth/auth.go @@ -12,18 +12,76 @@ package auth -import "github.com/matrix-org/gomatrixserverlib" +import ( + "encoding/json" -// IsServerAllowed returns true if there exists a event in authEvents -// which allows server to view this event. That is true when a client on the server -// can view the event. Otherwise returns false. + "github.com/matrix-org/gomatrixserverlib" +) + +// TODO: This logic should live in gomatrixserverlib + +// IsServerAllowed returns true if the server is allowed to see events in the room +// at this particular state. This function implements https://matrix.org/docs/spec/client_server/r0.6.0#id87 func IsServerAllowed( serverName gomatrixserverlib.ServerName, + serverCurrentlyInRoom bool, authEvents []gomatrixserverlib.Event, ) bool { + historyVisibility := historyVisibilityForRoom(authEvents) + + // 1. If the history_visibility was set to world_readable, allow. + if historyVisibility == "world_readable" { + return true + } + // 2. If the user's membership was join, allow. + joinedUserExists := IsAnyUserOnServerWithMembership(serverName, authEvents, gomatrixserverlib.Join) + if joinedUserExists { + return true + } + // 3. If history_visibility was set to shared, and the user joined the room at any point after the event was sent, allow. + if historyVisibility == "shared" && serverCurrentlyInRoom { + return true + } + // 4. If the user's membership was invite, and the history_visibility was set to invited, allow. + invitedUserExists := IsAnyUserOnServerWithMembership(serverName, authEvents, gomatrixserverlib.Invite) + if invitedUserExists && historyVisibility == "invited" { + return true + } + + // 5. Otherwise, deny. + return false +} + +func historyVisibilityForRoom(authEvents []gomatrixserverlib.Event) string { + // https://matrix.org/docs/spec/client_server/r0.6.0#id87 + // By default if no history_visibility is set, or if the value is not understood, the visibility is assumed to be shared. + visibility := "shared" + knownStates := []string{"invited", "joined", "shared", "world_readable"} + for _, ev := range authEvents { + if ev.Type() != gomatrixserverlib.MRoomHistoryVisibility { + continue + } + // TODO: This should be HistoryVisibilityContent to match things like 'MemberContent'. Do this when moving to GMSL + content := struct { + HistoryVisibility string `json:"history_visibility"` + }{} + if err := json.Unmarshal(ev.Content(), &content); err != nil { + break // value is not understood + } + for _, s := range knownStates { + if s == content.HistoryVisibility { + visibility = s + break + } + } + } + return visibility +} + +func IsAnyUserOnServerWithMembership(serverName gomatrixserverlib.ServerName, authEvents []gomatrixserverlib.Event, wantMembership string) bool { for _, ev := range authEvents { membership, err := ev.Membership() - if err != nil || membership != gomatrixserverlib.Join { + if err != nil || membership != wantMembership { continue } @@ -41,7 +99,5 @@ func IsServerAllowed( return true } } - - // TODO: Check if history visibility is shared and if the server is currently in the room return false } diff --git a/roomserver/query/query.go b/roomserver/query/query.go index 2638919ad..7fe9dc980 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -447,14 +447,26 @@ func (r *RoomserverQueryAPI) QueryServerAllowedToSeeEvent( request *api.QueryServerAllowedToSeeEventRequest, response *api.QueryServerAllowedToSeeEventResponse, ) (err error) { + events, err := r.DB.EventsFromIDs(ctx, []string{request.EventID}) + if err != nil { + return + } + if len(events) == 0 { + response.AllowedToSeeEvent = false // event doesn't exist so not allowed to see + return + } + isServerInRoom, err := r.isServerCurrentlyInRoom(ctx, request.ServerName, events[0].RoomID()) + if err != nil { + return + } response.AllowedToSeeEvent, err = r.checkServerAllowedToSeeEvent( - ctx, request.EventID, request.ServerName, + ctx, request.EventID, request.ServerName, isServerInRoom, ) return } func (r *RoomserverQueryAPI) checkServerAllowedToSeeEvent( - ctx context.Context, eventID string, serverName gomatrixserverlib.ServerName, + ctx context.Context, eventID string, serverName gomatrixserverlib.ServerName, isServerInRoom bool, ) (bool, error) { roomState := state.NewStateResolution(r.DB) stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID) @@ -469,7 +481,7 @@ func (r *RoomserverQueryAPI) checkServerAllowedToSeeEvent( return false, err } - return auth.IsServerAllowed(serverName, stateAtEvent), nil + return auth.IsServerAllowed(serverName, isServerInRoom, stateAtEvent), nil } // QueryMissingEvents implements api.RoomserverQueryAPI @@ -564,17 +576,55 @@ func (r *RoomserverQueryAPI) QueryBackfill( return err } +func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) { + roomNID, err := r.DB.RoomNID(ctx, roomID) + if err != nil { + return false, err + } + + eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true) + if err != nil { + return false, err + } + + events, err := r.DB.Events(ctx, eventNIDs) + if err != nil { + return false, err + } + gmslEvents := make([]gomatrixserverlib.Event, len(events)) + for i := range events { + gmslEvents[i] = events[i].Event + } + return auth.IsAnyUserOnServerWithMembership(serverName, gmslEvents, gomatrixserverlib.Join), nil +} + +// TODO: Remove this when we have tests to assert correctness of this function +// nolint:gocyclo func (r *RoomserverQueryAPI) scanEventTree( ctx context.Context, front []string, visited map[string]bool, limit int, serverName gomatrixserverlib.ServerName, -) (resultNIDs []types.EventNID, err error) { +) ([]types.EventNID, error) { + var resultNIDs []types.EventNID + var err error var allowed bool var events []types.Event var next []string var pre string + // TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be) + // Currently, callers like QueryBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing + // so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in + // duplicate events being sent in response to /backfill requests. + initialIgnoreList := make(map[string]bool, len(visited)) + for k, v := range visited { + initialIgnoreList[k] = v + } + resultNIDs = make([]types.EventNID, 0, limit) + var checkedServerInRoom bool + var isServerInRoom bool + // Loop through the event IDs to retrieve the requested events and go // through the whole tree (up to the provided limit) using the events' // "prev_event" key. @@ -587,7 +637,18 @@ BFSLoop: // Retrieve the events to process from the database. events, err = r.DB.EventsFromIDs(ctx, front) if err != nil { - return + return resultNIDs, err + } + + if !checkedServerInRoom && len(events) > 0 { + // It's nasty that we have to extract the room ID from an event, but many federation requests + // only talk in event IDs, no room IDs at all (!!!) + ev := events[0] + isServerInRoom, err = r.isServerCurrentlyInRoom(ctx, serverName, ev.RoomID()) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to check if server is currently in room, assuming not.") + } + checkedServerInRoom = true } for _, ev := range events { @@ -595,17 +656,23 @@ BFSLoop: if len(resultNIDs) == limit { break BFSLoop } - // Update the list of events to retrieve. - resultNIDs = append(resultNIDs, ev.EventNID) + + if !initialIgnoreList[ev.EventID()] { + // Update the list of events to retrieve. + resultNIDs = append(resultNIDs, ev.EventNID) + } // Loop through the event's parents. for _, pre = range ev.PrevEventIDs() { // Only add an event to the list of next events to process if it // hasn't been seen before. if !visited[pre] { visited[pre] = true - allowed, err = r.checkServerAllowedToSeeEvent(ctx, pre, serverName) + allowed, err = r.checkServerAllowedToSeeEvent(ctx, pre, serverName, isServerInRoom) if err != nil { - return + util.GetLogger(ctx).WithField("server", serverName).WithField("event_id", pre).WithError(err).Error( + "Error checking if allowed to see event", + ) + return resultNIDs, err } // If the event hasn't been seen before and the HS @@ -613,6 +680,8 @@ BFSLoop: // the list of events to retrieve. if allowed { next = append(next, pre) + } else { + util.GetLogger(ctx).WithField("server", serverName).WithField("event_id", pre).Info("Not allowed to see event") } } } @@ -621,7 +690,7 @@ BFSLoop: front = next } - return + return resultNIDs, err } // QueryStateAndAuthChain implements api.RoomserverQueryAPI diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index cd21c3bd8..5f2e4f171 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -28,7 +28,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) type messagesReq struct { @@ -151,6 +151,14 @@ func OnIncomingMessagesRequest( util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed") return jsonerror.InternalServerError() } + util.GetLogger(req.Context()).WithFields(logrus.Fields{ + "from": from.String(), + "to": to.String(), + "limit": limit, + "backwards": backwardOrdering, + "return_start": start.String(), + "return_end": end.String(), + }).Info("Responding") // Respond with the events. return util.JSONResponse{ @@ -302,8 +310,9 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent events []gomatrixserverlib.HeaderedEvent, err error, ) { // Check if we have enough events. - isSetLargeEnough := true - if len(streamEvents) < r.limit { + isSetLargeEnough := len(streamEvents) >= r.limit + if !isSetLargeEnough { + // it might be fine we don't have up to 'limit' events, let's find out if r.backwardOrdering { if r.wasToProvided { // The condition in the SQL query is a strict "greater than" so @@ -343,54 +352,6 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent return } -// containsBackwardExtremity checks if a slice of StreamEvent contains a -// backward extremity. It does so by selecting the earliest event in the slice -// and by checking the presence in the database of all of its parent events, and -// considers the event itself a backward extremity if at least one of the parent -// events doesn't exist in the database. -// Returns an error if there was an issue with talking to the database. -// -// This function is unused but currently set to nolint for now until we are -// absolutely sure that the changes in matrix-org/dendrite#847 are behaving -// properly. -// nolint:unused -func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) { - // Select the earliest retrieved event. - var ev *types.StreamEvent - if r.backwardOrdering { - ev = &(events[len(events)-1]) - } else { - ev = &(events[0]) - } - // Get the earliest retrieved event's parents. - prevIDs := ev.PrevEventIDs() - prevs, err := r.db.Events(r.ctx, prevIDs) - if err != nil { - return false, nil - } - // Check if we have all of the events we requested. If not, it means we've - // reached a backward extremity. - var eventInDB bool - var id string - // Iterate over the IDs we used in the request. - for _, id = range prevIDs { - eventInDB = false - // Iterate over the events we got in response. - for _, ev := range prevs { - if ev.EventID() == id { - eventInDB = true - } - } - // One occurrence of one the event's parents not being present in the - // database is enough to say that the event is a backward extremity. - if !eventInDB { - return true, nil - } - } - - return false, nil -} - // backfill performs a backfill request over the federation on another // homeserver in the room. // See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid @@ -401,6 +362,48 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { + srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs) + if err != nil { + return nil, fmt.Errorf("Cannot find server to backfill from: %w", err) + } + + pdus := make([]gomatrixserverlib.HeaderedEvent, 0) + + // If the roomserver responded with at least one server that isn't us, + // send it a request for backfill. + util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server") + txn, err := r.federation.Backfill( + r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs, + ) + if err != nil { + return nil, err + } + + for _, p := range txn.PDUs { + pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1)) + } + util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill") + + // Store the events in the database, while marking them as unfit to show + // up in responses to sync requests. + for _, pdu := range pdus { + headered := pdu.Headered(gomatrixserverlib.RoomVersionV1) + if _, err = r.db.WriteEvent( + r.ctx, + &headered, + []gomatrixserverlib.HeaderedEvent{}, + []string{}, + []string{}, + nil, true, + ); err != nil { + return nil, err + } + } + + return pdus, nil +} + +func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) { // Query the list of servers in the room when one of the backward extremities // was sent. var serversResponse api.QueryServersInRoomAtEventResponse @@ -409,7 +412,33 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv EventID: fromEventIDs[0], } if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil { - return nil, err + util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender") + // FIXME: We shouldn't be doing this but in situations where we have already backfilled once + // the query API doesn't work as backfilled events do not make it to the room server. + // This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question. + // We need to inject backfilled events into the room server and store them appropriately. + events, err := r.db.Events(r.ctx, fromEventIDs) + if err != nil { + return "", err + } + if len(events) == 0 { + // should be impossible as these event IDs are backwards extremities + return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs) + } + // The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it. + // We shouldn't be doing this really, but as a heuristic it should work pretty well for now. + for _, e := range events { + _, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender()) + if srverr != nil { + util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender") + continue + } + if srv != r.cfg.Matrix.ServerName { + return srv, nil + } + } + // no valid events which have a remote server, fail. + return "", err } // Use the first server from the response, except if that server is us. @@ -423,45 +452,11 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv if len(serversResponse.Servers) > 1 { srvToBackfillFrom = serversResponse.Servers[1] } else { - srvToBackfillFrom = gomatrixserverlib.ServerName("") - log.Warn("Not enough servers to backfill from") + util.GetLogger(r.ctx).Info("Not enough servers to backfill from") + return "", nil } } - - pdus := make([]gomatrixserverlib.HeaderedEvent, 0) - - // If the roomserver responded with at least one server that isn't us, - // send it a request for backfill. - if len(srvToBackfillFrom) > 0 { - txn, err := r.federation.Backfill( - r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs, - ) - if err != nil { - return nil, err - } - - for _, p := range txn.PDUs { - pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1)) - } - - // Store the events in the database, while marking them as unfit to show - // up in responses to sync requests. - for _, pdu := range pdus { - headered := pdu.Headered(gomatrixserverlib.RoomVersionV1) - if _, err = r.db.WriteEvent( - r.ctx, - &headered, - []gomatrixserverlib.HeaderedEvent{}, - []string{}, - []string{}, - nil, true, - ); err != nil { - return nil, err - } - } - } - - return pdus, nil + return srvToBackfillFrom, nil } // setToDefault returns the default value for the "to" query parameter of a @@ -475,7 +470,8 @@ func setToDefault( roomID string, ) (to *types.PaginationToken, err error) { if backwardOrdering { - to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1, 0) + // go 1 earlier than the first event so we correctly fetch the earliest event + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0) } else { var pos types.StreamPosition pos, err = db.MaxTopologicalPosition(ctx, roomID) diff --git a/syncapi/storage/postgres/backward_extremities_table.go b/syncapi/storage/postgres/backward_extremities_table.go index 8286ca434..b3ee28e01 100644 --- a/syncapi/storage/postgres/backward_extremities_table.go +++ b/syncapi/storage/postgres/backward_extremities_table.go @@ -21,39 +21,53 @@ import ( "github.com/matrix-org/dendrite/common" ) +// The purpose of this table is to keep track of backwards extremities for a room. +// Backwards extremities are the earliest (DAG-wise) known events which we have +// the entire event JSON. These event IDs are used in federation requests to fetch +// even earlier events. +// +// We persist the previous event IDs as well, one per row, so when we do fetch even +// earlier events we can simply delete rows which referenced it. Consider the graph: +// A +// | Event C has 1 prev_event ID: A. +// B C +// |___| Event D has 2 prev_event IDs: B and C. +// | +// D +// The earliest known event we have is D, so this table has 2 rows. +// A backfill request gives us C but not B. We delete rows where prev_event=C. This +// still means that D is a backwards extremity as we do not have event B. However, event +// C is *also* a backwards extremity at this point as we do not have event A. Later, +// when we fetch event B, we delete rows where prev_event=B. This then removes D as +// a backwards extremity because there are no more rows with event_id=B. const backwardExtremitiesSchema = ` -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( -- The 'room_id' key for the event. room_id TEXT NOT NULL, - -- The event ID for the event. + -- The event ID for the last known event. This is the backwards extremity. event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, - PRIMARY KEY(room_id, event_id) + PRIMARY KEY(room_id, event_id, prev_event_id) ); ` const insertBackwardExtremitySQL = "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id)" + - " VALUES ($1, $2)" + + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + " ON CONFLICT DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + "SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1" -const isBackwardExtremitySQL = "" + - "SELECT EXISTS (" + - " SELECT TRUE FROM syncapi_backward_extremities" + - " WHERE room_id = $1 AND event_id = $2" + - ")" - const deleteBackwardExtremitySQL = "" + - "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND event_id = $2" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" type backwardExtremitiesStatements struct { insertBackwardExtremityStmt *sql.Stmt selectBackwardExtremitiesForRoomStmt *sql.Stmt - isBackwardExtremityStmt *sql.Stmt deleteBackwardExtremityStmt *sql.Stmt } @@ -68,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { return } - if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil { - return - } if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { return } @@ -78,17 +89,15 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { } func (s *backwardExtremitiesStatements) insertsBackwardExtremity( - ctx context.Context, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, ) (err error) { - _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) return } func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (eventIDs []string, err error) { - eventIDs = make([]string, 0) - rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return @@ -107,16 +116,9 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( return eventIDs, rows.Err() } -func (s *backwardExtremitiesStatements) isBackwardExtremity( - ctx context.Context, roomID, eventID string, -) (isBE bool, err error) { - err = s.isBackwardExtremityStmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE) - return -} - func (s *backwardExtremitiesStatements) deleteBackwardExtremity( - ctx context.Context, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, knownEventID string, ) (err error) { - _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) return } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 5f9a1d0c6..0b53dfa9e 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -305,7 +305,6 @@ func (s *outputRoomEventsStatements) selectRecentEvents( } else { stmt = common.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) if err != nil { return nil, err diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index ccf1c5656..f3f1aabc7 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -111,22 +111,17 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ return d.StreamEventsToEvents(nil, streamEvents), nil } -func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error { - // If the event is already known as a backward extremity, don't consider - // it as such anymore now that we have it. - isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID()) - if err != nil { +// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of +// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table +// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. +func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { + if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { return err } - if isBackwardExtremity { - if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { - return err - } - } // Check if we have all of the event's previous events. If an event is // missing, add it to the room's backward extremities. - prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs()) + prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs()) if err != nil { return err } @@ -141,7 +136,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev // If the event is missing, consider it a backward extremity. if !found { - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { + if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { return err } } @@ -174,7 +169,7 @@ func (d *SyncServerDatasource) WriteEvent( return err } - if err = d.handleBackwardExtremities(ctx, ev); err != nil { + if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { return err } diff --git a/syncapi/storage/sqlite3/backward_extremities_table.go b/syncapi/storage/sqlite3/backward_extremities_table.go index fcf15da25..0663e2a12 100644 --- a/syncapi/storage/sqlite3/backward_extremities_table.go +++ b/syncapi/storage/sqlite3/backward_extremities_table.go @@ -21,38 +21,53 @@ import ( "github.com/matrix-org/dendrite/common" ) +// The purpose of this table is to keep track of backwards extremities for a room. +// Backwards extremities are the earliest (DAG-wise) known events which we have +// the entire event JSON. These event IDs are used in federation requests to fetch +// even earlier events. +// +// We persist the previous event IDs as well, one per row, so when we do fetch even +// earlier events we can simply delete rows which referenced it. Consider the graph: +// A +// | Event C has 1 prev_event ID: A. +// B C +// |___| Event D has 2 prev_event IDs: B and C. +// | +// D +// The earliest known event we have is D, so this table has 2 rows. +// A backfill request gives us C but not B. We delete rows where prev_event=C. This +// still means that D is a backwards extremity as we do not have event B. However, event +// C is *also* a backwards extremity at this point as we do not have event A. Later, +// when we fetch event B, we delete rows where prev_event=B. This then removes D as +// a backwards extremity because there are no more rows with event_id=B. const backwardExtremitiesSchema = ` -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( + -- The 'room_id' key for the event. room_id TEXT NOT NULL, + -- The event ID for the last known event. This is the backwards extremity. event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, - PRIMARY KEY(room_id, event_id) + PRIMARY KEY(room_id, event_id, prev_event_id) ); ` const insertBackwardExtremitySQL = "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id)" + - " VALUES ($1, $2)" + - " ON CONFLICT (room_id, event_id) DO NOTHING" + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + + " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + "SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1" -const isBackwardExtremitySQL = "" + - "SELECT EXISTS (" + - " SELECT TRUE FROM syncapi_backward_extremities" + - " WHERE room_id = $1 AND event_id = $2" + - ")" - const deleteBackwardExtremitySQL = "" + - "DELETE FROM syncapi_backward_extremities" + - " WHERE room_id = $1 AND event_id = $2" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" type backwardExtremitiesStatements struct { insertBackwardExtremityStmt *sql.Stmt selectBackwardExtremitiesForRoomStmt *sql.Stmt - isBackwardExtremityStmt *sql.Stmt deleteBackwardExtremityStmt *sql.Stmt } @@ -67,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { return } - if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil { - return - } if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { return } @@ -77,23 +89,20 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { } func (s *backwardExtremitiesStatements) insertsBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, ) (err error) { - stmt := common.TxStmt(txn, s.insertBackwardExtremityStmt) - _, err = stmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) return } func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( - ctx context.Context, txn *sql.Tx, roomID string, + ctx context.Context, roomID string, ) (eventIDs []string, err error) { - eventIDs = make([]string, 0) - - stmt := common.TxStmt(txn, s.selectBackwardExtremitiesForRoomStmt) - rows, err := stmt.QueryContext(ctx, roomID) + rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return } + defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") for rows.Next() { var eID string @@ -104,21 +113,12 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( eventIDs = append(eventIDs, eID) } - return -} - -func (s *backwardExtremitiesStatements) isBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, -) (isBE bool, err error) { - stmt := common.TxStmt(txn, s.isBackwardExtremityStmt) - err = stmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE) - return + return eventIDs, rows.Err() } func (s *backwardExtremitiesStatements) deleteBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, knownEventID string, ) (err error) { - stmt := common.TxStmt(txn, s.deleteBackwardExtremityStmt) - _, err = stmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) return } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index a06fc91f5..8ff189007 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -137,18 +137,13 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ return d.StreamEventsToEvents(nil, streamEvents), nil } +// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of +// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table +// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { - // If the event is already known as a backward extremity, don't consider - // it as such anymore now that we have it. - isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()) - if err != nil { + if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { return err } - if isBackwardExtremity { - if err = d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { - return err - } - } // Check if we have all of the event's previous events. If an event is // missing, add it to the room's backward extremities. @@ -167,7 +162,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx // If the event is missing, consider it a backward extremity. if !found { - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { + if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { return err } } @@ -348,7 +343,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.Pagi func (d *SyncServerDatasource) BackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (backwardExtremities []string, err error) { - return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, nil, roomID) + return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID) } // MaxTopologicalPosition returns the highest topological position for a given diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index b4ccbd272..69efd8aa8 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -47,7 +47,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype var syncData *types.Response // Extract values from request - logger := util.GetLogger(req.Context()) userID := device.UserID syncReq, err := newSyncRequest(req, *device) if err != nil { @@ -56,20 +55,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype JSON: jsonerror.Unknown(err.Error()), } } - logger.WithFields(log.Fields{ + logger := util.GetLogger(req.Context()).WithFields(log.Fields{ "userID": userID, "since": syncReq.since, "timeout": syncReq.timeout, - }).Info("Incoming /sync request") + }) currPos := rp.notifier.CurrentPosition() if shouldReturnImmediately(syncReq) { syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed") + logger.WithError(err).Error("rp.currentSyncForUser failed") return jsonerror.InternalServerError() } + logger.WithField("next", syncData.NextBatch).Info("Responding immediately") return util.JSONResponse{ Code: http.StatusOK, JSON: syncData, @@ -107,7 +107,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype hasTimedOut = true // Or for the request to be cancelled case <-req.Context().Done(): - util.GetLogger(req.Context()).WithError(err).Error("request cancelled") + logger.WithError(err).Error("request cancelled") return jsonerror.InternalServerError() } @@ -118,11 +118,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed") + logger.WithError(err).Error("rp.currentSyncForUser failed") return jsonerror.InternalServerError() } if !syncData.IsEmpty() || hasTimedOut { + logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding") return util.JSONResponse{ Code: http.StatusOK, JSON: syncData, diff --git a/sytest-whitelist b/sytest-whitelist index 03e11b83b..95b000e16 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -220,3 +220,4 @@ Regular users can add and delete aliases when m.room.aliases is restricted GET /r0/capabilities is not public GET /joined_rooms lists newly-created room /joined_rooms returns only joined rooms +Message history can be paginated over federation