diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index e34445acf..4595253a1 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -121,7 +121,10 @@ func (p *PDUStreamProvider) CompleteSync( ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") - return from + if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone { + return from + } + continue } req.Response.Rooms.Peek[peek.RoomID] = *jr } @@ -190,10 +193,10 @@ func (p *PDUStreamProvider) IncrementalSync( var pos types.StreamPosition if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") - if err == context.DeadlineExceeded || err == context.Canceled { + if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone { return newPos } - return newPos + continue } // Reset the position, as it is only for the special case of newly joined rooms if delta.NewlyJoined {