From 5faecdac828592b6cf9d2851e13af8fb6d1df30a Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 19 May 2020 16:42:30 +0100 Subject: [PATCH 1/6] Bake in git commit into dendritejs binary (#1048) --- build-dendritejs.sh | 4 ++++ cmd/dendritejs/main.go | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100755 build-dendritejs.sh diff --git a/build-dendritejs.sh b/build-dendritejs.sh new file mode 100755 index 000000000..cd42a6bee --- /dev/null +++ b/build-dendritejs.sh @@ -0,0 +1,4 @@ +#!/bin/bash -eu + +export GIT_COMMIT=$(git rev-list -1 HEAD) && \ +GOOS=js GOARCH=wasm go build -ldflags "-X main.GitCommit=$GIT_COMMIT" -o main.wasm ./cmd/dendritejs \ No newline at end of file diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 618ebbc60..5bdf18efa 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -46,8 +46,10 @@ import ( _ "github.com/matrix-org/go-sqlite3-js" ) +var GitCommit string + func init() { - fmt.Println("dendrite.js starting...") + fmt.Printf("[%s] dendrite.js starting...\n", GitCommit) } const keyNameEd25519 = "_go_ed25519_key" From 260e69d138fb82003b0a7e77aeb5cac6b3281777 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 19 May 2020 18:42:55 +0100 Subject: [PATCH 2/6] Make "Outbound federation can backfill events" pass sytest (#1049) - Use a backfill limit of 100 regardless of what was asked. - Special case the create event for `StateIDsBeforeEvent` - Trim to the limit in `syncapi` --- roomserver/internal/query.go | 7 ++++++- roomserver/internal/query_backfill.go | 6 ++++++ syncapi/routing/messages.go | 9 ++++++++- sytest-whitelist | 1 + 4 files changed, 21 insertions(+), 2 deletions(-) diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index 8a8c8e7d0..8043746ca 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -535,9 +535,14 @@ func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req * return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err) } requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName) + // Request 100 items regardless of what the query asks for. + // We don't want to go much higher than this. + // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass + // (so we don't need to hit /state_ids which the test has no listener for) + // Specifically the test "Outbound federation can backfill events" events, err := gomatrixserverlib.RequestBackfill( ctx, requester, - r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit) + r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, 100) if err != nil { return err } diff --git a/roomserver/internal/query_backfill.go b/roomserver/internal/query_backfill.go index d42038e74..82f7238d7 100644 --- a/roomserver/internal/query_backfill.go +++ b/roomserver/internal/query_backfill.go @@ -7,6 +7,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" "github.com/sirupsen/logrus" ) @@ -37,6 +38,11 @@ func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok { return ids, nil } + if len(targetEvent.PrevEventIDs()) == 0 && targetEvent.Type() == "m.room.create" && targetEvent.StateKeyEquals("") { + util.GetLogger(ctx).WithField("room_id", targetEvent.RoomID()).Info("Backfilled to the beginning of the room") + b.eventIDToBeforeStateIDs[targetEvent.EventID()] = []string{} + return nil, nil + } // if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event. // Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or // we don't know the result of state res to merge forks (2 or more prev_events) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 67de6df7e..811188cc8 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -412,7 +412,14 @@ func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) } } - return res.Events, nil + // we may have got more than the requested limit so resize now + events := res.Events + if len(events) > limit { + // last `limit` events + events = events[len(events)-limit:] + } + + return events, nil } // setToDefault returns the default value for the "to" query parameter of a diff --git a/sytest-whitelist b/sytest-whitelist index 035b9b36e..4a8af13a6 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -279,3 +279,4 @@ Inbound federation can return missing events for invite visibility Inbound federation can get public room list An event which redacts itself should be ignored A pair of events which redact each other should be ignored +Outbound federation can backfill events From 1414922026636e71ba61acc5c243e9a6cf914981 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 20 May 2020 16:04:31 +0100 Subject: [PATCH 3/6] sytest: Make 'Inbound federation can backfill events' pass (#1051) * sytest: Make 'Inbound federation can backfill events' pass This breaks 'Outbound federation can backfill events' because now we are returning the right number of events, which the previous test was relying on. Previously, /messages was backfilling the membership event, causing the test to pass. Now we are no longer backfilling the membership event due to the change in this commit, causing the test to fail. The test should instead be returning the membership event locally from synacpis database, but it doesn't do it fast enough, resulting in a no-op /sync response with a next_batch=s0_0 which will never pick up the local membership event when it rolls in. The test does attempt to retry, but doesn't take the new next_batch=s1_0 resulting in it missing from the /messages response. * Linting --- federationapi/routing/backfill.go | 17 +++++++-- roomserver/api/query.go | 15 ++++++-- roomserver/internal/query.go | 14 +++----- roomserver/internal/query_backfill.go | 35 +++++++++++++++---- syncapi/routing/messages.go | 11 +++--- syncapi/storage/interface.go | 5 ++- .../postgres/backwards_extremities_table.go | 13 +++---- syncapi/storage/shared/syncserver.go | 3 +- .../sqlite3/backwards_extremities_table.go | 13 +++---- syncapi/storage/tables/interface.go | 4 +-- sytest-whitelist | 1 + 11 files changed, 86 insertions(+), 45 deletions(-) diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index 651a4a2d2..1f46b240f 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -69,9 +69,14 @@ func Backfill( // Populate the request. req := api.QueryBackfillRequest{ - RoomID: roomID, - EarliestEventsIDs: eIDs, - ServerName: request.Origin(), + RoomID: roomID, + // we don't know who the successors are for these events, which won't + // be a problem because we don't use that information when servicing /backfill requests, + // only when making them. TODO: Think of a better API shape + BackwardsExtremities: map[string][]string{ + "": eIDs, + }, + ServerName: request.Origin(), } if req.Limit, err = strconv.Atoi(limit); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("strconv.Atoi failed") @@ -105,6 +110,12 @@ func Backfill( eventJSONs = append(eventJSONs, e.JSON()) } + // sytest wants these in reversed order, similar to /messages, so reverse them now. + for i := len(eventJSONs)/2 - 1; i >= 0; i-- { + opp := len(eventJSONs) - 1 - i + eventJSONs[i], eventJSONs[opp] = eventJSONs[opp], eventJSONs[i] + } + txn := gomatrixserverlib.Transaction{ Origin: cfg.Matrix.ServerName, PDUs: eventJSONs, diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 9afc51f4e..e92d2a992 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -21,6 +21,7 @@ import ( commonHTTP "github.com/matrix-org/dendrite/common/http" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" opentracing "github.com/opentracing/opentracing-go" ) @@ -228,14 +229,24 @@ type QueryStateAndAuthChainResponse struct { type QueryBackfillRequest struct { // The room to backfill RoomID string `json:"room_id"` - // Events to start paginating from. - EarliestEventsIDs []string `json:"earliest_event_ids"` + // A map of backwards extremity event ID to a list of its prev_event IDs. + BackwardsExtremities map[string][]string `json:"backwards_extremities"` // The maximum number of events to retrieve. Limit int `json:"limit"` // The server interested in the events. ServerName gomatrixserverlib.ServerName `json:"server_name"` } +// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order. +func (r *QueryBackfillRequest) PrevEventIDs() []string { + var prevEventIDs []string + for _, pes := range r.BackwardsExtremities { + prevEventIDs = append(prevEventIDs, pes...) + } + prevEventIDs = util.UniqueStrings(prevEventIDs) + return prevEventIDs +} + // QueryBackfillResponse is a response to QueryBackfill. type QueryBackfillResponse struct { // Missing events, arbritrary order. diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index 8043746ca..2d1c21c57 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -495,14 +495,8 @@ func (r *RoomserverInternalAPI) QueryBackfill( // defines the highest number of elements in the map below. visited := make(map[string]bool, request.Limit) - // The provided event IDs have already been seen by the request's emitter, - // and will be retrieved anyway, so there's no need to care about them if - // they appear in our exploration of the event tree. - for _, id := range request.EarliestEventsIDs { - visited[id] = true - } - - front = request.EarliestEventsIDs + // this will include these events which is what we want + front = request.PrevEventIDs() // Scan the event tree for events to send back. resultNIDs, err := r.scanEventTree(ctx, front, visited, request.Limit, request.ServerName) @@ -534,7 +528,7 @@ func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req * if err != nil { return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err) } - requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName) + requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName, req.BackwardsExtremities) // Request 100 items regardless of what the query asks for. // We don't want to go much higher than this. // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass @@ -542,7 +536,7 @@ func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req * // Specifically the test "Outbound federation can backfill events" events, err := gomatrixserverlib.RequestBackfill( ctx, requester, - r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, 100) + r.KeyRing, req.RoomID, roomVer, req.PrevEventIDs(), 100) if err != nil { return err } diff --git a/roomserver/internal/query_backfill.go b/roomserver/internal/query_backfill.go index 82f7238d7..49e0af34a 100644 --- a/roomserver/internal/query_backfill.go +++ b/roomserver/internal/query_backfill.go @@ -16,6 +16,7 @@ type backfillRequester struct { db storage.Database fedClient *gomatrixserverlib.FederationClient thisServer gomatrixserverlib.ServerName + bwExtrems map[string][]string // per-request state servers []gomatrixserverlib.ServerName @@ -23,13 +24,14 @@ type backfillRequester struct { eventIDMap map[string]gomatrixserverlib.Event } -func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName) *backfillRequester { +func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { return &backfillRequester{ db: db, fedClient: fedClient, thisServer: thisServer, eventIDToBeforeStateIDs: make(map[string][]string), eventIDMap: make(map[string]gomatrixserverlib.Event), + bwExtrems: bwExtrems, } } @@ -160,26 +162,44 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr // It returns a list of servers which can be queried for backfill requests. These servers // will be servers that are in the room already. The entries at the beginning are preferred servers // and will be tried first. An empty list will fail the request. -func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) (servers []gomatrixserverlib.ServerName) { +func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) []gomatrixserverlib.ServerName { + // eventID will be a prev_event ID of a backwards extremity, meaning we will not have a database entry for it. Instead, use + // its successor, so look it up. + successor := "" +FindSuccessor: + for sucID, prevEventIDs := range b.bwExtrems { + for _, pe := range prevEventIDs { + if pe == eventID { + successor = sucID + break FindSuccessor + } + } + } + if successor == "" { + logrus.WithField("event_id", eventID).Error("ServersAtEvent: failed to find successor of this event to determine room state") + return nil + } + eventID = successor + // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for // the event is necessary. NIDs, err := b.db.EventNIDs(ctx, []string{eventID}) if err != nil { logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event") - return + return nil } stateEntries, err := stateBeforeEvent(ctx, b.db, NIDs[eventID]) if err != nil { logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event") - return + return nil } // possibly return all joined servers depending on history visiblity memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries) if err != nil { logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules") - return + return nil } logrus.Infof("ServersAtEvent including %d current events from history visibility", len(memberEventsFromVis)) @@ -189,7 +209,7 @@ func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID memberEvents, err := getMembershipsAtState(ctx, b.db, stateEntries, true) if err != nil { logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event") - return + return nil } memberEvents = append(memberEvents, memberEventsFromVis...) @@ -198,6 +218,7 @@ func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID for _, event := range memberEvents { serverSet[event.Origin()] = true } + var servers []gomatrixserverlib.ServerName for server := range serverSet { if server == b.thisServer { continue @@ -205,7 +226,7 @@ func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID servers = append(servers, server) } b.servers = servers - return + return servers } // Backfill performs a backfill request to the given server. diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 811188cc8..88e16fe57 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -205,6 +205,7 @@ func (r *messagesReq) retrieveEvents() ( } var events []gomatrixserverlib.HeaderedEvent + util.GetLogger(r.ctx).WithField("start", start).WithField("end", end).Infof("Fetched %d events locally", len(streamEvents)) // There can be two reasons for streamEvents to be empty: either we've // reached the oldest event in the room (or the most recent one, depending @@ -373,13 +374,13 @@ func (e eventsByDepth) Less(i, j int) bool { // event, or if there is no remote homeserver to contact. // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. -func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { +func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { var res api.QueryBackfillResponse err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{ - RoomID: roomID, - EarliestEventsIDs: fromEventIDs, - Limit: limit, - ServerName: r.cfg.Matrix.ServerName, + RoomID: roomID, + BackwardsExtremities: backwardsExtremities, + Limit: limit, + ServerName: r.cfg.Matrix.ServerName, }, &res) if err != nil { return nil, fmt.Errorf("QueryBackfill failed: %w", err) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index eba008b34..121e94c71 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -94,9 +94,8 @@ type Database interface { GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) // EventPositionInTopology returns the depth and stream position of the given event. EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error) - // BackwardExtremitiesForRoom returns the event IDs of all of the backward - // extremities we know of for a given room. - BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) + // BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events. + BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities map[string][]string, err error) // MaxTopologicalPosition returns the highest topological position for a given room. MaxTopologicalPosition(ctx context.Context, roomID string) (types.TopologyToken, error) // StreamEventsToEvents converts streamEvent to Event. If device is non-nil and diff --git a/syncapi/storage/postgres/backwards_extremities_table.go b/syncapi/storage/postgres/backwards_extremities_table.go index 3dd1b2349..fa498a401 100644 --- a/syncapi/storage/postgres/backwards_extremities_table.go +++ b/syncapi/storage/postgres/backwards_extremities_table.go @@ -41,7 +41,7 @@ const insertBackwardExtremitySQL = "" + " ON CONFLICT DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + - "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" + "SELECT event_id, prev_event_id FROM syncapi_backward_extremities WHERE room_id = $1" const deleteBackwardExtremitySQL = "" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" @@ -79,23 +79,24 @@ func (s *backwardExtremitiesStatements) InsertsBackwardExtremity( func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom( ctx context.Context, roomID string, -) (eventIDs []string, err error) { +) (bwExtrems map[string][]string, err error) { rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return } defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") + bwExtrems = make(map[string][]string) for rows.Next() { var eID string - if err = rows.Scan(&eID); err != nil { + var prevEventID string + if err = rows.Scan(&eID, &prevEventID); err != nil { return } - - eventIDs = append(eventIDs, eID) + bwExtrems[eID] = append(bwExtrems[eID], prevEventID) } - return eventIDs, rows.Err() + return bwExtrems, rows.Err() } func (s *backwardExtremitiesStatements) DeleteBackwardExtremity( diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index be1f9c7aa..543e5b4a3 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -310,6 +310,7 @@ func (d *Database) updateRoomState( } membership = &value } + if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil { return err } @@ -367,7 +368,7 @@ func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, func (d *Database) BackwardExtremitiesForRoom( ctx context.Context, roomID string, -) (backwardExtremities []string, err error) { +) (backwardExtremities map[string][]string, err error) { return d.BackwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID) } diff --git a/syncapi/storage/sqlite3/backwards_extremities_table.go b/syncapi/storage/sqlite3/backwards_extremities_table.go index a172f6bf7..c5d9cae59 100644 --- a/syncapi/storage/sqlite3/backwards_extremities_table.go +++ b/syncapi/storage/sqlite3/backwards_extremities_table.go @@ -41,7 +41,7 @@ const insertBackwardExtremitySQL = "" + " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + - "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" + "SELECT event_id, prev_event_id FROM syncapi_backward_extremities WHERE room_id = $1" const deleteBackwardExtremitySQL = "" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" @@ -79,23 +79,24 @@ func (s *backwardExtremitiesStatements) InsertsBackwardExtremity( func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom( ctx context.Context, roomID string, -) (eventIDs []string, err error) { +) (bwExtrems map[string][]string, err error) { rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return } defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") + bwExtrems = make(map[string][]string) for rows.Next() { var eID string - if err = rows.Scan(&eID); err != nil { + var prevEventID string + if err = rows.Scan(&eID, &prevEventID); err != nil { return } - - eventIDs = append(eventIDs, eID) + bwExtrems[eID] = append(bwExtrems[eID], prevEventID) } - return eventIDs, rows.Err() + return bwExtrems, rows.Err() } func (s *backwardExtremitiesStatements) DeleteBackwardExtremity( diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index f31bdc2e9..bc3b6941a 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -89,8 +89,8 @@ type CurrentRoomState interface { type BackwardsExtremities interface { // InsertsBackwardExtremity inserts a new backwards extremity. InsertsBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string) (err error) - // SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room. - SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (eventIDs []string, err error) + // SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room, as a map of event_id to list of prev_event_ids. + SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (bwExtrems map[string][]string, err error) // DeleteBackwardExtremity removes a backwards extremity for a room, if one existed. DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error) } diff --git a/sytest-whitelist b/sytest-whitelist index 4a8af13a6..2e8205dcd 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -280,3 +280,4 @@ Inbound federation can get public room list An event which redacts itself should be ignored A pair of events which redact each other should be ignored Outbound federation can backfill events +Inbound federation can backfill events From 6091bf044ff41e9f248b1077d5b05f1a4c694412 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 20 May 2020 17:30:03 +0100 Subject: [PATCH 4/6] sytest: Add remaining backfill tests (#1052) One failed because of `null` instead of `[]` in HTTP responses. One failed because we hadn't implemented in-line filter limits! --- federationapi/routing/backfill.go | 2 +- syncapi/sync/request.go | 22 +++++++++++++++++++++- syncapi/sync/requestpool.go | 1 + sytest-whitelist | 2 ++ 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index 1f46b240f..0d3893de3 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -102,7 +102,7 @@ func Backfill( } } - var eventJSONs []json.RawMessage + eventJSONs := []json.RawMessage{} for _, e := range gomatrixserverlib.ReverseTopologicalOrdering( evs, gomatrixserverlib.TopologicalOrderByPrevEvents, diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 66663cf0a..c7796b561 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -16,6 +16,7 @@ package sync import ( "context" + "encoding/json" "net/http" "strconv" "time" @@ -30,6 +31,14 @@ import ( const defaultSyncTimeout = time.Duration(0) const defaultTimelineLimit = 20 +type filter struct { + Room struct { + Timeline struct { + Limit *int `json:"limit"` + } `json:"timeline"` + } `json:"room"` +} + // syncRequest represents a /sync request, with sensible defaults/sanity checks applied. type syncRequest struct { ctx context.Context @@ -54,6 +63,17 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e } since = &tok } + timelineLimit := defaultTimelineLimit + // TODO: read from stored filters too + filterQuery := req.URL.Query().Get("filter") + if filterQuery != "" && filterQuery[0] == '{' { + // attempt to parse the timeline limit at least + var f filter + err := json.Unmarshal([]byte(filterQuery), &f) + if err == nil && f.Room.Timeline.Limit != nil { + timelineLimit = *f.Room.Timeline.Limit + } + } // TODO: Additional query params: set_presence, filter return &syncRequest{ ctx: req.Context(), @@ -61,7 +81,7 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e timeout: timeout, since: since, wantFullState: wantFullState, - limit: defaultTimelineLimit, // TODO: read from filter + limit: timelineLimit, log: util.GetLogger(req.Context()), }, nil } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 6e0f44e9a..82ce283b6 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -59,6 +59,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype "userID": userID, "since": syncReq.since, "timeout": syncReq.timeout, + "limit": syncReq.limit, }) currPos := rp.notifier.CurrentPosition() diff --git a/sytest-whitelist b/sytest-whitelist index 2e8205dcd..d5b4feaf6 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -281,3 +281,5 @@ An event which redacts itself should be ignored A pair of events which redact each other should be ignored Outbound federation can backfill events Inbound federation can backfill events +Backfill checks the events requested belong to the room +Backfilled events whose prev_events are in a different room do not allow cross-room back-pagination From f2c07437fe3d7f54977f7e645cd045a04e832020 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 20 May 2020 18:03:06 +0100 Subject: [PATCH 5/6] Use memberships to determine whether to reset latest events/state on room join (#1047) * Track local/remote memberships, re-scope some input stuff * Check if we're in the room already before resetting latest events/state * Fix postgres, fix lint * Review comments --- roomserver/internal/input.go | 4 +- roomserver/internal/input_events.go | 53 ++++++++--------- roomserver/internal/input_latest_events.go | 23 +++----- roomserver/internal/input_membership.go | 24 +++++--- roomserver/internal/query.go | 4 +- roomserver/internal/query_backfill.go | 2 +- roomserver/storage/interface.go | 4 +- .../storage/postgres/membership_table.go | 59 ++++++++++++++----- roomserver/storage/postgres/storage.go | 17 +++--- .../storage/sqlite3/membership_table.go | 53 +++++++++++++---- roomserver/storage/sqlite3/storage.go | 17 +++--- roomserver/types/types.go | 2 +- 12 files changed, 162 insertions(+), 100 deletions(-) diff --git a/roomserver/internal/input.go b/roomserver/internal/input.go index ab3d7516b..932b4df46 100644 --- a/roomserver/internal/input.go +++ b/roomserver/internal/input.go @@ -60,7 +60,7 @@ func (r *RoomserverInternalAPI) InputRoomEvents( defer r.mutex.Unlock() for i := range request.InputInviteEvents { var loopback *api.InputRoomEvent - if loopback, err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { + if loopback, err = r.processInviteEvent(ctx, r, request.InputInviteEvents[i]); err != nil { return err } // The processInviteEvent function can optionally return a @@ -71,7 +71,7 @@ func (r *RoomserverInternalAPI) InputRoomEvents( } } for i := range request.InputRoomEvents { - if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { + if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil { return err } } diff --git a/roomserver/internal/input_events.go b/roomserver/internal/input_events.go index f5c678ca6..a4167714d 100644 --- a/roomserver/internal/input_events.go +++ b/roomserver/internal/input_events.go @@ -31,21 +31,13 @@ import ( log "github.com/sirupsen/logrus" ) -// OutputRoomEventWriter has the APIs needed to write an event to the output logs. -type OutputRoomEventWriter interface { - // Write a list of events for a room - WriteOutputEvents(roomID string, updates []api.OutputEvent) error -} - // processRoomEvent can only be called once at a time // // TODO(#375): This should be rewritten to allow concurrent calls. The // difficulty is in ensuring that we correctly annotate events with the correct // state deltas when sending to kafka streams -func processRoomEvent( +func (r *RoomserverInternalAPI) processRoomEvent( ctx context.Context, - db storage.Database, - ow OutputRoomEventWriter, input api.InputRoomEvent, ) (eventID string, err error) { // Parse and validate the event JSON @@ -54,7 +46,7 @@ func processRoomEvent( // Check that the event passes authentication checks and work out // the numeric IDs for the auth events. - authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) + authEventNIDs, err := checkAuthEvents(ctx, r.DB, headered, input.AuthEventIDs) if err != nil { logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event") return @@ -63,7 +55,7 @@ func processRoomEvent( // If we don't have a transaction ID then get one. if input.TransactionID != nil { tdID := input.TransactionID - eventID, err = db.GetTransactionEventID( + eventID, err = r.DB.GetTransactionEventID( ctx, tdID.TransactionID, tdID.SessionID, event.Sender(), ) // On error OR event with the transaction already processed/processesing @@ -73,7 +65,7 @@ func processRoomEvent( } // Store the event. - roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) + roomNID, stateAtEvent, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) if err != nil { return } @@ -93,16 +85,14 @@ func processRoomEvent( if stateAtEvent.BeforeStateSnapshotNID == 0 { // We haven't calculated a state for this event yet. // Lets calculate one. - err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event) + err = r.calculateAndSetState(ctx, input, roomNID, &stateAtEvent, event) if err != nil { return } } - if err = updateLatestEvents( + if err = r.updateLatestEvents( ctx, // context - db, // roomserver database - ow, // output event writer roomNID, // room NID to update stateAtEvent, // state at event (below) event, // event @@ -116,29 +106,36 @@ func processRoomEvent( return event.EventID(), nil } -func calculateAndSetState( +func (r *RoomserverInternalAPI) calculateAndSetState( ctx context.Context, - db storage.Database, input api.InputRoomEvent, roomNID types.RoomNID, stateAtEvent *types.StateAtEvent, event gomatrixserverlib.Event, ) error { var err error - roomState := state.NewStateResolution(db) + roomState := state.NewStateResolution(r.DB) if input.HasState { - // TODO: Check here if we think we're in the room already. + // Check here if we think we're in the room already. stateAtEvent.Overwrite = true + var joinEventNIDs []types.EventNID + // Request join memberships only for local users only. + if joinEventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true, true); err == nil { + // If we have no local users that are joined to the room then any state about + // the room that we have is quite possibly out of date. Therefore in that case + // we should overwrite it rather than merge it. + stateAtEvent.Overwrite = len(joinEventNIDs) == 0 + } // We've been told what the state at the event is so we don't need to calculate it. // Check that those state events are in the database and store the state. var entries []types.StateEntry - if entries, err = db.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil { + if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil { return err } - if stateAtEvent.BeforeStateSnapshotNID, err = db.AddState(ctx, roomNID, nil, entries); err != nil { + if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil { return err } } else { @@ -149,12 +146,11 @@ func calculateAndSetState( return err } } - return db.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID) + return r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID) } -func processInviteEvent( +func (r *RoomserverInternalAPI) processInviteEvent( ctx context.Context, - db storage.Database, ow *RoomserverInternalAPI, input api.InputInviteEvent, ) (*api.InputRoomEvent, error) { @@ -172,7 +168,10 @@ func processInviteEvent( "target_user_id": targetUserID, }).Info("processing invite event") - updater, err := db.MembershipUpdater(ctx, roomID, targetUserID, input.RoomVersion) + _, domain, _ := gomatrixserverlib.SplitID('@', targetUserID) + isTargetLocalUser := domain == r.Cfg.Matrix.ServerName + + updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocalUser, input.RoomVersion) if err != nil { return nil, err } @@ -239,7 +238,7 @@ func processInviteEvent( // up from local data (which is most likely to be if the event came // from the CS API). If we know about the room then we can insert // the invite room state, if we don't then we just fail quietly. - if irs, ierr := buildInviteStrippedState(ctx, db, input); ierr == nil { + if irs, ierr := buildInviteStrippedState(ctx, r.DB, input); ierr == nil { if err = event.SetUnsignedField("invite_room_state", irs); err != nil { return nil, err } diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go index 6eeeedab0..d7c9a5cb6 100644 --- a/roomserver/internal/input_latest_events.go +++ b/roomserver/internal/input_latest_events.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" - "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -46,17 +45,15 @@ import ( // 7 <----- latest // // Can only be called once at a time -func updateLatestEvents( +func (r *RoomserverInternalAPI) updateLatestEvents( ctx context.Context, - db storage.Database, - ow OutputRoomEventWriter, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, sendAsServer string, transactionID *api.TransactionID, ) (err error) { - updater, err := db.GetLatestEventsForUpdate(ctx, roomNID) + updater, err := r.DB.GetLatestEventsForUpdate(ctx, roomNID) if err != nil { return } @@ -70,9 +67,8 @@ func updateLatestEvents( u := latestEventsUpdater{ ctx: ctx, - db: db, + api: r, updater: updater, - ow: ow, roomNID: roomNID, stateAtEvent: stateAtEvent, event: event, @@ -94,9 +90,8 @@ func updateLatestEvents( // when there are so many variables to pass around. type latestEventsUpdater struct { ctx context.Context - db storage.Database + api *RoomserverInternalAPI updater types.RoomRecentEventsUpdater - ow OutputRoomEventWriter roomNID types.RoomNID stateAtEvent types.StateAtEvent event gomatrixserverlib.Event @@ -181,7 +176,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // If we need to generate any output events then here's where we do it. // TODO: Move this! - updates, err := updateMemberships(u.ctx, u.db, u.updater, u.removed, u.added) + updates, err := u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added) if err != nil { return err } @@ -200,7 +195,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // send the event asynchronously but we would need to ensure that 1) the events are written to the log in // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil { + if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { return err } @@ -213,7 +208,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { func (u *latestEventsUpdater) latestState() error { var err error - roomState := state.NewStateResolution(u.db) + roomState := state.NewStateResolution(u.api.DB) // Get a list of the current latest events. latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)) @@ -303,7 +298,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) latestEventIDs[i] = u.latest[i].EventID } - roomVersion, err := u.db.GetRoomVersionForRoom(u.ctx, u.event.RoomID()) + roomVersion, err := u.api.DB.GetRoomVersionForRoom(u.ctx, u.event.RoomID()) if err != nil { return nil, err } @@ -329,7 +324,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) stateEventNIDs = append(stateEventNIDs, entry.EventNID) } stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] - eventIDMap, err := u.db.EventIDs(u.ctx, stateEventNIDs) + eventIDMap, err := u.api.DB.EventIDs(u.ctx, stateEventNIDs) if err != nil { return nil, err } diff --git a/roomserver/internal/input_membership.go b/roomserver/internal/input_membership.go index 666e7ebcc..af0c7f8b3 100644 --- a/roomserver/internal/input_membership.go +++ b/roomserver/internal/input_membership.go @@ -19,7 +19,6 @@ import ( "fmt" "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -28,9 +27,8 @@ import ( // user affected by a change in the current state of the room. // Returns a list of output events to write to the kafka log to inform the // consumers about the invites added or retired by the change in current state. -func updateMemberships( +func (r *RoomserverInternalAPI) updateMemberships( ctx context.Context, - db storage.Database, updater types.RoomRecentEventsUpdater, removed, added []types.StateEntry, ) ([]api.OutputEvent, error) { @@ -48,7 +46,7 @@ func updateMemberships( // Load the event JSON so we can look up the "membership" key. // TODO: Maybe add a membership key to the events table so we can load that // key without having to load the entire event JSON? - events, err := db.Events(ctx, eventNIDs) + events, err := r.DB.Events(ctx, eventNIDs) if err != nil { return nil, err } @@ -71,15 +69,16 @@ func updateMemberships( ae = &ev.Event } } - if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil { + if updates, err = r.updateMembership(updater, targetUserNID, re, ae, updates); err != nil { return nil, err } } return updates, nil } -func updateMembership( - updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID, +func (r *RoomserverInternalAPI) updateMembership( + updater types.RoomRecentEventsUpdater, + targetUserNID types.EventStateKeyNID, remove, add *gomatrixserverlib.Event, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { @@ -113,7 +112,7 @@ func updateMembership( return updates, nil } - mu, err := updater.MembershipUpdater(targetUserNID) + mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add)) if err != nil { return nil, err } @@ -132,6 +131,15 @@ func updateMembership( } } +func (r *RoomserverInternalAPI) isLocalTarget(event *gomatrixserverlib.Event) bool { + isTargetLocalUser := false + if statekey := event.StateKey(); statekey != nil { + _, domain, _ := gomatrixserverlib.SplitID('@', *statekey) + isTargetLocalUser = domain == r.Cfg.Matrix.ServerName + } + return isTargetLocalUser +} + func updateToInviteMembership( mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, roomVersion gomatrixserverlib.RoomVersion, diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index 2d1c21c57..fce2ae907 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -267,7 +267,7 @@ func (r *RoomserverInternalAPI) QueryMembershipsForRoom( var stateEntries []types.StateEntry if stillInRoom { var eventNIDs []types.EventNID - eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, request.JoinedOnly) + eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, request.JoinedOnly, false) if err != nil { return err } @@ -591,7 +591,7 @@ func (r *RoomserverInternalAPI) isServerCurrentlyInRoom(ctx context.Context, ser return false, err } - eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true) + eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true, false) if err != nil { return false, err } diff --git a/roomserver/internal/query_backfill.go b/roomserver/internal/query_backfill.go index 49e0af34a..23ae9455a 100644 --- a/roomserver/internal/query_backfill.go +++ b/roomserver/internal/query_backfill.go @@ -297,7 +297,7 @@ func joinEventsFromHistoryVisibility( if err != nil { return nil, err } - joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomNID, true) + joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomNID, true, false) if err != nil { return nil, err } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index fb39eca63..1e0232d20 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -83,9 +83,9 @@ type Database interface { GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error) GetCreatorIDForAlias(ctx context.Context, alias string) (string, error) RemoveRoomAlias(ctx context.Context, alias string) error - MembershipUpdater(ctx context.Context, roomID, targetUserID string, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, error) + MembershipUpdater(ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, error) GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error) - GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error) + GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool) ([]types.EventNID, error) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) GetRoomVersionForRoom(ctx context.Context, roomID string) (gomatrixserverlib.RoomVersion, error) } diff --git a/roomserver/storage/postgres/membership_table.go b/roomserver/storage/postgres/membership_table.go index 9c8a4c259..820ef4e71 100644 --- a/roomserver/storage/postgres/membership_table.go +++ b/roomserver/storage/postgres/membership_table.go @@ -59,6 +59,10 @@ CREATE TABLE IF NOT EXISTS roomserver_membership ( -- This NID is updated if the join event gets updated (e.g. profile update), -- or if the user leaves/joins the room. event_nid BIGINT NOT NULL DEFAULT 0, + -- Local target is true if the target_nid refers to a local user rather than + -- a federated one. This is an optimisation for resetting state on federated + -- room joins. + target_local BOOLEAN NOT NULL DEFAULT false, UNIQUE (room_nid, target_nid) ); ` @@ -66,8 +70,8 @@ CREATE TABLE IF NOT EXISTS roomserver_membership ( // Insert a row in to membership table so that it can be locked by the // SELECT FOR UPDATE const insertMembershipSQL = "" + - "INSERT INTO roomserver_membership (room_nid, target_nid)" + - " VALUES ($1, $2)" + + "INSERT INTO roomserver_membership (room_nid, target_nid, target_local)" + + " VALUES ($1, $2, $3)" + " ON CONFLICT DO NOTHING" const selectMembershipFromRoomAndTargetSQL = "" + @@ -78,10 +82,20 @@ const selectMembershipsFromRoomAndMembershipSQL = "" + "SELECT event_nid FROM roomserver_membership" + " WHERE room_nid = $1 AND membership_nid = $2" +const selectLocalMembershipsFromRoomAndMembershipSQL = "" + + "SELECT event_nid FROM roomserver_membership" + + " WHERE room_nid = $1 AND membership_nid = $2" + + " AND target_local = true" + const selectMembershipsFromRoomSQL = "" + "SELECT event_nid FROM roomserver_membership" + " WHERE room_nid = $1" +const selectLocalMembershipsFromRoomSQL = "" + + "SELECT event_nid FROM roomserver_membership" + + " WHERE room_nid = $1" + + " AND target_local = true" + const selectMembershipForUpdateSQL = "" + "SELECT membership_nid FROM roomserver_membership" + " WHERE room_nid = $1 AND target_nid = $2 FOR UPDATE" @@ -91,12 +105,14 @@ const updateMembershipSQL = "" + " WHERE room_nid = $1 AND target_nid = $2" type membershipStatements struct { - insertMembershipStmt *sql.Stmt - selectMembershipForUpdateStmt *sql.Stmt - selectMembershipFromRoomAndTargetStmt *sql.Stmt - selectMembershipsFromRoomAndMembershipStmt *sql.Stmt - selectMembershipsFromRoomStmt *sql.Stmt - updateMembershipStmt *sql.Stmt + insertMembershipStmt *sql.Stmt + selectMembershipForUpdateStmt *sql.Stmt + selectMembershipFromRoomAndTargetStmt *sql.Stmt + selectMembershipsFromRoomAndMembershipStmt *sql.Stmt + selectLocalMembershipsFromRoomAndMembershipStmt *sql.Stmt + selectMembershipsFromRoomStmt *sql.Stmt + selectLocalMembershipsFromRoomStmt *sql.Stmt + updateMembershipStmt *sql.Stmt } func (s *membershipStatements) prepare(db *sql.DB) (err error) { @@ -110,7 +126,9 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) { {&s.selectMembershipForUpdateStmt, selectMembershipForUpdateSQL}, {&s.selectMembershipFromRoomAndTargetStmt, selectMembershipFromRoomAndTargetSQL}, {&s.selectMembershipsFromRoomAndMembershipStmt, selectMembershipsFromRoomAndMembershipSQL}, + {&s.selectLocalMembershipsFromRoomAndMembershipStmt, selectLocalMembershipsFromRoomAndMembershipSQL}, {&s.selectMembershipsFromRoomStmt, selectMembershipsFromRoomSQL}, + {&s.selectLocalMembershipsFromRoomStmt, selectLocalMembershipsFromRoomSQL}, {&s.updateMembershipStmt, updateMembershipSQL}, }.prepare(db) } @@ -118,9 +136,10 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) { func (s *membershipStatements) insertMembership( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, + localTarget bool, ) error { stmt := common.TxStmt(txn, s.insertMembershipStmt) - _, err := stmt.ExecContext(ctx, roomNID, targetUserNID) + _, err := stmt.ExecContext(ctx, roomNID, targetUserNID, localTarget) return err } @@ -145,9 +164,15 @@ func (s *membershipStatements) selectMembershipFromRoomAndTarget( } func (s *membershipStatements) selectMembershipsFromRoom( - ctx context.Context, roomNID types.RoomNID, + ctx context.Context, roomNID types.RoomNID, localOnly bool, ) (eventNIDs []types.EventNID, err error) { - rows, err := s.selectMembershipsFromRoomStmt.QueryContext(ctx, roomNID) + var stmt *sql.Stmt + if localOnly { + stmt = s.selectLocalMembershipsFromRoomStmt + } else { + stmt = s.selectMembershipsFromRoomStmt + } + rows, err := stmt.QueryContext(ctx, roomNID) if err != nil { return } @@ -165,10 +190,16 @@ func (s *membershipStatements) selectMembershipsFromRoom( func (s *membershipStatements) selectMembershipsFromRoomAndMembership( ctx context.Context, - roomNID types.RoomNID, membership membershipState, + roomNID types.RoomNID, membership membershipState, localOnly bool, ) (eventNIDs []types.EventNID, err error) { - stmt := s.selectMembershipsFromRoomAndMembershipStmt - rows, err := stmt.QueryContext(ctx, roomNID, membership) + var rows *sql.Rows + var stmt *sql.Stmt + if localOnly { + stmt = s.selectLocalMembershipsFromRoomAndMembershipStmt + } else { + stmt = s.selectMembershipsFromRoomAndMembershipStmt + } + rows, err = stmt.QueryContext(ctx, roomNID, membership) if err != nil { return } diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index 1d825ecc2..d451d6650 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -459,8 +459,8 @@ func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error return u.d.statements.updateEventSentToOutput(u.ctx, u.txn, eventNID) } -func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID) (types.MembershipUpdater, error) { - return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomNID, targetUserNID) +func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (types.MembershipUpdater, error) { + return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomNID, targetUserNID, targetLocal) } // RoomNID implements query.RoomserverQueryAPIDB @@ -558,7 +558,7 @@ func (d *Database) StateEntriesForTuples( // MembershipUpdater implements input.RoomEventDatabase func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, - roomVersion gomatrixserverlib.RoomVersion, + targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, ) (types.MembershipUpdater, error) { txn, err := d.db.Begin() if err != nil { @@ -581,7 +581,7 @@ func (d *Database) MembershipUpdater( return nil, err } - updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID) + updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal) if err != nil { return nil, err } @@ -603,9 +603,10 @@ func (d *Database) membershipUpdaterTxn( txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, + targetLocal bool, ) (types.MembershipUpdater, error) { - if err := d.statements.insertMembership(ctx, txn, roomNID, targetUserNID); err != nil { + if err := d.statements.insertMembership(ctx, txn, roomNID, targetUserNID, targetLocal); err != nil { return nil, err } @@ -748,15 +749,15 @@ func (d *Database) GetMembership( // GetMembershipEventNIDsForRoom implements query.RoomserverQueryAPIDB func (d *Database) GetMembershipEventNIDsForRoom( - ctx context.Context, roomNID types.RoomNID, joinOnly bool, + ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool, ) ([]types.EventNID, error) { if joinOnly { return d.statements.selectMembershipsFromRoomAndMembership( - ctx, roomNID, membershipStateJoin, + ctx, roomNID, membershipStateJoin, localOnly, ) } - return d.statements.selectMembershipsFromRoom(ctx, roomNID) + return d.statements.selectMembershipsFromRoom(ctx, roomNID, localOnly) } // EventsFromIDs implements query.RoomserverQueryAPIEventDB diff --git a/roomserver/storage/sqlite3/membership_table.go b/roomserver/storage/sqlite3/membership_table.go index 7ae28e4b8..ca4d8fbe9 100644 --- a/roomserver/storage/sqlite3/membership_table.go +++ b/roomserver/storage/sqlite3/membership_table.go @@ -38,6 +38,7 @@ const membershipSchema = ` sender_nid INTEGER NOT NULL DEFAULT 0, membership_nid INTEGER NOT NULL DEFAULT 1, event_nid INTEGER NOT NULL DEFAULT 0, + target_local BOOLEAN NOT NULL DEFAULT false, UNIQUE (room_nid, target_nid) ); ` @@ -45,8 +46,8 @@ const membershipSchema = ` // Insert a row in to membership table so that it can be locked by the // SELECT FOR UPDATE const insertMembershipSQL = "" + - "INSERT INTO roomserver_membership (room_nid, target_nid)" + - " VALUES ($1, $2)" + + "INSERT INTO roomserver_membership (room_nid, target_nid, target_local)" + + " VALUES ($1, $2, $3)" + " ON CONFLICT DO NOTHING" const selectMembershipFromRoomAndTargetSQL = "" + @@ -57,10 +58,20 @@ const selectMembershipsFromRoomAndMembershipSQL = "" + "SELECT event_nid FROM roomserver_membership" + " WHERE room_nid = $1 AND membership_nid = $2" +const selectLocalMembershipsFromRoomAndMembershipSQL = "" + + "SELECT event_nid FROM roomserver_membership" + + " WHERE room_nid = $1 AND membership_nid = $2" + + " AND target_local = true" + const selectMembershipsFromRoomSQL = "" + "SELECT event_nid FROM roomserver_membership" + " WHERE room_nid = $1" +const selectLocalMembershipsFromRoomSQL = "" + + "SELECT event_nid FROM roomserver_membership" + + " WHERE room_nid = $1" + + " AND target_local = true" + const selectMembershipForUpdateSQL = "" + "SELECT membership_nid FROM roomserver_membership" + " WHERE room_nid = $1 AND target_nid = $2" @@ -70,12 +81,14 @@ const updateMembershipSQL = "" + " WHERE room_nid = $4 AND target_nid = $5" type membershipStatements struct { - insertMembershipStmt *sql.Stmt - selectMembershipForUpdateStmt *sql.Stmt - selectMembershipFromRoomAndTargetStmt *sql.Stmt - selectMembershipsFromRoomAndMembershipStmt *sql.Stmt - selectMembershipsFromRoomStmt *sql.Stmt - updateMembershipStmt *sql.Stmt + insertMembershipStmt *sql.Stmt + selectMembershipForUpdateStmt *sql.Stmt + selectMembershipFromRoomAndTargetStmt *sql.Stmt + selectMembershipsFromRoomAndMembershipStmt *sql.Stmt + selectLocalMembershipsFromRoomAndMembershipStmt *sql.Stmt + selectMembershipsFromRoomStmt *sql.Stmt + selectLocalMembershipsFromRoomStmt *sql.Stmt + updateMembershipStmt *sql.Stmt } func (s *membershipStatements) prepare(db *sql.DB) (err error) { @@ -89,7 +102,9 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) { {&s.selectMembershipForUpdateStmt, selectMembershipForUpdateSQL}, {&s.selectMembershipFromRoomAndTargetStmt, selectMembershipFromRoomAndTargetSQL}, {&s.selectMembershipsFromRoomAndMembershipStmt, selectMembershipsFromRoomAndMembershipSQL}, + {&s.selectLocalMembershipsFromRoomAndMembershipStmt, selectLocalMembershipsFromRoomAndMembershipSQL}, {&s.selectMembershipsFromRoomStmt, selectMembershipsFromRoomSQL}, + {&s.selectLocalMembershipsFromRoomStmt, selectLocalMembershipsFromRoomSQL}, {&s.updateMembershipStmt, updateMembershipSQL}, }.prepare(db) } @@ -97,9 +112,10 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) { func (s *membershipStatements) insertMembership( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, + localTarget bool, ) error { stmt := common.TxStmt(txn, s.insertMembershipStmt) - _, err := stmt.ExecContext(ctx, roomNID, targetUserNID) + _, err := stmt.ExecContext(ctx, roomNID, targetUserNID, localTarget) return err } @@ -127,9 +143,14 @@ func (s *membershipStatements) selectMembershipFromRoomAndTarget( func (s *membershipStatements) selectMembershipsFromRoom( ctx context.Context, txn *sql.Tx, - roomNID types.RoomNID, + roomNID types.RoomNID, localOnly bool, ) (eventNIDs []types.EventNID, err error) { - selectStmt := common.TxStmt(txn, s.selectMembershipsFromRoomStmt) + var selectStmt *sql.Stmt + if localOnly { + selectStmt = common.TxStmt(txn, s.selectLocalMembershipsFromRoomStmt) + } else { + selectStmt = common.TxStmt(txn, s.selectMembershipsFromRoomStmt) + } rows, err := selectStmt.QueryContext(ctx, roomNID) if err != nil { return nil, err @@ -145,11 +166,17 @@ func (s *membershipStatements) selectMembershipsFromRoom( } return } + func (s *membershipStatements) selectMembershipsFromRoomAndMembership( ctx context.Context, txn *sql.Tx, - roomNID types.RoomNID, membership membershipState, + roomNID types.RoomNID, membership membershipState, localOnly bool, ) (eventNIDs []types.EventNID, err error) { - stmt := common.TxStmt(txn, s.selectMembershipsFromRoomAndMembershipStmt) + var stmt *sql.Stmt + if localOnly { + stmt = common.TxStmt(txn, s.selectLocalMembershipsFromRoomAndMembershipStmt) + } else { + stmt = common.TxStmt(txn, s.selectMembershipsFromRoomAndMembershipStmt) + } rows, err := stmt.QueryContext(ctx, roomNID, membership) if err != nil { return diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index e77fea9cf..209922fa2 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -569,9 +569,9 @@ func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error return err } -func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID) (mu types.MembershipUpdater, err error) { +func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (mu types.MembershipUpdater, err error) { err = common.WithTransaction(u.d.db, func(txn *sql.Tx) error { - mu, err = u.d.membershipUpdaterTxn(u.ctx, txn, u.roomNID, targetUserNID) + mu, err = u.d.membershipUpdaterTxn(u.ctx, txn, u.roomNID, targetUserNID, targetLocal) return err }) return @@ -680,7 +680,7 @@ func (d *Database) StateEntriesForTuples( // MembershipUpdater implements input.RoomEventDatabase func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, - roomVersion gomatrixserverlib.RoomVersion, + targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, ) (updater types.MembershipUpdater, err error) { var txn *sql.Tx txn, err = d.db.Begin() @@ -716,7 +716,7 @@ func (d *Database) MembershipUpdater( return nil, err } - updater, err = d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID) + updater, err = d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal) if err != nil { return nil, err } @@ -738,9 +738,10 @@ func (d *Database) membershipUpdaterTxn( txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, + targetLocal bool, ) (types.MembershipUpdater, error) { - if err := d.statements.insertMembership(ctx, txn, roomNID, targetUserNID); err != nil { + if err := d.statements.insertMembership(ctx, txn, roomNID, targetUserNID, targetLocal); err != nil { return nil, err } @@ -896,17 +897,17 @@ func (d *Database) GetMembership( // GetMembershipEventNIDsForRoom implements query.RoomserverQueryAPIDB func (d *Database) GetMembershipEventNIDsForRoom( - ctx context.Context, roomNID types.RoomNID, joinOnly bool, + ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool, ) (eventNIDs []types.EventNID, err error) { err = common.WithTransaction(d.db, func(txn *sql.Tx) error { if joinOnly { eventNIDs, err = d.statements.selectMembershipsFromRoomAndMembership( - ctx, txn, roomNID, membershipStateJoin, + ctx, txn, roomNID, membershipStateJoin, localOnly, ) return nil } - eventNIDs, err = d.statements.selectMembershipsFromRoom(ctx, txn, roomNID) + eventNIDs, err = d.statements.selectMembershipsFromRoom(ctx, txn, roomNID, localOnly) return nil }) return diff --git a/roomserver/types/types.go b/roomserver/types/types.go index da83f614c..74e6b0784 100644 --- a/roomserver/types/types.go +++ b/roomserver/types/types.go @@ -172,7 +172,7 @@ type RoomRecentEventsUpdater interface { MarkEventAsSent(eventNID EventNID) error // Build a membership updater for the target user in this room. // It will share the same transaction as this updater. - MembershipUpdater(targetUserNID EventStateKeyNID) (MembershipUpdater, error) + MembershipUpdater(targetUserNID EventStateKeyNID, isTargetLocalUser bool) (MembershipUpdater, error) // Implements Transaction so it can be committed or rolledback common.Transaction } From 7d91ef0616c7eccd7d32bd30c1ee2d779d983d12 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 20 May 2020 18:31:02 +0100 Subject: [PATCH 6/6] This now passes on sytest/develop --- sytest-whitelist | 1 + 1 file changed, 1 insertion(+) diff --git a/sytest-whitelist b/sytest-whitelist index d5b4feaf6..c022b16c3 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -283,3 +283,4 @@ Outbound federation can backfill events Inbound federation can backfill events Backfill checks the events requested belong to the room Backfilled events whose prev_events are in a different room do not allow cross-room back-pagination +Outbound federation can request missing events