From 646b4c1bf559df6f17a4f9ea55be0179ee86135e Mon Sep 17 00:00:00 2001 From: Joakim Recht Date: Mon, 22 Jan 2024 13:19:32 +0100 Subject: [PATCH] Only fetch events once for all rooms --- syncapi/streams/stream_pdu.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 3abb0b3c6..303148ba2 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -203,6 +203,21 @@ func (p *PDUStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } + roomIDs := make([]string, len(stateDeltas)) + for i := range stateDeltas { + roomIDs[i] = stateDeltas[i].RoomID + } + dbEvents, err := snapshot.RecentEvents( + ctx, roomIDs, r, + &eventFilter, true, true, + ) + if err != nil { + if err == sql.ErrNoRows { + return r.To + } + return r.From + } + newPos = from for _, delta := range stateDeltas { newRange := r @@ -218,7 +233,7 @@ func (p *PDUStreamProvider) IncrementalSync( } } var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req, dbEvents); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone { return newPos @@ -253,6 +268,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( eventFilter *synctypes.RoomEventFilter, stateFilter *synctypes.StateFilter, req *types.SyncRequest, + dbEvents map[string]types.RecentEvents, ) (types.StreamPosition, error) { var err error originalLimit := eventFilter.Limit @@ -265,17 +281,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } } - dbEvents, err := snapshot.RecentEvents( - ctx, []string{delta.RoomID}, r, - eventFilter, true, true, - ) - if err != nil { - if err == sql.ErrNoRows { - return r.To, nil - } - return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err) - } - recentStreamEvents := dbEvents[delta.RoomID].Events limited := dbEvents[delta.RoomID].Limited