diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 925a1233f..6594b938c 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -187,114 +187,145 @@ func (d *SyncServerDatabase) IncrementalSync( userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int, -) (res *types.Response, returnErr error) { - returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { - // Work out which rooms to return in the response. This is done by getting not only the currently - // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. - // This works out what the 'state' key should be for each room as well as which membership block - // to put the room into. - deltas, err := d.getStateDeltas(ctx, txn, fromPos, toPos, userID) +) (*types.Response, error) { + txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) + if err != nil { + return nil, err + } + var succeeded bool + defer common.EndTransaction(txn, &succeeded) + + // Work out which rooms to return in the response. This is done by getting not only the currently + // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. + // This works out what the 'state' key should be for each room as well as which membership block + // to put the room into. + deltas, err := d.getStateDeltas(ctx, txn, fromPos, toPos, userID) + if err != nil { + return nil, err + } + + res := types.NewResponse(toPos) + for _, delta := range deltas { + endPos := toPos + if delta.membershipPos > 0 && delta.membership == "leave" { + // make sure we don't leak recent events after the leave event. + // TODO: History visibility makes this somewhat complex to handle correctly. For example: + // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). + // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave + // in a single /sync request + // This is all "okay" assuming history_visibility == "shared" which it is by default. + endPos = delta.membershipPos + } + var recentStreamEvents []streamEvent + recentStreamEvents, err = d.events.selectRecentEvents( + ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, + ) if err != nil { - return err + return nil, err } + recentEvents := streamEventsToEvents(recentStreamEvents) + delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - res = types.NewResponse(toPos) - for _, delta := range deltas { - endPos := toPos - if delta.membershipPos > 0 && delta.membership == "leave" { - // make sure we don't leak recent events after the leave event. - // TODO: History visibility makes this somewhat complex to handle correctly. For example: - // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). - // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave - // in a single /sync request - // This is all "okay" assuming history_visibility == "shared" which it is by default. - endPos = delta.membershipPos - } - recentStreamEvents, err := d.events.selectRecentEvents( - ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, - ) - if err != nil { - return err - } - recentEvents := streamEventsToEvents(recentStreamEvents) - delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - - switch delta.membership { - case "join": - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[delta.roomID] = *jr - case "leave": - fallthrough // transitions to leave are the same as ban - case "ban": - // TODO: recentEvents may contain events that this user is not allowed to see because they are - // no longer in the room. - lr := types.NewLeaveResponse() - lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Leave[delta.roomID] = *lr - } + switch delta.membership { + case "join": + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.roomID] = *jr + case "leave": + fallthrough // transitions to leave are the same as ban + case "ban": + // TODO: recentEvents may contain events that this user is not allowed to see because they are + // no longer in the room. + lr := types.NewLeaveResponse() + lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.roomID] = *lr } + } - // TODO: This should be done in getStateDeltas - return d.addInvitesToResponse(ctx, txn, userID, res) - }) - return + // TODO: This should be done in getStateDeltas + if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil { + return nil, err + } + + succeeded = true + return res, nil } // CompleteSync a complete /sync API response for the given user. func (d *SyncServerDatabase) CompleteSync( ctx context.Context, userID string, numRecentEventsPerRoom int, -) (res *types.Response, returnErr error) { +) (*types.Response, error) { // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // a consistent view of the database throughout. This includes extracting the sync stream position. // This does have the unfortunate side-effect that all the matrixy logic resides in this function, // but it's better to not hide the fact that this is being done in a transaction. - returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { - // Get the current stream position which we will base the sync response on. - id, err := d.events.selectMaxID(ctx, txn) + txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) + if err != nil { + return nil, err + } + var succeeded bool + defer common.EndTransaction(txn, &succeeded) + + // Get the current stream position which we will base the sync response on. + id, err := d.events.selectMaxID(ctx, txn) + if err != nil { + return nil, err + } + pos := types.StreamPosition(id) + + // Extract room state and recent events for all rooms the user is joined to. + roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + if err != nil { + return nil, err + } + + // Build up a /sync response. Add joined rooms. + res := types.NewResponse(pos) + for _, roomID := range roomIDs { + var stateEvents []gomatrixserverlib.Event + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) if err != nil { - return err + return nil, err } - pos := types.StreamPosition(id) - - // Extract room state and recent events for all rooms the user is joined to. - roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + var recentStreamEvents []streamEvent + recentStreamEvents, err = d.events.selectRecentEvents( + ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom, + ) if err != nil { - return err + return nil, err } + recentEvents := streamEventsToEvents(recentStreamEvents) - // Build up a /sync response. Add joined rooms. - res = types.NewResponse(pos) - for _, roomID := range roomIDs { - stateEvents, err := d.roomstate.selectCurrentState(ctx, txn, roomID) - if err != nil { - return err - } - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - recentStreamEvents, err := d.events.selectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom, - ) - if err != nil { - return err - } - recentEvents := streamEventsToEvents(recentStreamEvents) + stateEvents = removeDuplicates(stateEvents, recentEvents) + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = true + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr + } - stateEvents = removeDuplicates(stateEvents, recentEvents) - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } + if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil { + return nil, err + } - return d.addInvitesToResponse(ctx, txn, userID, res) - }) - return + succeeded = true + return res, err +} + +var txReadOnlySnapshot = sql.TxOptions{ + // Set the isolation level so that we see a snapshot of the database. + // In PostgreSQL repeatable read transactions will see a snapshot taken + // at the first query, and since the transaction is read-only it can't + // run into any serialisation errors. + // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, } // GetAccountDataInRange returns all account data for a given user inserted or