mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-02-15 15:34:28 -06:00
closer to complete
This commit is contained in:
parent
faeb0b4ba0
commit
9934f9543b
|
@ -1,6 +1,10 @@
|
||||||
package routing
|
package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
@ -10,8 +14,6 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"net/http"
|
|
||||||
"strconv"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ThreadsResponse struct {
|
type ThreadsResponse struct {
|
||||||
|
@ -42,7 +44,13 @@ func Threads(
|
||||||
limit = 100
|
limit = 100
|
||||||
}
|
}
|
||||||
|
|
||||||
from := req.URL.Query().Get("from")
|
var from types.StreamPosition
|
||||||
|
if f := req.URL.Query().Get("from"); f != "" {
|
||||||
|
if from, err = types.NewStreamPositionFromString(f); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
include := req.URL.Query().Get("include")
|
include := req.URL.Query().Get("include")
|
||||||
|
|
||||||
snapshot, err := syncDB.NewDatabaseSnapshot(req.Context())
|
snapshot, err := syncDB.NewDatabaseSnapshot(req.Context())
|
||||||
|
@ -60,8 +68,9 @@ func Threads(
|
||||||
Chunk: []synctypes.ClientEvent{},
|
Chunk: []synctypes.ClientEvent{},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var userID string
|
||||||
if include == "participated" {
|
if include == "participated" {
|
||||||
userID, err := spec.NewUserID(device.UserID, true)
|
_, err := spec.NewUserID(device.UserID, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("device.UserID invalid")
|
util.GetLogger(req.Context()).WithError(err).Error("device.UserID invalid")
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
@ -69,9 +78,26 @@ func Threads(
|
||||||
JSON: spec.Unknown("internal server error"),
|
JSON: spec.Unknown("internal server error"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var events []types.StreamEvent
|
userID = device.UserID
|
||||||
events, res.PrevBatch, res.NextBatch, err = snapshot.RelationsFor(
|
} else {
|
||||||
req.Context(), roomID.String(), "", relType, eventType, from, to, dir == "b", limit,
|
userID = ""
|
||||||
)
|
}
|
||||||
|
var headeredEvents []*rstypes.HeaderedEvent
|
||||||
|
headeredEvents, _, res.NextBatch, err = snapshot.ThreadsFor(
|
||||||
|
req.Context(), roomID.String(), userID, from, limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, event := range headeredEvents {
|
||||||
|
ce, err := synctypes.ToClientEvent(event, synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
|
||||||
|
return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
res.Chunk = append(res.Chunk, *ce)
|
||||||
|
}
|
||||||
|
|
||||||
|
return util.JSONResponse{
|
||||||
|
JSON: res,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,10 +17,9 @@ package postgres
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
types2 "github.com/matrix-org/dendrite/roomserver/types"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
)
|
)
|
||||||
|
@ -65,21 +64,21 @@ const selectRelationsInRangeDescSQL = "" +
|
||||||
" ORDER BY id DESC LIMIT $7"
|
" ORDER BY id DESC LIMIT $7"
|
||||||
|
|
||||||
const selectThreadsSQL = "" +
|
const selectThreadsSQL = "" +
|
||||||
"SELECT syncapi_relations.id, syncapi_relations.child_event_id, syncapi_output_room_events.sender, syncapi_relations.event_id, syncapi_output_room_events.headered_event_json FROM syncapi_relations" +
|
"SELECT syncapi_relations.id, syncapi_relations.child_event_id, syncapi_output_room_events.sender, syncapi_relations.event_id, syncapi_output_room_events.headered_event_json, syncapi_output_room_events.type FROM syncapi_relations" +
|
||||||
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_relations.event_id" +
|
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_relations.event_id" +
|
||||||
" WHERE syncapi_relations.room_id = $1" +
|
" WHERE syncapi_relations.room_id = $1" +
|
||||||
" AND syncapi_relations.rel_type = 'm.thread'" +
|
" AND syncapi_relations.rel_type = 'm.thread'" +
|
||||||
" AND syncapi_relations.id >= $2 AND syncapi_relations.id < $3" +
|
" AND syncapi_relations.id >= $2 AND" +
|
||||||
" ORDER BY syncapi_relations.id LIMIT $4"
|
" ORDER BY syncapi_relations.id LIMIT $3"
|
||||||
|
|
||||||
const selectThreadsWithSenderSQL = "" +
|
const selectThreadsWithSenderSQL = "" +
|
||||||
"SELECT syncapi_relations.id, syncapi_relations.child_event_id, syncapi_output_room_events.sender, syncapi_relations.event_id, syncapi_output_room_events.headered_event_json FROM syncapi_relations" +
|
"SELECT syncapi_relations.id, syncapi_relations.child_event_id, syncapi_output_room_events.sender, syncapi_relations.event_id, syncapi_output_room_events.headered_event_json, syncapi_output_room_events.type FROM syncapi_relations" +
|
||||||
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_relations.event_id" +
|
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_relations.event_id" +
|
||||||
" WHERE syncapi_relations.room_id = $1" +
|
" WHERE syncapi_relations.room_id = $1" +
|
||||||
" AND syncapi_output_room_events.sender = $2" +
|
" AND syncapi_output_room_events.sender = $2" +
|
||||||
" AND syncapi_relations.rel_type = 'm.thread'" +
|
" AND syncapi_relations.rel_type = 'm.thread'" +
|
||||||
" AND syncapi_relations.id >= $3 AND syncapi_relations.id < $4" +
|
" AND syncapi_relations.id >= $3" +
|
||||||
" ORDER BY syncapi_relations.id LIMIT $5"
|
" ORDER BY syncapi_relations.id LIMIT $4"
|
||||||
|
|
||||||
const selectMaxRelationIDSQL = "" +
|
const selectMaxRelationIDSQL = "" +
|
||||||
"SELECT COALESCE(MAX(id), 0) FROM syncapi_relations"
|
"SELECT COALESCE(MAX(id), 0) FROM syncapi_relations"
|
||||||
|
@ -175,9 +174,9 @@ func (s *relationsStatements) SelectThreads(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
txn *sql.Tx,
|
txn *sql.Tx,
|
||||||
roomID, userID string,
|
roomID, userID string,
|
||||||
r types.Range,
|
from types.StreamPosition,
|
||||||
limit int,
|
limit uint64,
|
||||||
) ([]map[string]any, types.StreamPosition, error) {
|
) ([]string, types.StreamPosition, error) {
|
||||||
var lastPos types.StreamPosition
|
var lastPos types.StreamPosition
|
||||||
var stmt *sql.Stmt
|
var stmt *sql.Stmt
|
||||||
var rows *sql.Rows
|
var rows *sql.Rows
|
||||||
|
@ -185,35 +184,34 @@ func (s *relationsStatements) SelectThreads(
|
||||||
|
|
||||||
if userID == "" {
|
if userID == "" {
|
||||||
stmt = sqlutil.TxStmt(txn, s.selectThreadsStmt)
|
stmt = sqlutil.TxStmt(txn, s.selectThreadsStmt)
|
||||||
rows, err = stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
|
rows, err = stmt.QueryContext(ctx, roomID, from, limit)
|
||||||
} else {
|
} else {
|
||||||
stmt = sqlutil.TxStmt(txn, s.selectThreadsWithSenderStmt)
|
stmt = sqlutil.TxStmt(txn, s.selectThreadsWithSenderStmt)
|
||||||
rows, err = stmt.QueryContext(ctx, roomID, userID, r.Low(), r.High(), limit)
|
rows, err = stmt.QueryContext(ctx, roomID, userID, from, limit)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, lastPos, err
|
return nil, lastPos, err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectThreads: rows.close() failed")
|
defer internal.CloseAndLogIfError(ctx, rows, "selectThreads: rows.close() failed")
|
||||||
var result []map[string]any
|
var result []string
|
||||||
var (
|
var (
|
||||||
id types.StreamPosition
|
id types.StreamPosition
|
||||||
childEventID string
|
childEventID string
|
||||||
sender string
|
sender string
|
||||||
eventId string
|
eventId string
|
||||||
headeredEventJson string
|
headeredEventJson string
|
||||||
|
eventType string
|
||||||
)
|
)
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
if err = rows.Scan(&id, &childEventID, &sender, &eventId, &headeredEventJson); err != nil {
|
if err = rows.Scan(&id, &childEventID, &sender, &eventId, &headeredEventJson, &eventType); err != nil {
|
||||||
return nil, lastPos, err
|
return nil, lastPos, err
|
||||||
}
|
}
|
||||||
if id > lastPos {
|
if id > lastPos {
|
||||||
lastPos = id
|
lastPos = id
|
||||||
}
|
}
|
||||||
var event types2.HeaderedEvent
|
result = append(result, eventId)
|
||||||
json.Unmarshal(event)
|
|
||||||
result = append(result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, lastPos, rows.Err()
|
return result, lastPos, rows.Err()
|
||||||
|
|
|
@ -812,8 +812,8 @@ func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID,
|
||||||
return events, prevBatch, nextBatch, nil
|
return events, prevBatch, nextBatch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DatabaseTransaction) ThreadsFor(ctx context.Context, roomID, userID string, from types.StreamPosition, limit int) (
|
func (d *DatabaseTransaction) ThreadsFor(ctx context.Context, roomID, userID string, from types.StreamPosition, limit uint64) (
|
||||||
events []types.StreamEvent, prevBatch, nextBatch string, err error,
|
events []*rstypes.HeaderedEvent, prevBatch, nextBatch string, err error,
|
||||||
) {
|
) {
|
||||||
r := types.Range{
|
r := types.Range{
|
||||||
From: from,
|
From: from,
|
||||||
|
@ -831,43 +831,16 @@ func (d *DatabaseTransaction) ThreadsFor(ctx context.Context, roomID, userID str
|
||||||
r.From++
|
r.From++
|
||||||
}
|
}
|
||||||
|
|
||||||
// First look up any relations from the database. We add one to the limit here
|
// First look up any threads from the database. We add one to the limit here
|
||||||
// so that we can tell if we're overflowing, as we will only set the "next_batch"
|
// so that we can tell if we're overflowing, as we will only set the "next_batch"
|
||||||
// in the response if we are.
|
// in the response if we are.
|
||||||
relations, _, err := d.Relations.SelectThreads(ctx, d.txn, roomID, userID, limit+1)
|
eventIDs, _, err := d.Relations.SelectThreads(ctx, d.txn, roomID, userID, from, limit+1)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", "", fmt.Errorf("d.Relations.SelectRelationsInRange: %w", err)
|
return nil, "", "", fmt.Errorf("d.Relations.SelectRelationsInRange: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we specified a relation type then just get those results, otherwise collate
|
events, err = d.Events(ctx, eventIDs)
|
||||||
// them from all of the returned relation types.
|
|
||||||
entries := []types.RelationEntry{}
|
|
||||||
for _, e := range relations {
|
|
||||||
entries = append(entries, e...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If there were no entries returned, there were no relations, so stop at this point.
|
|
||||||
if len(entries) == 0 {
|
|
||||||
return nil, "", "", nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise, let's try and work out what sensible prev_batch and next_batch values
|
|
||||||
// could be. We've requested an extra event by adding one to the limit already so
|
|
||||||
// that we can determine whether or not to provide a "next_batch", so trim off that
|
|
||||||
// event off the end if needs be.
|
|
||||||
if len(entries) > limit {
|
|
||||||
entries = entries[:len(entries)-1]
|
|
||||||
nextBatch = fmt.Sprintf("%d", entries[len(entries)-1].Position)
|
|
||||||
}
|
|
||||||
// TODO: set prevBatch? doesn't seem to affect the tests...
|
|
||||||
|
|
||||||
// Extract all of the event IDs from the relation entries so that we can pull the
|
|
||||||
// events out of the database. Then go and fetch the events.
|
|
||||||
eventIDs := make([]string, 0, len(entries))
|
|
||||||
for _, entry := range entries {
|
|
||||||
eventIDs = append(eventIDs, entry.EventID)
|
|
||||||
}
|
|
||||||
events, err = d.OutputEvents.SelectEvents(ctx, d.txn, eventIDs, nil, true)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", "", fmt.Errorf("d.OutputEvents.SelectEvents: %w", err)
|
return nil, "", "", fmt.Errorf("d.OutputEvents.SelectEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -237,7 +237,7 @@ type Relations interface {
|
||||||
SelectRelationsInRange(ctx context.Context, txn *sql.Tx, roomID, eventID, relType, eventType string, r types.Range, limit int) (map[string][]types.RelationEntry, types.StreamPosition, error)
|
SelectRelationsInRange(ctx context.Context, txn *sql.Tx, roomID, eventID, relType, eventType string, r types.Range, limit int) (map[string][]types.RelationEntry, types.StreamPosition, error)
|
||||||
// SelectThreads this will find some threads from a room
|
// SelectThreads this will find some threads from a room
|
||||||
// if userID is not empty then it will only include the threads that the user has participated
|
// if userID is not empty then it will only include the threads that the user has participated
|
||||||
SelectThreads(ctx context.Context, txn *sql.Tx, roomID, userID string, limit int) (map[string][]types.RelationEntry, types.StreamPosition, error)
|
SelectThreads(ctx context.Context, txn *sql.Tx, roomID, userID string, from types.StreamPosition, limit uint64) ([]string, types.StreamPosition, error)
|
||||||
// SelectMaxRelationID returns the maximum ID of all relations, used to determine what the boundaries
|
// SelectMaxRelationID returns the maximum ID of all relations, used to determine what the boundaries
|
||||||
// should be if there are no boundaries supplied (i.e. we want to work backwards but don't have a
|
// should be if there are no boundaries supplied (i.e. we want to work backwards but don't have a
|
||||||
// "from" or want to work forwards and don't have a "to").
|
// "from" or want to work forwards and don't have a "to").
|
||||||
|
|
Loading…
Reference in a new issue