From 68119986116457f3acc551bff1f0e50ba8bf5ad3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 11 Oct 2022 16:28:33 +0100 Subject: [PATCH] Early pagination --- syncapi/routing/relations.go | 3 +-- syncapi/storage/postgres/relations_table.go | 9 ++++--- syncapi/storage/shared/storage_sync.go | 30 +++++++++++++++++---- syncapi/storage/sqlite3/relations_table.go | 9 ++++--- syncapi/storage/tables/interface.go | 2 +- syncapi/types/types.go | 5 ++++ 6 files changed, 44 insertions(+), 14 deletions(-) diff --git a/syncapi/routing/relations.go b/syncapi/routing/relations.go index 91b1861bc..6bc2b3733 100644 --- a/syncapi/routing/relations.go +++ b/syncapi/routing/relations.go @@ -68,8 +68,6 @@ func Relations(req *http.Request, device *api.Device, syncDB storage.Database, r } } - res := &RelationsResponse{} - snapshot, err := syncDB.NewDatabaseSnapshot(req.Context()) if err != nil { return jsonerror.InternalServerError() @@ -83,6 +81,7 @@ func Relations(req *http.Request, device *api.Device, syncDB storage.Database, r } } + res := &RelationsResponse{} res.Chunk, res.PrevBatch, res.NextBatch, err = snapshot.RelationsFor( req.Context(), roomID, eventID, relType, eventType, from, to, limit, ) diff --git a/syncapi/storage/postgres/relations_table.go b/syncapi/storage/postgres/relations_table.go index 876b5a3a1..934fddbf0 100644 --- a/syncapi/storage/postgres/relations_table.go +++ b/syncapi/storage/postgres/relations_table.go @@ -119,7 +119,7 @@ func (s *relationsStatements) DeleteRelation( func (s *relationsStatements) SelectRelationsInRange( ctx context.Context, txn *sql.Tx, roomID, eventID, relType string, r types.Range, limit int, -) (map[string][]string, types.StreamPosition, error) { +) (map[string][]types.RelationEntry, types.StreamPosition, error) { var lastPos types.StreamPosition var rows *sql.Rows var err error @@ -134,7 +134,7 @@ func (s *relationsStatements) SelectRelationsInRange( return nil, lastPos, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRelationsInRange: rows.close() failed") - result := map[string][]string{} + result := map[string][]types.RelationEntry{} for rows.Next() { var ( id types.StreamPosition @@ -148,7 +148,10 @@ func (s *relationsStatements) SelectRelationsInRange( if id > lastPos { lastPos = id } - result[relType] = append(result[relType], childEventID) + result[relType] = append(result[relType], types.RelationEntry{ + Position: id, + EventID: childEventID, + }) } if lastPos == 0 { lastPos = r.To diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go index b1a04b962..00f62d8ff 100644 --- a/syncapi/storage/shared/storage_sync.go +++ b/syncapi/storage/shared/storage_sync.go @@ -598,28 +598,47 @@ func (d *DatabaseTransaction) MaxStreamPositionForRelations(ctx context.Context) func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, limit int) ( clientEvents []gomatrixserverlib.ClientEvent, prevBatch, nextBatch string, err error, ) { - // TODO: Nothing here is limited or setting prev_batch or next_batch var eventIDs []string r := types.Range{ From: from, To: to, Backwards: from > to, } - rels, _, err := d.Relations.SelectRelationsInRange(ctx, d.txn, roomID, eventID, relType, r, limit+1) + relations, _, err := d.Relations.SelectRelationsInRange(ctx, d.txn, roomID, eventID, relType, r, limit+1) if err != nil { return nil, "", "", fmt.Errorf("d.Relations.SelectRelationsInRange: %w", err) } + + entries := []types.RelationEntry{} if relType != "" { - eventIDs = rels[relType] + entries = relations[relType] } else { - for _, ids := range rels { - eventIDs = append(eventIDs, ids...) + for _, e := range relations { + entries = append(entries, e...) } } + + if len(entries) == 0 { + return nil, "", "", nil + } + + if from > 0 { + prevBatch = fmt.Sprintf("%d", entries[0].Position-1) + } + if len(entries) > limit { + nextBatch = fmt.Sprintf("%d", entries[len(entries)-1].Position) + entries = entries[:len(entries)-1] + } + + for _, entry := range entries { + eventIDs = append(eventIDs, entry.EventID) + } + events, err := d.OutputEvents.SelectEvents(ctx, d.txn, eventIDs, nil, true) if err != nil { return nil, "", "", fmt.Errorf("d.OutputEvents.SelectEvents: %w", err) } + for _, event := range events { if eventType != "" && event.Type() != eventType { continue @@ -629,5 +648,6 @@ func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, gomatrixserverlib.ToClientEvent(event.Event, gomatrixserverlib.FormatAll), ) } + return clientEvents, prevBatch, nextBatch, nil } diff --git a/syncapi/storage/sqlite3/relations_table.go b/syncapi/storage/sqlite3/relations_table.go index 02463668d..4f25777ea 100644 --- a/syncapi/storage/sqlite3/relations_table.go +++ b/syncapi/storage/sqlite3/relations_table.go @@ -123,7 +123,7 @@ func (s *relationsStatements) DeleteRelation( func (s *relationsStatements) SelectRelationsInRange( ctx context.Context, txn *sql.Tx, roomID, eventID, relType string, r types.Range, limit int, -) (map[string][]string, types.StreamPosition, error) { +) (map[string][]types.RelationEntry, types.StreamPosition, error) { var lastPos types.StreamPosition var rows *sql.Rows var err error @@ -138,7 +138,7 @@ func (s *relationsStatements) SelectRelationsInRange( return nil, lastPos, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRelationsInRange: rows.close() failed") - result := map[string][]string{} + result := map[string][]types.RelationEntry{} for rows.Next() { var ( id types.StreamPosition @@ -152,7 +152,10 @@ func (s *relationsStatements) SelectRelationsInRange( if id > lastPos { lastPos = id } - result[relType] = append(result[relType], childEventID) + result[relType] = append(result[relType], types.RelationEntry{ + Position: id, + EventID: childEventID, + }) } if lastPos == 0 { lastPos = r.To diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 1311ef22c..51de0965b 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -210,6 +210,6 @@ type Presence interface { type Relations interface { InsertRelation(ctx context.Context, txn *sql.Tx, roomID, eventID, childEventID, relType string) (streamPos types.StreamPosition, err error) DeleteRelation(ctx context.Context, txn *sql.Tx, roomID, childEventID string) error - SelectRelationsInRange(ctx context.Context, txn *sql.Tx, roomID, eventID, relType string, r types.Range, limit int) (map[string][]string, types.StreamPosition, error) + SelectRelationsInRange(ctx context.Context, txn *sql.Tx, roomID, eventID, relType string, r types.Range, limit int) (map[string][]types.RelationEntry, types.StreamPosition, error) SelectMaxRelationID(ctx context.Context, txn *sql.Tx) (id int64, err error) } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 72e816d30..60a74a285 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -607,3 +607,8 @@ type OutputSendToDeviceEvent struct { type IgnoredUsers struct { List map[string]interface{} `json:"ignored_users"` } + +type RelationEntry struct { + Position StreamPosition + EventID string +}