It sorta works

This commit is contained in:
Neil Alexander 2022-10-11 16:03:40 +01:00
parent 17b532104f
commit 25c8655497
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 53 additions and 8 deletions

View file

@ -16,6 +16,7 @@ package routing
import ( import (
"net/http" "net/http"
"strconv"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -23,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
) )
@ -34,20 +36,37 @@ type RelationsResponse struct {
// nolint:gocyclo // nolint:gocyclo
func Relations(req *http.Request, device *api.Device, syncDB storage.Database, roomID, eventID, relType, eventType string) util.JSONResponse { func Relations(req *http.Request, device *api.Device, syncDB storage.Database, roomID, eventID, relType, eventType string) util.JSONResponse {
var err error
var from, to types.StreamPosition
var limit int
dir := req.URL.Query().Get("dir") dir := req.URL.Query().Get("dir")
from := req.URL.Query().Get("from") if f := req.URL.Query().Get("from"); f != "" {
to := req.URL.Query().Get("to") if from, err = types.NewStreamPositionFromString(f); err != nil {
limit := req.URL.Query().Get("limit") return util.ErrorResponse(err)
}
}
if t := req.URL.Query().Get("to"); t != "" {
if to, err = types.NewStreamPositionFromString(t); err != nil {
return util.ErrorResponse(err)
}
}
if l := req.URL.Query().Get("limit"); l != "" {
if limit, err = strconv.Atoi(l); err != nil {
return util.ErrorResponse(err)
}
}
if limit == 0 || limit > 50 {
limit = 50
}
if dir == "" {
dir = "b"
}
if dir != "b" && dir != "f" { if dir != "b" && dir != "f" {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Bad or missing dir query parameter (should be either 'b' or 'f')"), JSON: jsonerror.MissingArgument("Bad or missing dir query parameter (should be either 'b' or 'f')"),
} }
} }
if dir == "" {
dir = "b"
}
res := &RelationsResponse{} res := &RelationsResponse{}
@ -58,7 +77,18 @@ func Relations(req *http.Request, device *api.Device, syncDB storage.Database, r
var succeeded bool var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
_, _, _, _ = from, to, limit, dir if to == 0 {
if to, err = snapshot.MaxStreamPositionForRelations(req.Context()); err != nil {
return util.ErrorResponse(err)
}
}
res.Chunk, res.PrevBatch, res.NextBatch, err = snapshot.RelationsFor(
req.Context(), roomID, eventID, relType, eventType, from, to, limit,
)
if err != nil {
return util.ErrorResponse(err)
}
succeeded = true succeeded = true
return util.JSONResponse{ return util.JSONResponse{

View file

@ -38,6 +38,7 @@ type DatabaseTransaction interface {
MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForRelations(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
@ -107,6 +108,7 @@ type DatabaseTransaction interface {
GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, roomIDs map[string]string) (map[string]*eventutil.NotificationData, error) GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, roomIDs map[string]string) (map[string]*eventutil.NotificationData, error)
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error)
RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, limit int) (clientEvents []gomatrixserverlib.ClientEvent, prevBatch, nextBatch string, err error)
} }
type Database interface { type Database interface {

View file

@ -590,6 +590,11 @@ func (d *DatabaseTransaction) MaxStreamPositionForPresence(ctx context.Context)
return d.Presence.GetMaxPresenceID(ctx, d.txn) return d.Presence.GetMaxPresenceID(ctx, d.txn)
} }
func (d *DatabaseTransaction) MaxStreamPositionForRelations(ctx context.Context) (types.StreamPosition, error) {
id, err := d.Relations.SelectMaxRelationID(ctx, d.txn)
return types.StreamPosition(id), err
}
func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, limit int) ( func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, limit int) (
clientEvents []gomatrixserverlib.ClientEvent, prevBatch, nextBatch string, err error, clientEvents []gomatrixserverlib.ClientEvent, prevBatch, nextBatch string, err error,
) { ) {

View file

@ -47,6 +47,14 @@ type StateDelta struct {
// StreamPosition represents the offset in the sync stream a client is at. // StreamPosition represents the offset in the sync stream a client is at.
type StreamPosition int64 type StreamPosition int64
func NewStreamPositionFromString(s string) (StreamPosition, error) {
n, err := strconv.Atoi(s)
if err != nil {
return 0, err
}
return StreamPosition(n), nil
}
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event. // StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type StreamEvent struct { type StreamEvent struct {
*gomatrixserverlib.HeaderedEvent *gomatrixserverlib.HeaderedEvent