Merge branch 'master' into event-redaction

Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
Cnly 2019-08-01 12:41:58 +08:00
commit f3bc28fa66
4 changed files with 143 additions and 47 deletions

View file

@ -23,7 +23,7 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx
-- For deleting old invites -- For deleting old invites
CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
ON syncapi_invite_events(target_user_id, id); ON syncapi_invite_events (event_id);
` `
const insertInviteEventSQL = "" + const insertInviteEventSQL = "" +

View file

@ -35,6 +35,12 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
const (
membershipJoin = "join"
membershipLeave = "leave"
membershipBan = "ban"
)
type stateDelta struct { type stateDelta struct {
roomID string roomID string
stateEvents []gomatrixserverlib.Event stateEvents []gomatrixserverlib.Event
@ -274,6 +280,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
device authtypes.Device, device authtypes.Device,
fromPos, toPos int64, fromPos, toPos int64,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
wantFullState bool,
res *types.Response, res *types.Response,
) ([]string, error) { ) ([]string, error) {
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
@ -287,14 +294,18 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
// joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
// This works out what the 'state' key should be for each room as well as which membership block // This works out what the 'state' key should be for each room as well as which membership block
// to put the room into. // to put the room into.
deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) var deltas []stateDelta
var joinedRoomIDs []string
if !wantFullState {
deltas, joinedRoomIDs, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
} else {
deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(ctx, &device, txn, fromPos, toPos, device.UserID)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
joinedRoomIDs := make([]string, 0, len(deltas))
for _, delta := range deltas { for _, delta := range deltas {
joinedRoomIDs = append(joinedRoomIDs, delta.roomID)
err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
if err != nil { if err != nil {
return nil, err return nil, err
@ -371,19 +382,20 @@ func (d *SyncServerDatasource) IncrementalSync(
device authtypes.Device, device authtypes.Device,
fromPos, toPos types.SyncPosition, fromPos, toPos types.SyncPosition,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
wantFullState bool,
) (*types.Response, error) { ) (*types.Response, error) {
nextBatchPos := fromPos.WithUpdates(toPos) nextBatchPos := fromPos.WithUpdates(toPos)
res := types.NewResponse(nextBatchPos) res := types.NewResponse(nextBatchPos)
var joinedRoomIDs []string var joinedRoomIDs []string
var err error var err error
if fromPos.PDUPosition != toPos.PDUPosition { if fromPos.PDUPosition != toPos.PDUPosition || wantFullState {
joinedRoomIDs, err = d.addPDUDeltaToResponse( joinedRoomIDs, err = d.addPDUDeltaToResponse(
ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res, ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res,
) )
} else { } else {
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
ctx, nil, device.UserID, "join", ctx, nil, device.UserID, membershipJoin,
) )
} }
if err != nil { if err != nil {
@ -432,7 +444,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
res = types.NewResponse(toPos) res = types.NewResponse(toPos)
// Extract room state and recent events for all rooms the user is joined to. // Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin)
if err != nil { if err != nil {
return return
} }
@ -635,7 +647,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
res *types.Response, res *types.Response,
) error { ) error {
endPos := toPos endPos := toPos
if delta.membershipPos > 0 && delta.membership == "leave" { if delta.membershipPos > 0 && delta.membership == membershipLeave {
// make sure we don't leak recent events after the leave event. // make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example: // TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
@ -653,9 +665,23 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
recentEvents := streamEventsToEvents(device, recentStreamEvents) recentEvents := streamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
// Don't bother appending empty room entries var prevPDUPos int64
if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
return nil if len(recentEvents) == 0 {
if len(delta.stateEvents) == 0 {
// Don't bother appending empty room entries
return nil
}
// If full_state=true and since is already up to date, then we'll have
// state events but no recent events.
prevPDUPos = toPos - 1
} else {
prevPDUPos = recentStreamEvents[0].streamPosition - 1
}
if prevPDUPos <= 0 {
prevPDUPos = 1
} }
// Note that we're not passing txn into applyRedactions because txn is // Note that we're not passing txn into applyRedactions because txn is
@ -668,32 +694,22 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
} }
switch delta.membership { switch delta.membership {
case "join": case membershipJoin:
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { // Use the short form of batch token for prev_batch
// Use the short form of batch token for prev_batch jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else {
// Use the short form of batch token for prev_batch
jr.Timeline.PrevBatch = "1"
}
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr res.Rooms.Join[delta.roomID] = *jr
case "leave": case membershipLeave:
fallthrough // transitions to leave are the same as ban fallthrough // transitions to leave are the same as ban
case "ban": case membershipBan:
// TODO: recentEvents may contain events that this user is not allowed to see because they are // TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room. // no longer in the room.
lr := types.NewLeaveResponse() lr := types.NewLeaveResponse()
if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { // Use the short form of batch token for prev_batch
// Use the short form of batch token for prev_batch lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else {
// Use the short form of batch token for prev_batch
lr.Timeline.PrevBatch = "1"
}
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
@ -789,10 +805,14 @@ func (d *SyncServerDatasource) fetchMissingStateEvents(
return events, nil return events, nil
} }
// getStateDeltas returns the state deltas between fromPos and toPos,
// exclusive of oldPos, inclusive of newPos, for the rooms in which
// the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it.
func (d *SyncServerDatasource) getStateDeltas( func (d *SyncServerDatasource) getStateDeltas(
ctx context.Context, device *authtypes.Device, txn *sql.Tx, ctx context.Context, device *authtypes.Device, txn *sql.Tx,
fromPos, toPos int64, userID string, fromPos, toPos int64, userID string,
) ([]stateDelta, error) { ) ([]stateDelta, []string, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response // - Get membership list changes for this user in this sync response
// - For each room which has membership list changes: // - For each room which has membership list changes:
@ -806,11 +826,11 @@ func (d *SyncServerDatasource) getStateDeltas(
// get all the state events ever between these two positions // get all the state events ever between these two positions
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
for roomID, stateStreamEvents := range state { for roomID, stateStreamEvents := range state {
@ -821,16 +841,12 @@ func (d *SyncServerDatasource) getStateDeltas(
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
// the timeline. // the timeline.
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership == "join" { if membership == membershipJoin {
// send full room state down instead of a delta // send full room state down instead of a delta
var allState []gomatrixserverlib.Event var s []streamEvent
allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID)
if err != nil { if err != nil {
return nil, err return nil, nil, err
}
s := make([]streamEvent, len(allState))
for i := 0; i < len(s); i++ {
s[i] = streamEvent{Event: allState[i], streamPosition: 0}
} }
state[roomID] = s state[roomID] = s
continue // we'll add this room in when we do joined rooms continue // we'll add this room in when we do joined rooms
@ -848,19 +864,92 @@ func (d *SyncServerDatasource) getStateDeltas(
} }
// Add in currently joined rooms // Add in currently joined rooms
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
for _, joinedRoomID := range joinedRoomIDs { for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, stateDelta{ deltas = append(deltas, stateDelta{
membership: "join", membership: membershipJoin,
stateEvents: streamEventsToEvents(device, state[joinedRoomID]), stateEvents: streamEventsToEvents(device, state[joinedRoomID]),
roomID: joinedRoomID, roomID: joinedRoomID,
}) })
} }
return deltas, nil return deltas, joinedRoomIDs, nil
}
// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
// requests with full_state=true.
// Fetches full state for all joined rooms and uses selectStateInRange to get
// updates for other rooms.
func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
fromPos, toPos int64, userID string,
) ([]stateDelta, []string, error) {
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
if err != nil {
return nil, nil, err
}
// Use a reasonable initial capacity
deltas := make([]stateDelta, 0, len(joinedRoomIDs))
// Add full states for all joined rooms
for _, joinedRoomID := range joinedRoomIDs {
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID)
if stateErr != nil {
return nil, nil, stateErr
}
deltas = append(deltas, stateDelta{
membership: "join",
stateEvents: streamEventsToEvents(device, s),
roomID: joinedRoomID,
})
}
// Get all the state events ever between these two positions
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
if err != nil {
return nil, nil, err
}
state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
if err != nil {
return nil, nil, err
}
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership != "join" { // We've already added full state for all joined rooms above.
deltas = append(deltas, stateDelta{
membership: membership,
membershipPos: ev.streamPosition,
stateEvents: streamEventsToEvents(device, stateStreamEvents),
roomID: roomID,
})
}
break
}
}
}
return deltas, joinedRoomIDs, nil
}
func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
ctx context.Context, txn *sql.Tx, roomID string,
) ([]streamEvent, error) {
allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil {
return nil, err
}
s := make([]streamEvent, len(allState))
for i := 0; i < len(s); i++ {
s[i] = streamEvent{Event: allState[i], streamPosition: 0}
}
return s, nil
} }
// applyRedactionsForEventLists applies necessary redactions to the events in the lists in-place. // applyRedactionsForEventLists applies necessary redactions to the events in the lists in-place.

View file

@ -65,8 +65,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
currPos := rp.notifier.CurrentPosition() currPos := rp.notifier.CurrentPosition()
// If this is an initial sync or timeout=0 we return immediately if shouldReturnImmediately(syncReq) {
if syncReq.since == nil || syncReq.timeout == 0 {
syncData, err = rp.currentSyncForUser(*syncReq, currPos) syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
@ -135,7 +134,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncP
if req.since == nil { if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
} else { } else {
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit) res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState)
} }
if err != nil { if err != nil {
@ -216,3 +215,10 @@ func (rp *RequestPool) appendAccountData(
return data, nil return data, nil
} }
// shouldReturnImmediately returns whether the /sync request is an initial sync,
// or timeout=0, or full_state=true, in any of the cases the request should
// return immediately.
func shouldReturnImmediately(syncReq *syncRequest) bool {
return syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState
}

View file

@ -158,5 +158,6 @@ Inbound federation rejects remote attempts to join local users to rooms
Inbound federation rejects remote attempts to kick local users to rooms Inbound federation rejects remote attempts to kick local users to rooms
An event which redacts itself should be ignored An event which redacts itself should be ignored
A pair of events which redact each other should be ignored A pair of events which redact each other should be ignored
Full state sync includes joined rooms
POST /rooms/:room_id/redact/:event_id as original message sender redacts message POST /rooms/:room_id/redact/:event_id as original message sender redacts message
POST /rooms/:room_id/redact/:event_id as random user does not redact message POST /rooms/:room_id/redact/:event_id as random user does not redact message