factor out completesync and a bit more

This commit is contained in:
Kegan Dougal 2020-05-14 15:34:16 +01:00
parent 83b0a0c681
commit f5909f6f96
2 changed files with 11 additions and 200 deletions

View file

@ -466,7 +466,7 @@ func (d *Database) addPDUDeltaToResponse(
}
// TODO: This should be done in getStateDeltas
if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
if err = d.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
return nil, err
}
@ -510,7 +510,8 @@ func (d *Database) addTypingDeltaToResponse(
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
// the positions of that type are not equal in fromPos and toPos.
func (d *Database) addEDUDeltaToResponse(
// TODO FIXME TEMPORARY PUBLIC
func (d *Database) AddEDUDeltaToResponse(
fromPos, toPos types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
@ -550,7 +551,7 @@ func (d *Database) IncrementalSync(
return nil, err
}
err = d.addEDUDeltaToResponse(
err = d.AddEDUDeltaToResponse(
fromPos, toPos, joinedRoomIDs, res,
)
if err != nil {
@ -627,7 +628,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
var prevBatchStr string
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, nil, recentStreamEvents[0].EventID())
backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
if err != nil {
return
}
@ -648,7 +649,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
res.Rooms.Join[roomID] = *jr
}
if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
if err = d.AddInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
return
}
@ -667,7 +668,7 @@ func (d *Database) CompleteSync(
}
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
err = d.AddEDUDeltaToResponse(
types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res,
)
if err != nil {
@ -687,7 +688,8 @@ var txReadOnlySnapshot = sql.TxOptions{
ReadOnly: true,
}
func (d *Database) addInvitesToResponse(
// TODO FIXME temporary public
func (d *Database) AddInvitesToResponse(
ctx context.Context, txn *sql.Tx,
userID string,
fromPos, toPos types.StreamPosition,

View file

@ -18,7 +18,6 @@ package sqlite3
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
@ -173,7 +172,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
}
// TODO: This should be done in getStateDeltas
if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
if err = d.Database.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
return nil, err
}
@ -181,57 +180,6 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
return joinedRoomIDs, nil
}
// addTypingDeltaToResponse adds all typing notifications to a sync response
// since the specified position.
func (d *SyncServerDatasource) addTypingDeltaToResponse(
since types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
) error {
var jr types.JoinResponse
var ok bool
var err error
for _, roomID := range joinedRoomIDs {
if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUPosition()),
); updated {
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MTyping,
}
ev.Content, err = json.Marshal(map[string]interface{}{
"user_ids": typingUsers,
})
if err != nil {
return err
}
if jr, ok = res.Rooms.Join[roomID]; !ok {
jr = *types.NewJoinResponse()
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
res.Rooms.Join[roomID] = jr
}
}
return nil
}
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
// the positions of that type are not equal in fromPos and toPos.
func (d *SyncServerDatasource) addEDUDeltaToResponse(
fromPos, toPos types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
) (err error) {
if fromPos.EDUPosition() != toPos.EDUPosition() {
err = d.addTypingDeltaToResponse(
fromPos, joinedRoomIDs, res,
)
}
return
}
// IncrementalSync returns all the data needed in order to create an incremental
// sync response for the given user. Events returned will include any client
// transaction IDs associated with the given device. These transaction IDs come
@ -263,7 +211,7 @@ func (d *SyncServerDatasource) IncrementalSync(
return nil, err
}
err = d.addEDUDeltaToResponse(
err = d.Database.AddEDUDeltaToResponse(
fromPos, toPos, joinedRoomIDs, res,
)
if err != nil {
@ -273,126 +221,6 @@ func (d *SyncServerDatasource) IncrementalSync(
return res, nil
}
// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
ctx context.Context,
userID string,
numRecentEventsPerRoom int,
) (
res *types.Response,
toPos types.StreamingToken,
joinedRoomIDs []string,
err error,
) {
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
// a consistent view of the database throughout. This includes extracting the sync position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction.
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil {
return
}
var succeeded bool
defer func() {
txerr := common.EndTransaction(txn, &succeeded)
if err == nil && txerr != nil {
err = txerr
}
}()
// Get the current sync position which we will base the sync response on.
toPos, err = d.Database.SyncPositionTx(ctx, txn)
if err != nil {
return
}
res = types.NewResponse(toPos)
// Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err = d.Database.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
if err != nil {
return
}
stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
var stateEvents []gomatrixserverlib.HeaderedEvent
stateEvents, err = d.Database.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, &stateFilterPart)
if err != nil {
return
}
//fmt.Println("State events:", stateEvents)
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(),
numRecentEventsPerRoom, true, true,
)
if err != nil {
return
}
//fmt.Println("Recent stream events:", recentStreamEvents)
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
var prevBatchStr string
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = d.Database.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
if err != nil {
return
}
prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
prevBatch.Decrement()
prevBatchStr = prevBatch.String()
}
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs
recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = prevBatchStr
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
return
}
succeeded = true
return res, toPos, joinedRoomIDs, err
}
// CompleteSync returns a complete /sync API response for the given user.
func (d *SyncServerDatasource) CompleteSync(
ctx context.Context, userID string, numRecentEventsPerRoom int,
) (*types.Response, error) {
res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
ctx, userID, numRecentEventsPerRoom,
)
if err != nil {
return nil, err
}
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, err
}
return res, nil
}
var txReadOnlySnapshot = sql.TxOptions{
// Set the isolation level so that we see a snapshot of the database.
// In PostgreSQL repeatable read transactions will see a snapshot taken
@ -403,25 +231,6 @@ var txReadOnlySnapshot = sql.TxOptions{
ReadOnly: true,
}
func (d *SyncServerDatasource) addInvitesToResponse(
ctx context.Context, txn *sql.Tx,
userID string,
fromPos, toPos types.StreamPosition,
res *types.Response,
) error {
invites, err := d.Database.Invites.SelectInviteEventsInRange(
ctx, txn, userID, fromPos, toPos,
)
if err != nil {
return err
}
for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse(inviteEvent)
res.Rooms.Invite[roomID] = *ir
}
return nil
}
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
func (d *SyncServerDatasource) getBackwardTopologyPos(