mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 14:33:10 -06:00
Only fetch events once for all rooms
This commit is contained in:
parent
7863a405a5
commit
646b4c1bf5
|
|
@ -203,6 +203,21 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
roomIDs := make([]string, len(stateDeltas))
|
||||||
|
for i := range stateDeltas {
|
||||||
|
roomIDs[i] = stateDeltas[i].RoomID
|
||||||
|
}
|
||||||
|
dbEvents, err := snapshot.RecentEvents(
|
||||||
|
ctx, roomIDs, r,
|
||||||
|
&eventFilter, true, true,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return r.To
|
||||||
|
}
|
||||||
|
return r.From
|
||||||
|
}
|
||||||
|
|
||||||
newPos = from
|
newPos = from
|
||||||
for _, delta := range stateDeltas {
|
for _, delta := range stateDeltas {
|
||||||
newRange := r
|
newRange := r
|
||||||
|
|
@ -218,7 +233,7 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var pos types.StreamPosition
|
var pos types.StreamPosition
|
||||||
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil {
|
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req, dbEvents); err != nil {
|
||||||
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
|
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
|
||||||
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
|
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
|
||||||
return newPos
|
return newPos
|
||||||
|
|
@ -253,6 +268,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
eventFilter *synctypes.RoomEventFilter,
|
eventFilter *synctypes.RoomEventFilter,
|
||||||
stateFilter *synctypes.StateFilter,
|
stateFilter *synctypes.StateFilter,
|
||||||
req *types.SyncRequest,
|
req *types.SyncRequest,
|
||||||
|
dbEvents map[string]types.RecentEvents,
|
||||||
) (types.StreamPosition, error) {
|
) (types.StreamPosition, error) {
|
||||||
var err error
|
var err error
|
||||||
originalLimit := eventFilter.Limit
|
originalLimit := eventFilter.Limit
|
||||||
|
|
@ -265,17 +281,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dbEvents, err := snapshot.RecentEvents(
|
|
||||||
ctx, []string{delta.RoomID}, r,
|
|
||||||
eventFilter, true, true,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return r.To, nil
|
|
||||||
}
|
|
||||||
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
recentStreamEvents := dbEvents[delta.RoomID].Events
|
recentStreamEvents := dbEvents[delta.RoomID].Events
|
||||||
limited := dbEvents[delta.RoomID].Limited
|
limited := dbEvents[delta.RoomID].Limited
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue