Add some error wrapping to sync API, use background context for sync (#1363)
* Add some error wrapping to sync API * Don't use request context for BeginTx until mattn/go-sqlite3#764 is fixed
This commit is contained in:
parent
fee1c22790
commit
c42c70597c
|
@ -451,7 +451,7 @@ func (d *Database) addPDUDeltaToResponse(
|
||||||
wantFullState bool,
|
wantFullState bool,
|
||||||
res *types.Response,
|
res *types.Response,
|
||||||
) (joinedRoomIDs []string, err error) {
|
) (joinedRoomIDs []string, err error) {
|
||||||
txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
|
txn, err := d.DB.BeginTx(context.TODO(), &txReadOnlySnapshot) // TODO: check mattn/go-sqlite3#764
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -577,20 +577,23 @@ func (d *Database) IncrementalSync(
|
||||||
joinedRoomIDs, err = d.addPDUDeltaToResponse(
|
joinedRoomIDs, err = d.addPDUDeltaToResponse(
|
||||||
ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
|
ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("d.addPDUDeltaToResponse: %w", err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(
|
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(
|
||||||
ctx, nil, device.UserID, gomatrixserverlib.Join,
|
ctx, nil, device.UserID, gomatrixserverlib.Join,
|
||||||
)
|
)
|
||||||
}
|
if err != nil {
|
||||||
if err != nil {
|
return nil, fmt.Errorf("d.CurrentRoomState.SelectRoomIDsWithMembership: %w", err)
|
||||||
return nil, err
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = d.addEDUDeltaToResponse(
|
err = d.addEDUDeltaToResponse(
|
||||||
fromPos, toPos, joinedRoomIDs, res,
|
fromPos, toPos, joinedRoomIDs, res,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
|
@ -632,7 +635,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
|
||||||
// a consistent view of the database throughout. This includes extracting the sync position.
|
// a consistent view of the database throughout. This includes extracting the sync position.
|
||||||
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
|
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
|
||||||
// but it's better to not hide the fact that this is being done in a transaction.
|
// but it's better to not hide the fact that this is being done in a transaction.
|
||||||
txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
|
txn, err := d.DB.BeginTx(context.TODO(), &txReadOnlySnapshot) // TODO: check mattn/go-sqlite3#764
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -719,7 +722,7 @@ func (d *Database) CompleteSync(
|
||||||
ctx, res, device.UserID, numRecentEventsPerRoom,
|
ctx, res, device.UserID, numRecentEventsPerRoom,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use a zero value SyncPosition for fromPos so all EDU states are added.
|
// Use a zero value SyncPosition for fromPos so all EDU states are added.
|
||||||
|
@ -727,7 +730,7 @@ func (d *Database) CompleteSync(
|
||||||
types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res,
|
types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
|
@ -753,7 +756,7 @@ func (d *Database) addInvitesToResponse(
|
||||||
ctx, txn, userID, r,
|
ctx, txn, userID, r,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("d.Invites.SelectInviteEventsInRange: %w", err)
|
||||||
}
|
}
|
||||||
for roomID, inviteEvent := range invites {
|
for roomID, inviteEvent := range invites {
|
||||||
ir := types.NewInviteResponse(inviteEvent)
|
ir := types.NewInviteResponse(inviteEvent)
|
||||||
|
|
|
@ -18,6 +18,7 @@ package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -204,31 +205,34 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
// See if we have any new tasks to do for the send-to-device messaging.
|
// See if we have any new tasks to do for the send-to-device messaging.
|
||||||
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, since)
|
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, since)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: handle ignored users
|
// TODO: handle ignored users
|
||||||
if req.since == nil {
|
if req.since == nil {
|
||||||
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
|
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
|
||||||
|
if err != nil {
|
||||||
|
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
|
res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
|
||||||
}
|
if err != nil {
|
||||||
if err != nil {
|
return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
|
||||||
return res, err
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
|
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
|
||||||
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter)
|
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, fmt.Errorf("rp.appendAccountData: %w", err)
|
||||||
}
|
}
|
||||||
res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos)
|
res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
|
||||||
}
|
}
|
||||||
err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res)
|
err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, fmt.Errorf("internal.DeviceOTKCounts: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Before we return the sync response, make sure that we take action on
|
// Before we return the sync response, make sure that we take action on
|
||||||
|
@ -238,7 +242,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
// Handle the updates and deletions in the database.
|
// Handle the updates and deletions in the database.
|
||||||
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, since)
|
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, since)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
|
@ -263,7 +267,7 @@ func (rp *RequestPool) appendDeviceLists(
|
||||||
) (*types.Response, error) {
|
) (*types.Response, error) {
|
||||||
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
|
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return data, nil
|
return data, nil
|
||||||
|
@ -329,7 +333,7 @@ func (rp *RequestPool) appendAccountData(
|
||||||
req.ctx, userID, r, accountDataFilter,
|
req.ctx, userID, r, accountDataFilter,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(dataTypes) == 0 {
|
if len(dataTypes) == 0 {
|
||||||
|
|
Loading…
Reference in a new issue