From 25c86554970ea4aa4d7100272a367a7c2c832b70 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 11 Oct 2022 16:03:40 +0100 Subject: [PATCH] It sorta works --- syncapi/routing/relations.go | 46 +++++++++++++++++++++----- syncapi/storage/interface.go | 2 ++ syncapi/storage/shared/storage_sync.go | 5 +++ syncapi/types/types.go | 8 +++++ 4 files changed, 53 insertions(+), 8 deletions(-) diff --git a/syncapi/routing/relations.go b/syncapi/routing/relations.go index d72f1d12e..91b1861bc 100644 --- a/syncapi/routing/relations.go +++ b/syncapi/routing/relations.go @@ -16,6 +16,7 @@ package routing import ( "net/http" + "strconv" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -23,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/api" ) @@ -34,20 +36,37 @@ type RelationsResponse struct { // nolint:gocyclo func Relations(req *http.Request, device *api.Device, syncDB storage.Database, roomID, eventID, relType, eventType string) util.JSONResponse { + var err error + var from, to types.StreamPosition + var limit int dir := req.URL.Query().Get("dir") - from := req.URL.Query().Get("from") - to := req.URL.Query().Get("to") - limit := req.URL.Query().Get("limit") - + if f := req.URL.Query().Get("from"); f != "" { + if from, err = types.NewStreamPositionFromString(f); err != nil { + return util.ErrorResponse(err) + } + } + if t := req.URL.Query().Get("to"); t != "" { + if to, err = types.NewStreamPositionFromString(t); err != nil { + return util.ErrorResponse(err) + } + } + if l := req.URL.Query().Get("limit"); l != "" { + if limit, err = strconv.Atoi(l); err != nil { + return util.ErrorResponse(err) + } + } + if limit == 0 || limit > 50 { + limit = 50 + } + if dir == "" { + dir = "b" + } if dir != "b" && dir != "f" { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: jsonerror.MissingArgument("Bad or missing dir query parameter (should be either 'b' or 'f')"), } } - if dir == "" { - dir = "b" - } res := &RelationsResponse{} @@ -58,7 +77,18 @@ func Relations(req *http.Request, device *api.Device, syncDB storage.Database, r var succeeded bool defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) - _, _, _, _ = from, to, limit, dir + if to == 0 { + if to, err = snapshot.MaxStreamPositionForRelations(req.Context()); err != nil { + return util.ErrorResponse(err) + } + } + + res.Chunk, res.PrevBatch, res.NextBatch, err = snapshot.RelationsFor( + req.Context(), roomID, eventID, relType, eventType, from, to, limit, + ) + if err != nil { + return util.ErrorResponse(err) + } succeeded = true return util.JSONResponse{ diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 972e74f16..3135f1a88 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -38,6 +38,7 @@ type DatabaseTransaction interface { MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) + MaxStreamPositionForRelations(ctx context.Context) (types.StreamPosition, error) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) @@ -107,6 +108,7 @@ type DatabaseTransaction interface { GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, roomIDs map[string]string) (map[string]*eventutil.NotificationData, error) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) + RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, limit int) (clientEvents []gomatrixserverlib.ClientEvent, prevBatch, nextBatch string, err error) } type Database interface { diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go index 28e669d81..b1a04b962 100644 --- a/syncapi/storage/shared/storage_sync.go +++ b/syncapi/storage/shared/storage_sync.go @@ -590,6 +590,11 @@ func (d *DatabaseTransaction) MaxStreamPositionForPresence(ctx context.Context) return d.Presence.GetMaxPresenceID(ctx, d.txn) } +func (d *DatabaseTransaction) MaxStreamPositionForRelations(ctx context.Context) (types.StreamPosition, error) { + id, err := d.Relations.SelectMaxRelationID(ctx, d.txn) + return types.StreamPosition(id), err +} + func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, limit int) ( clientEvents []gomatrixserverlib.ClientEvent, prevBatch, nextBatch string, err error, ) { diff --git a/syncapi/types/types.go b/syncapi/types/types.go index b6d340f93..72e816d30 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -47,6 +47,14 @@ type StateDelta struct { // StreamPosition represents the offset in the sync stream a client is at. type StreamPosition int64 +func NewStreamPositionFromString(s string) (StreamPosition, error) { + n, err := strconv.Atoi(s) + if err != nil { + return 0, err + } + return StreamPosition(n), nil +} + // StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event. type StreamEvent struct { *gomatrixserverlib.HeaderedEvent