From b3568a07631379afe63de6416ff0aa946a60c465 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 24 Jan 2020 22:50:36 +0000 Subject: [PATCH] Further sync tweaks --- .../postgres/output_room_events_topology_table.go | 2 +- syncapi/storage/postgres/syncserver.go | 9 +++------ syncapi/sync/requestpool.go | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 4a50b9a08..793d1e236 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -40,7 +40,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON sync const insertEventInTopologySQL = "" + "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id)" + " VALUES ($1, $2, $3)" + - " ON CONFLICT DO NOTHING" + " ON CONFLICT (topological_position, room_id) DO UPDATE SET event_id = $1" const selectEventIDsInRangeASCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 621aec957..cc34e3cdc 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -446,6 +446,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( for _, delta := range deltas { err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) if err != nil { + return nil, err } } @@ -602,7 +603,6 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( recentStreamEvents, err = d.events.selectRecentEvents( ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition, numRecentEventsPerRoom, true, true, - //ctx, txn, roomID, 0, toPos.PDUPosition, numRecentEventsPerRoom, ) if err != nil { return @@ -618,7 +618,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( if backwardTopologyPos-1 <= 0 { backwardTopologyPos = types.StreamPosition(1) } else { - backwardTopologyPos = backwardTopologyPos - 1 + backwardTopologyPos-- } // We don't include a device here as we don't need to send down @@ -769,10 +769,7 @@ func (d *SyncServerDatasource) getBackwardTopologyPos( events []types.StreamEvent, ) (pos types.StreamPosition, err error) { if len(events) > 0 { - pos, err = d.topology.selectPositionInTopology(ctx, events[0].EventID()) - if err != nil { - return - } + pos, _ = d.topology.selectPositionInTopology(ctx, events[0].EventID()) } if pos-1 <= 0 { pos = types.StreamPosition(1) diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 5a3ae8807..8ef7bc16a 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -185,7 +185,7 @@ func (rp *RequestPool) appendAccountData( // Sync is not initial, get all account data since the latest sync dataTypes, err := rp.db.GetAccountDataInRange( req.ctx, userID, - types.StreamPosition(req.since.PDUPosition), types.StreamPosition(currentPos), + types.StreamPosition(currentPos), types.StreamPosition(req.since.PDUPosition), accountDataFilter, ) if err != nil {