From 3e6d0a6246ed1bc10760c214fce762e6880bc5be Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Mon, 29 Jul 2019 15:18:21 +0800 Subject: [PATCH 1/6] Add newly passing tests from matrix-org/sytest 56de891 (#769) --- testfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testfile b/testfile index 4003638f1..2f9229e87 100644 --- a/testfile +++ b/testfile @@ -154,3 +154,5 @@ POST /login returns the same device_id as that in the request POST /createRoom with creation content User can create and send/receive messages in a room with version 1 POST /createRoom ignores attempts to set the room version via creation_content +Inbound federation rejects remote attempts to join local users to rooms +Inbound federation rejects remote attempts to kick local users to rooms From 40e44c5f3b445b2e5bb7dc98d991a9177d0f2abb Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Wed, 31 Jul 2019 20:45:45 +0800 Subject: [PATCH 2/6] Add newly passing tests from matrix-org/sytest (#771) Signed-off-by: Alex Chen --- testfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testfile b/testfile index 2f9229e87..c23dda662 100644 --- a/testfile +++ b/testfile @@ -156,3 +156,5 @@ User can create and send/receive messages in a room with version 1 POST /createRoom ignores attempts to set the room version via creation_content Inbound federation rejects remote attempts to join local users to rooms Inbound federation rejects remote attempts to kick local users to rooms +An event which redacts itself should be ignored +A pair of events which redact each other should be ignored From 3e1abe9ad3464a6c47dfcf003bb4bf6422c820af Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Wed, 31 Jul 2019 21:20:11 +0800 Subject: [PATCH 3/6] Fix /sync may contain duplicate EDUs and EDUs for left rooms (#752) In 29841be (#718), EDUs are added to /sync responses for rooms listed in joinedRoomIDs returned by addPDUDeltaToResponse. However this list may contain rooms other than those currently joined. Some variable renamings are done to make golangci-lint pass. Signed-off-by: Alex Chen minecnly@gmail.com --- syncapi/storage/syncserver.go | 44 +++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index b4d7ccbd2..20fa8a4e0 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -35,6 +35,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +const ( + membershipJoin = "join" + membershipLeave = "leave" + membershipBan = "ban" +) + type stateDelta struct { roomID string stateEvents []gomatrixserverlib.Event @@ -248,14 +254,12 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + deltas, joinedRoomIDs, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) if err != nil { return nil, err } - joinedRoomIDs := make([]string, 0, len(deltas)) for _, delta := range deltas { - joinedRoomIDs = append(joinedRoomIDs, delta.roomID) err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) if err != nil { return nil, err @@ -344,7 +348,7 @@ func (d *SyncServerDatasource) IncrementalSync( ) } else { joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( - ctx, nil, device.UserID, "join", + ctx, nil, device.UserID, membershipJoin, ) } if err != nil { @@ -393,7 +397,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( res = types.NewResponse(toPos) // Extract room state and recent events for all rooms the user is joined to. - joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin) if err != nil { return } @@ -571,7 +575,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( res *types.Response, ) error { endPos := toPos - if delta.membershipPos > 0 && delta.membership == "leave" { + if delta.membershipPos > 0 && delta.membership == membershipLeave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). @@ -595,7 +599,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( } switch delta.membership { - case "join": + case membershipJoin: jr := types.NewJoinResponse() if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { // Use the short form of batch token for prev_batch @@ -608,9 +612,9 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr - case "leave": + case membershipLeave: fallthrough // transitions to leave are the same as ban - case "ban": + case membershipBan: // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() @@ -716,10 +720,14 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( return events, nil } +// getStateDeltas returns the state deltas between fromPos and toPos, +// exclusive of oldPos, inclusive of newPos, for the rooms in which +// the user has new membership events. +// A list of joined room IDs is also returned in case the caller needs it. func (d *SyncServerDatasource) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, fromPos, toPos int64, userID string, -) ([]stateDelta, error) { +) ([]stateDelta, []string, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -733,11 +741,11 @@ func (d *SyncServerDatasource) getStateDeltas( // get all the state events ever between these two positions stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) if err != nil { - return nil, err + return nil, nil, err } state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) if err != nil { - return nil, err + return nil, nil, err } for roomID, stateStreamEvents := range state { @@ -748,12 +756,12 @@ func (d *SyncServerDatasource) getStateDeltas( // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to // the timeline. if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { - if membership == "join" { + if membership == membershipJoin { // send full room state down instead of a delta var allState []gomatrixserverlib.Event allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) if err != nil { - return nil, err + return nil, nil, err } s := make([]streamEvent, len(allState)) for i := 0; i < len(s); i++ { @@ -775,19 +783,19 @@ func (d *SyncServerDatasource) getStateDeltas( } // Add in currently joined rooms - joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin) if err != nil { - return nil, err + return nil, nil, err } for _, joinedRoomID := range joinedRoomIDs { deltas = append(deltas, stateDelta{ - membership: "join", + membership: membershipJoin, stateEvents: streamEventsToEvents(device, state[joinedRoomID]), roomID: joinedRoomID, }) } - return deltas, nil + return deltas, joinedRoomIDs, nil } // streamEventsToEvents converts streamEvent to Event. If device is non-nil and From 92db6cd0eaecbd2c27f83488a176ef76ecf16f36 Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Wed, 31 Jul 2019 21:36:21 +0800 Subject: [PATCH 4/6] Fix index in invites_table.go (#770) This PR fixes a possible typo in an index created in invites_table.go. Signed-off-by: Alex Chen minecnly@gmail.com --- syncapi/storage/invites_table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncapi/storage/invites_table.go b/syncapi/storage/invites_table.go index 88c98f7e3..9f52087f6 100644 --- a/syncapi/storage/invites_table.go +++ b/syncapi/storage/invites_table.go @@ -23,7 +23,7 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx -- For deleting old invites CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx - ON syncapi_invite_events(target_user_id, id); + ON syncapi_invite_events (event_id); ` const insertInviteEventSQL = "" + From 3578d77d259c852a16dc430725d348e2c62c4ff5 Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Thu, 1 Aug 2019 12:36:13 +0800 Subject: [PATCH 6/6] Implement "full_state" query parameter for /sync (#751) Closes #637. --- syncapi/storage/syncserver.go | 133 +++++++++++++++++++++++++++------- syncapi/sync/requestpool.go | 12 ++- testfile | 1 + 3 files changed, 117 insertions(+), 29 deletions(-) diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index 20fa8a4e0..e914bddfe 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -241,6 +241,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( device authtypes.Device, fromPos, toPos int64, numRecentEventsPerRoom int, + wantFullState bool, res *types.Response, ) ([]string, error) { txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) @@ -254,7 +255,13 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, joinedRoomIDs, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + var deltas []stateDelta + var joinedRoomIDs []string + if !wantFullState { + deltas, joinedRoomIDs, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + } else { + deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(ctx, &device, txn, fromPos, toPos, device.UserID) + } if err != nil { return nil, err } @@ -336,15 +343,16 @@ func (d *SyncServerDatasource) IncrementalSync( device authtypes.Device, fromPos, toPos types.SyncPosition, numRecentEventsPerRoom int, + wantFullState bool, ) (*types.Response, error) { nextBatchPos := fromPos.WithUpdates(toPos) res := types.NewResponse(nextBatchPos) var joinedRoomIDs []string var err error - if fromPos.PDUPosition != toPos.PDUPosition { + if fromPos.PDUPosition != toPos.PDUPosition || wantFullState { joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res, + ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res, ) } else { joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( @@ -593,21 +601,30 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( recentEvents := streamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - // Don't bother appending empty room entries - if len(recentEvents) == 0 && len(delta.stateEvents) == 0 { - return nil + var prevPDUPos int64 + + if len(recentEvents) == 0 { + if len(delta.stateEvents) == 0 { + // Don't bother appending empty room entries + return nil + } + + // If full_state=true and since is already up to date, then we'll have + // state events but no recent events. + prevPDUPos = toPos - 1 + } else { + prevPDUPos = recentStreamEvents[0].streamPosition - 1 + } + + if prevPDUPos <= 0 { + prevPDUPos = 1 } switch delta.membership { case membershipJoin: jr := types.NewJoinResponse() - if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { - // Use the short form of batch token for prev_batch - jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) - } else { - // Use the short form of batch token for prev_batch - jr.Timeline.PrevBatch = "1" - } + // Use the short form of batch token for prev_batch + jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) @@ -618,13 +635,8 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() - if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { - // Use the short form of batch token for prev_batch - lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) - } else { - // Use the short form of batch token for prev_batch - lr.Timeline.PrevBatch = "1" - } + // Use the short form of batch token for prev_batch + lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) @@ -758,15 +770,11 @@ func (d *SyncServerDatasource) getStateDeltas( if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership == membershipJoin { // send full room state down instead of a delta - var allState []gomatrixserverlib.Event - allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + var s []streamEvent + s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID) if err != nil { return nil, nil, err } - s := make([]streamEvent, len(allState)) - for i := 0; i < len(s); i++ { - s[i] = streamEvent{Event: allState[i], streamPosition: 0} - } state[roomID] = s continue // we'll add this room in when we do joined rooms } @@ -798,6 +806,79 @@ func (d *SyncServerDatasource) getStateDeltas( return deltas, joinedRoomIDs, nil } +// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync +// requests with full_state=true. +// Fetches full state for all joined rooms and uses selectStateInRange to get +// updates for other rooms. +func (d *SyncServerDatasource) getStateDeltasForFullStateSync( + ctx context.Context, device *authtypes.Device, txn *sql.Tx, + fromPos, toPos int64, userID string, +) ([]stateDelta, []string, error) { + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + if err != nil { + return nil, nil, err + } + + // Use a reasonable initial capacity + deltas := make([]stateDelta, 0, len(joinedRoomIDs)) + + // Add full states for all joined rooms + for _, joinedRoomID := range joinedRoomIDs { + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID) + if stateErr != nil { + return nil, nil, stateErr + } + deltas = append(deltas, stateDelta{ + membership: "join", + stateEvents: streamEventsToEvents(device, s), + roomID: joinedRoomID, + }) + } + + // Get all the state events ever between these two positions + stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) + if err != nil { + return nil, nil, err + } + state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) + if err != nil { + return nil, nil, err + } + + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership != "join" { // We've already added full state for all joined rooms above. + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.streamPosition, + stateEvents: streamEventsToEvents(device, stateStreamEvents), + roomID: roomID, + }) + } + + break + } + } + } + + return deltas, joinedRoomIDs, nil +} + +func (d *SyncServerDatasource) currentStateStreamEventsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, +) ([]streamEvent, error) { + allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID) + if err != nil { + return nil, err + } + s := make([]streamEvent, len(allState)) + for i := 0; i < len(s); i++ { + s[i] = streamEvent{Event: allState[i], streamPosition: 0} + } + return s, nil +} + // streamEventsToEvents converts streamEvent to Event. If device is non-nil and // matches the streamevent.transactionID device then the transaction ID gets // added to the unsigned section of the output event. diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index a6ec6bd92..d773a4606 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -65,8 +65,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype currPos := rp.notifier.CurrentPosition() - // If this is an initial sync or timeout=0 we return immediately - if syncReq.since == nil || syncReq.timeout == 0 { + if shouldReturnImmediately(syncReq) { syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { return httputil.LogThenError(req, err) @@ -135,7 +134,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncP if req.since == nil { res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState) } if err != nil { @@ -216,3 +215,10 @@ func (rp *RequestPool) appendAccountData( return data, nil } + +// shouldReturnImmediately returns whether the /sync request is an initial sync, +// or timeout=0, or full_state=true, in any of the cases the request should +// return immediately. +func shouldReturnImmediately(syncReq *syncRequest) bool { + return syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState +} diff --git a/testfile b/testfile index c23dda662..4c5163e5e 100644 --- a/testfile +++ b/testfile @@ -158,3 +158,4 @@ Inbound federation rejects remote attempts to join local users to rooms Inbound federation rejects remote attempts to kick local users to rooms An event which redacts itself should be ignored A pair of events which redact each other should be ignored +Full state sync includes joined rooms