Use a read-only snapshot transaction for calculating sync responses

This commit is contained in:
Mark Haines 2017-09-19 14:42:19 +01:00
parent 08b9940dde
commit 746daec4fe

View file

@ -187,114 +187,145 @@ func (d *SyncServerDatabase) IncrementalSync(
userID string, userID string,
fromPos, toPos types.StreamPosition, fromPos, toPos types.StreamPosition,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
) (res *types.Response, returnErr error) { ) (*types.Response, error) {
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
// Work out which rooms to return in the response. This is done by getting not only the currently if err != nil {
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. return nil, err
// This works out what the 'state' key should be for each room as well as which membership block }
// to put the room into. var succeeded bool
deltas, err := d.getStateDeltas(ctx, txn, fromPos, toPos, userID) defer common.EndTransaction(txn, &succeeded)
// Work out which rooms to return in the response. This is done by getting not only the currently
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
// This works out what the 'state' key should be for each room as well as which membership block
// to put the room into.
deltas, err := d.getStateDeltas(ctx, txn, fromPos, toPos, userID)
if err != nil {
return nil, err
}
res := types.NewResponse(toPos)
for _, delta := range deltas {
endPos := toPos
if delta.membershipPos > 0 && delta.membership == "leave" {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
endPos = delta.membershipPos
}
var recentStreamEvents []streamEvent
recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom,
)
if err != nil { if err != nil {
return err return nil, err
} }
recentEvents := streamEventsToEvents(recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
res = types.NewResponse(toPos) switch delta.membership {
for _, delta := range deltas { case "join":
endPos := toPos jr := types.NewJoinResponse()
if delta.membershipPos > 0 && delta.membership == "leave" { jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
// make sure we don't leak recent events after the leave event. jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
// TODO: History visibility makes this somewhat complex to handle correctly. For example: jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). res.Rooms.Join[delta.roomID] = *jr
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave case "leave":
// in a single /sync request fallthrough // transitions to leave are the same as ban
// This is all "okay" assuming history_visibility == "shared" which it is by default. case "ban":
endPos = delta.membershipPos // TODO: recentEvents may contain events that this user is not allowed to see because they are
} // no longer in the room.
recentStreamEvents, err := d.events.selectRecentEvents( lr := types.NewLeaveResponse()
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
if err != nil { lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
return err res.Rooms.Leave[delta.roomID] = *lr
}
recentEvents := streamEventsToEvents(recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
switch delta.membership {
case "join":
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case "leave":
fallthrough // transitions to leave are the same as ban
case "ban":
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}
} }
}
// TODO: This should be done in getStateDeltas // TODO: This should be done in getStateDeltas
return d.addInvitesToResponse(ctx, txn, userID, res) if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil {
}) return nil, err
return }
succeeded = true
return res, nil
} }
// CompleteSync a complete /sync API response for the given user. // CompleteSync a complete /sync API response for the given user.
func (d *SyncServerDatabase) CompleteSync( func (d *SyncServerDatabase) CompleteSync(
ctx context.Context, userID string, numRecentEventsPerRoom int, ctx context.Context, userID string, numRecentEventsPerRoom int,
) (res *types.Response, returnErr error) { ) (*types.Response, error) {
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
// a consistent view of the database throughout. This includes extracting the sync stream position. // a consistent view of the database throughout. This includes extracting the sync stream position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function, // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction. // but it's better to not hide the fact that this is being done in a transaction.
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
// Get the current stream position which we will base the sync response on. if err != nil {
id, err := d.events.selectMaxID(ctx, txn) return nil, err
}
var succeeded bool
defer common.EndTransaction(txn, &succeeded)
// Get the current stream position which we will base the sync response on.
id, err := d.events.selectMaxID(ctx, txn)
if err != nil {
return nil, err
}
pos := types.StreamPosition(id)
// Extract room state and recent events for all rooms the user is joined to.
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
if err != nil {
return nil, err
}
// Build up a /sync response. Add joined rooms.
res := types.NewResponse(pos)
for _, roomID := range roomIDs {
var stateEvents []gomatrixserverlib.Event
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil { if err != nil {
return err return nil, err
} }
pos := types.StreamPosition(id) // TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
// Extract room state and recent events for all rooms the user is joined to. var recentStreamEvents []streamEvent
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
)
if err != nil { if err != nil {
return err return nil, err
} }
recentEvents := streamEventsToEvents(recentStreamEvents)
// Build up a /sync response. Add joined rooms. stateEvents = removeDuplicates(stateEvents, recentEvents)
res = types.NewResponse(pos) jr := types.NewJoinResponse()
for _, roomID := range roomIDs { jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
stateEvents, err := d.roomstate.selectCurrentState(ctx, txn, roomID) jr.Timeline.Limited = true
if err != nil { jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
return err res.Rooms.Join[roomID] = *jr
} }
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
recentStreamEvents, err := d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
)
if err != nil {
return err
}
recentEvents := streamEventsToEvents(recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents) if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil {
jr := types.NewJoinResponse() return nil, err
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) }
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
return d.addInvitesToResponse(ctx, txn, userID, res) succeeded = true
}) return res, err
return }
var txReadOnlySnapshot = sql.TxOptions{
// Set the isolation level so that we see a snapshot of the database.
// In PostgreSQL repeatable read transactions will see a snapshot taken
// at the first query, and since the transaction is read-only it can't
// run into any serialisation errors.
// https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
} }
// GetAccountDataInRange returns all account data for a given user inserted or // GetAccountDataInRange returns all account data for a given user inserted or