Merge branch 'master' into sync-full-state

Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
Cnly 2019-08-01 11:12:42 +08:00
commit b78dd76f4a
2 changed files with 35 additions and 26 deletions

View file

@ -23,7 +23,7 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx
-- For deleting old invites -- For deleting old invites
CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
ON syncapi_invite_events(target_user_id, id); ON syncapi_invite_events (event_id);
` `
const insertInviteEventSQL = "" + const insertInviteEventSQL = "" +

View file

@ -35,6 +35,12 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
const (
membershipJoin = "join"
membershipLeave = "leave"
membershipBan = "ban"
)
type stateDelta struct { type stateDelta struct {
roomID string roomID string
stateEvents []gomatrixserverlib.Event stateEvents []gomatrixserverlib.Event
@ -250,18 +256,17 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
// This works out what the 'state' key should be for each room as well as which membership block // This works out what the 'state' key should be for each room as well as which membership block
// to put the room into. // to put the room into.
var deltas []stateDelta var deltas []stateDelta
var joinedRoomIDs []string
if !wantFullState { if !wantFullState {
deltas, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) deltas, joinedRoomIDs, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
} else { } else {
deltas, err = d.getStateDeltasForFullStateSync(ctx, &device, txn, fromPos, toPos, device.UserID) deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(ctx, &device, txn, fromPos, toPos, device.UserID)
} }
if err != nil { if err != nil {
return nil, err return nil, err
} }
joinedRoomIDs := make([]string, 0, len(deltas))
for _, delta := range deltas { for _, delta := range deltas {
joinedRoomIDs = append(joinedRoomIDs, delta.roomID)
err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
if err != nil { if err != nil {
return nil, err return nil, err
@ -351,7 +356,7 @@ func (d *SyncServerDatasource) IncrementalSync(
) )
} else { } else {
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
ctx, nil, device.UserID, "join", ctx, nil, device.UserID, membershipJoin,
) )
} }
if err != nil { if err != nil {
@ -400,7 +405,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
res = types.NewResponse(toPos) res = types.NewResponse(toPos)
// Extract room state and recent events for all rooms the user is joined to. // Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin)
if err != nil { if err != nil {
return return
} }
@ -578,7 +583,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
res *types.Response, res *types.Response,
) error { ) error {
endPos := toPos endPos := toPos
if delta.membershipPos > 0 && delta.membership == "leave" { if delta.membershipPos > 0 && delta.membership == membershipLeave {
// make sure we don't leak recent events after the leave event. // make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example: // TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
@ -616,7 +621,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
} }
switch delta.membership { switch delta.membership {
case "join": case membershipJoin:
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
// Use the short form of batch token for prev_batch // Use the short form of batch token for prev_batch
jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
@ -624,9 +629,9 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr res.Rooms.Join[delta.roomID] = *jr
case "leave": case membershipLeave:
fallthrough // transitions to leave are the same as ban fallthrough // transitions to leave are the same as ban
case "ban": case membershipBan:
// TODO: recentEvents may contain events that this user is not allowed to see because they are // TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room. // no longer in the room.
lr := types.NewLeaveResponse() lr := types.NewLeaveResponse()
@ -727,10 +732,14 @@ func (d *SyncServerDatasource) fetchMissingStateEvents(
return events, nil return events, nil
} }
// getStateDeltas returns the state deltas between fromPos and toPos,
// exclusive of oldPos, inclusive of newPos, for the rooms in which
// the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it.
func (d *SyncServerDatasource) getStateDeltas( func (d *SyncServerDatasource) getStateDeltas(
ctx context.Context, device *authtypes.Device, txn *sql.Tx, ctx context.Context, device *authtypes.Device, txn *sql.Tx,
fromPos, toPos int64, userID string, fromPos, toPos int64, userID string,
) ([]stateDelta, error) { ) ([]stateDelta, []string, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response // - Get membership list changes for this user in this sync response
// - For each room which has membership list changes: // - For each room which has membership list changes:
@ -744,11 +753,11 @@ func (d *SyncServerDatasource) getStateDeltas(
// get all the state events ever between these two positions // get all the state events ever between these two positions
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
for roomID, stateStreamEvents := range state { for roomID, stateStreamEvents := range state {
@ -759,12 +768,12 @@ func (d *SyncServerDatasource) getStateDeltas(
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
// the timeline. // the timeline.
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership == "join" { if membership == membershipJoin {
// send full room state down instead of a delta // send full room state down instead of a delta
var s []streamEvent var s []streamEvent
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID) s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
state[roomID] = s state[roomID] = s
continue // we'll add this room in when we do joined rooms continue // we'll add this room in when we do joined rooms
@ -782,19 +791,19 @@ func (d *SyncServerDatasource) getStateDeltas(
} }
// Add in currently joined rooms // Add in currently joined rooms
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
for _, joinedRoomID := range joinedRoomIDs { for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, stateDelta{ deltas = append(deltas, stateDelta{
membership: "join", membership: membershipJoin,
stateEvents: streamEventsToEvents(device, state[joinedRoomID]), stateEvents: streamEventsToEvents(device, state[joinedRoomID]),
roomID: joinedRoomID, roomID: joinedRoomID,
}) })
} }
return deltas, nil return deltas, joinedRoomIDs, nil
} }
// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync // getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
@ -804,10 +813,10 @@ func (d *SyncServerDatasource) getStateDeltas(
func (d *SyncServerDatasource) getStateDeltasForFullStateSync( func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
ctx context.Context, device *authtypes.Device, txn *sql.Tx, ctx context.Context, device *authtypes.Device, txn *sql.Tx,
fromPos, toPos int64, userID string, fromPos, toPos int64, userID string,
) ([]stateDelta, error) { ) ([]stateDelta, []string, error) {
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
// Use a reasonable initial capacity // Use a reasonable initial capacity
@ -817,7 +826,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
for _, joinedRoomID := range joinedRoomIDs { for _, joinedRoomID := range joinedRoomIDs {
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID) s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID)
if stateErr != nil { if stateErr != nil {
return nil, stateErr return nil, nil, stateErr
} }
deltas = append(deltas, stateDelta{ deltas = append(deltas, stateDelta{
membership: "join", membership: "join",
@ -829,11 +838,11 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
// Get all the state events ever between these two positions // Get all the state events ever between these two positions
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
for roomID, stateStreamEvents := range state { for roomID, stateStreamEvents := range state {
@ -853,7 +862,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
} }
} }
return deltas, nil return deltas, joinedRoomIDs, nil
} }
func (d *SyncServerDatasource) currentStateStreamEventsForRoom( func (d *SyncServerDatasource) currentStateStreamEventsForRoom(