Lazy loading fixes (#2362)
* Return some more usefully wrapped errors when doing sync * Remove unnecessary error check * Couple of guards around `sql.ErrNoRows` * Nolint
This commit is contained in:
parent
a9f0a390c6
commit
bb987cd64b
|
@ -3,6 +3,7 @@ package streams
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -205,6 +206,7 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||||
return newPos
|
return newPos
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nolint:gocyclo
|
||||||
func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
device *userapi.Device,
|
device *userapi.Device,
|
||||||
|
@ -228,13 +230,16 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
eventFilter, true, true,
|
eventFilter, true, true,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return r.From, err
|
if err == sql.ErrNoRows {
|
||||||
|
return r.To, nil
|
||||||
|
}
|
||||||
|
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
|
||||||
}
|
}
|
||||||
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
|
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
|
||||||
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
|
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
|
||||||
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
|
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return r.From, err
|
return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we didn't return any events at all then don't bother doing anything else.
|
// If we didn't return any events at all then don't bother doing anything else.
|
||||||
|
@ -268,15 +273,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
}
|
}
|
||||||
|
|
||||||
if stateFilter.LazyLoadMembers {
|
if stateFilter.LazyLoadMembers {
|
||||||
if err != nil {
|
|
||||||
return r.From, err
|
|
||||||
}
|
|
||||||
delta.StateEvents, err = p.lazyLoadMembers(
|
delta.StateEvents, err = p.lazyLoadMembers(
|
||||||
ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers,
|
ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers,
|
||||||
device, recentEvents, delta.StateEvents,
|
device, recentEvents, delta.StateEvents,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
return r.From, err
|
return r.From, fmt.Errorf("p.lazyLoadMembers: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue