mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-20 20:43:09 -06:00
Try to optimize init syncs by using lateral joins, not getting the
complete state for the current room if we discard it anyway
This commit is contained in:
parent
f98003c030
commit
c068c6eb62
|
|
@ -159,6 +159,7 @@ func Context(
|
||||||
}).Debug("applied history visibility (context eventsBefore/eventsAfter)")
|
}).Debug("applied history visibility (context eventsBefore/eventsAfter)")
|
||||||
|
|
||||||
// TODO: Get the actual state at the last event returned by SelectContextAfterEvent
|
// TODO: Get the actual state at the last event returned by SelectContextAfterEvent
|
||||||
|
stateFilter.LazyLoadMembers = false
|
||||||
state, err := snapshot.CurrentState(ctx, roomID, &stateFilter, nil)
|
state, err := snapshot.CurrentState(ctx, roomID, &stateFilter, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("unable to fetch current room state")
|
logrus.WithError(err).Error("unable to fetch current room state")
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ type DatabaseTransaction interface {
|
||||||
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
|
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
|
||||||
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
|
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
|
||||||
GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error)
|
GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error)
|
||||||
RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
|
||||||
GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error)
|
GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error)
|
||||||
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
||||||
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
|
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
|
||||||
|
|
|
||||||
|
|
@ -275,6 +275,15 @@ func (s *currentRoomStateStatements) SelectCurrentState(
|
||||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||||
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
|
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
|
||||||
senders, notSenders := getSendersStateFilterFilter(stateFilter)
|
senders, notSenders := getSendersStateFilterFilter(stateFilter)
|
||||||
|
// We're going to query members later, so remove them from this request
|
||||||
|
if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers {
|
||||||
|
notTypes := &[]string{gomatrixserverlib.MRoomMember}
|
||||||
|
if stateFilter.NotTypes != nil {
|
||||||
|
*stateFilter.NotTypes = append(*stateFilter.NotTypes, gomatrixserverlib.MRoomMember)
|
||||||
|
} else {
|
||||||
|
stateFilter.NotTypes = notTypes
|
||||||
|
}
|
||||||
|
}
|
||||||
rows, err := stmt.QueryContext(ctx, roomID,
|
rows, err := stmt.QueryContext(ctx, roomID,
|
||||||
pq.StringArray(senders),
|
pq.StringArray(senders),
|
||||||
pq.StringArray(notSenders),
|
pq.StringArray(notSenders),
|
||||||
|
|
|
||||||
|
|
@ -21,16 +21,14 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
||||||
"github.com/lib/pq"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const outputRoomEventsSchema = `
|
const outputRoomEventsSchema = `
|
||||||
|
|
@ -109,14 +107,27 @@ const selectRecentEventsSQL = "" +
|
||||||
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||||
" ORDER BY id DESC LIMIT $8"
|
" ORDER BY id DESC LIMIT $8"
|
||||||
|
|
||||||
const selectRecentEventsForSyncSQL = "" +
|
const selectRecentEventsForSyncSQL = `
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
WITH room_ids AS (
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
|
SELECT unnest($1::text[]) AS room_id
|
||||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
)
|
||||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
SELECT x.*
|
||||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
FROM room_ids,
|
||||||
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
LATERAL (
|
||||||
" ORDER BY id DESC LIMIT $8"
|
SELECT room_id, event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility
|
||||||
|
FROM syncapi_output_room_events recent_events
|
||||||
|
WHERE
|
||||||
|
recent_events.room_id = room_ids.room_id
|
||||||
|
AND recent_events.exclude_from_sync = FALSE
|
||||||
|
AND id > $2 AND id <= $3
|
||||||
|
AND ( $4::text[] IS NULL OR sender = ANY($4) )
|
||||||
|
AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )
|
||||||
|
AND ( $6::text[] IS NULL OR type LIKE ANY($6) )
|
||||||
|
AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )
|
||||||
|
ORDER BY recent_events.id DESC
|
||||||
|
LIMIT $8
|
||||||
|
) AS x
|
||||||
|
`
|
||||||
|
|
||||||
const selectEarlyEventsSQL = "" +
|
const selectEarlyEventsSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||||
|
|
@ -396,11 +407,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
||||||
// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
|
// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
|
||||||
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
|
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
|
||||||
// from sync.
|
// from sync.
|
||||||
func (s *outputRoomEventsStatements) SelectRecentEvents(
|
func (s *outputRoomEventsStatements) SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, ra types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) {
|
||||||
ctx context.Context, txn *sql.Tx,
|
|
||||||
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
|
||||||
chronologicalOrder bool, onlySyncEvents bool,
|
|
||||||
) ([]types.StreamEvent, bool, error) {
|
|
||||||
var stmt *sql.Stmt
|
var stmt *sql.Stmt
|
||||||
if onlySyncEvents {
|
if onlySyncEvents {
|
||||||
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
|
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
|
||||||
|
|
@ -408,8 +415,9 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
|
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
|
||||||
}
|
}
|
||||||
senders, notSenders := getSendersRoomEventFilter(eventFilter)
|
senders, notSenders := getSendersRoomEventFilter(eventFilter)
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(
|
rows, err := stmt.QueryContext(
|
||||||
ctx, roomID, r.Low(), r.High(),
|
ctx, pq.StringArray(roomIDs), ra.Low(), ra.High(),
|
||||||
pq.StringArray(senders),
|
pq.StringArray(senders),
|
||||||
pq.StringArray(notSenders),
|
pq.StringArray(notSenders),
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
|
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
|
||||||
|
|
@ -417,34 +425,80 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
eventFilter.Limit+1,
|
eventFilter.Limit+1,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
|
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
|
||||||
events, err := rowsToStreamEvents(rows)
|
|
||||||
if err != nil {
|
result := make(map[string]types.RecentEvents)
|
||||||
return nil, false, err
|
|
||||||
}
|
for rows.Next() {
|
||||||
if chronologicalOrder {
|
var (
|
||||||
// The events need to be returned from oldest to latest, which isn't
|
roomID string
|
||||||
// necessary the way the SQL query returns them, so a sort is necessary to
|
eventID string
|
||||||
// ensure the events are in the right order in the slice.
|
streamPos types.StreamPosition
|
||||||
sort.SliceStable(events, func(i int, j int) bool {
|
eventBytes []byte
|
||||||
return events[i].StreamPosition < events[j].StreamPosition
|
excludeFromSync bool
|
||||||
})
|
sessionID *int64
|
||||||
}
|
txnID *string
|
||||||
// we queried for 1 more than the limit, so if we returned one more mark limited=true
|
transactionID *api.TransactionID
|
||||||
limited := false
|
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||||
if len(events) > eventFilter.Limit {
|
)
|
||||||
limited = true
|
if err := rows.Scan(&roomID, &eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
|
||||||
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
|
return nil, err
|
||||||
if chronologicalOrder {
|
|
||||||
events = events[1:]
|
|
||||||
} else {
|
|
||||||
events = events[:len(events)-1]
|
|
||||||
}
|
}
|
||||||
|
// TODO: Handle redacted events
|
||||||
|
var ev gomatrixserverlib.HeaderedEvent
|
||||||
|
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if sessionID != nil && txnID != nil {
|
||||||
|
transactionID = &api.TransactionID{
|
||||||
|
SessionID: *sessionID,
|
||||||
|
TransactionID: *txnID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r := result[roomID]
|
||||||
|
|
||||||
|
ev.Visibility = historyVisibility
|
||||||
|
r.Events = append(r.Events, types.StreamEvent{
|
||||||
|
HeaderedEvent: &ev,
|
||||||
|
StreamPosition: streamPos,
|
||||||
|
TransactionID: transactionID,
|
||||||
|
ExcludeFromSync: excludeFromSync,
|
||||||
|
})
|
||||||
|
|
||||||
|
result[roomID] = r
|
||||||
}
|
}
|
||||||
|
|
||||||
return events, limited, nil
|
if chronologicalOrder {
|
||||||
|
for roomID, evs := range result {
|
||||||
|
// The events need to be returned from oldest to latest, which isn't
|
||||||
|
// necessary the way the SQL query returns them, so a sort is necessary to
|
||||||
|
// ensure the events are in the right order in the slice.
|
||||||
|
sort.SliceStable(evs.Events, func(i int, j int) bool {
|
||||||
|
return evs.Events[i].StreamPosition < evs.Events[j].StreamPosition
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(evs.Events) > eventFilter.Limit {
|
||||||
|
evs.Limited = true
|
||||||
|
evs.Events = evs.Events[1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
result[roomID] = evs
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for roomID, evs := range result {
|
||||||
|
if len(evs.Events) > eventFilter.Limit {
|
||||||
|
evs.Limited = true
|
||||||
|
evs.Events = evs.Events[:len(evs.Events)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
result[roomID] = evs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectEarlyEvents returns the earliest events in the given room, starting
|
// selectEarlyEvents returns the earliest events in the given room, starting
|
||||||
|
|
|
||||||
|
|
@ -151,8 +151,8 @@ func (d *DatabaseTransaction) GetRoomSummary(ctx context.Context, roomID, userID
|
||||||
return summary, nil
|
return summary, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DatabaseTransaction) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
|
func (d *DatabaseTransaction) RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) {
|
||||||
return d.OutputEvents.SelectRecentEvents(ctx, d.txn, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents)
|
return d.OutputEvents.SelectRecentEvents(ctx, d.txn, roomIDs, r, eventFilter, chronologicalOrder, onlySyncEvents)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
|
func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
|
||||||
|
|
@ -370,19 +370,25 @@ func (d *DatabaseTransaction) GetStateDeltas(
|
||||||
}
|
}
|
||||||
|
|
||||||
// get all the state events ever (i.e. for all available rooms) between these two positions
|
// get all the state events ever (i.e. for all available rooms) between these two positions
|
||||||
stateNeededFiltered, eventMapFiltered, err := d.OutputEvents.SelectStateInRange(ctx, d.txn, r, stateFilter, allRoomIDs)
|
stateFiltered := state
|
||||||
if err != nil {
|
// avoid hitting the database if the result would be the same as above
|
||||||
if err == sql.ErrNoRows {
|
if !isStatefilterEmpty(stateFilter) {
|
||||||
return nil, nil, nil
|
var stateNeededFiltered map[string]map[string]bool
|
||||||
|
var eventMapFiltered map[string]types.StreamEvent
|
||||||
|
stateNeededFiltered, eventMapFiltered, err = d.OutputEvents.SelectStateInRange(ctx, d.txn, r, stateFilter, allRoomIDs)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
return nil, nil, err
|
stateFiltered, err = d.fetchStateEvents(ctx, d.txn, stateNeededFiltered, eventMapFiltered)
|
||||||
}
|
if err != nil {
|
||||||
stateFiltered, err := d.fetchStateEvents(ctx, d.txn, stateNeededFiltered, eventMapFiltered)
|
if err == sql.ErrNoRows {
|
||||||
if err != nil {
|
return nil, nil, nil
|
||||||
if err == sql.ErrNoRows {
|
}
|
||||||
return nil, nil, nil
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
return nil, nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// find out which rooms this user is peeking, if any.
|
// find out which rooms this user is peeking, if any.
|
||||||
|
|
@ -701,6 +707,28 @@ func (d *DatabaseTransaction) MaxStreamPositionForRelations(ctx context.Context)
|
||||||
return types.StreamPosition(id), err
|
return types.StreamPosition(id), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isStatefilterEmpty(filter *gomatrixserverlib.StateFilter) bool {
|
||||||
|
if filter == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case filter.NotTypes != nil && len(*filter.NotTypes) > 0:
|
||||||
|
return false
|
||||||
|
case filter.Types != nil && len(*filter.Types) > 0:
|
||||||
|
return false
|
||||||
|
case filter.Senders != nil && len(*filter.Senders) > 0:
|
||||||
|
return false
|
||||||
|
case filter.NotSenders != nil && len(*filter.NotSenders) > 0:
|
||||||
|
return false
|
||||||
|
case filter.NotRooms != nil && len(*filter.NotRooms) > 0:
|
||||||
|
return false
|
||||||
|
case filter.ContainsURL != nil:
|
||||||
|
return false
|
||||||
|
default:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (
|
func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (
|
||||||
events []types.StreamEvent, prevBatch, nextBatch string, err error,
|
events []types.StreamEvent, prevBatch, nextBatch string, err error,
|
||||||
) {
|
) {
|
||||||
|
|
|
||||||
|
|
@ -366,11 +366,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
||||||
return streamPos, err
|
return streamPos, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) SelectRecentEvents(
|
func (s *outputRoomEventsStatements) SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) {
|
||||||
ctx context.Context, txn *sql.Tx,
|
|
||||||
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
|
||||||
chronologicalOrder bool, onlySyncEvents bool,
|
|
||||||
) ([]types.StreamEvent, bool, error) {
|
|
||||||
var query string
|
var query string
|
||||||
if onlySyncEvents {
|
if onlySyncEvents {
|
||||||
query = selectRecentEventsForSyncSQL
|
query = selectRecentEventsForSyncSQL
|
||||||
|
|
@ -381,25 +377,25 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
stmt, params, err := prepareWithFilters(
|
stmt, params, err := prepareWithFilters(
|
||||||
s.db, txn, query,
|
s.db, txn, query,
|
||||||
[]interface{}{
|
[]interface{}{
|
||||||
roomID, r.Low(), r.High(),
|
roomIDs, r.Low(), r.High(),
|
||||||
},
|
},
|
||||||
eventFilter.Senders, eventFilter.NotSenders,
|
eventFilter.Senders, eventFilter.NotSenders,
|
||||||
eventFilter.Types, eventFilter.NotTypes,
|
eventFilter.Types, eventFilter.NotTypes,
|
||||||
nil, eventFilter.ContainsURL, eventFilter.Limit+1, FilterOrderDesc,
|
nil, eventFilter.ContainsURL, eventFilter.Limit+1, FilterOrderDesc,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err)
|
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, stmt, "selectRecentEvents: stmt.close() failed")
|
defer internal.CloseAndLogIfError(ctx, stmt, "selectRecentEvents: stmt.close() failed")
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(ctx, params...)
|
rows, err := stmt.QueryContext(ctx, params...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
|
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
|
||||||
events, err := rowsToStreamEvents(rows)
|
events, err := rowsToStreamEvents(rows)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if chronologicalOrder {
|
if chronologicalOrder {
|
||||||
// The events need to be returned from oldest to latest, which isn't
|
// The events need to be returned from oldest to latest, which isn't
|
||||||
|
|
@ -410,9 +406,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// we queried for 1 more than the limit, so if we returned one more mark limited=true
|
// we queried for 1 more than the limit, so if we returned one more mark limited=true
|
||||||
limited := false
|
|
||||||
if len(events) > eventFilter.Limit {
|
if len(events) > eventFilter.Limit {
|
||||||
limited = true
|
|
||||||
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
|
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
|
||||||
if chronologicalOrder {
|
if chronologicalOrder {
|
||||||
events = events[1:]
|
events = events[1:]
|
||||||
|
|
@ -420,7 +414,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
events = events[:len(events)-1]
|
events = events[:len(events)-1]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return events, limited, nil
|
return map[string]types.RecentEvents{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
||||||
|
|
|
||||||
|
|
@ -156,12 +156,12 @@ func TestRecentEventsPDU(t *testing.T) {
|
||||||
tc := testCases[i]
|
tc := testCases[i]
|
||||||
t.Run(tc.Name, func(st *testing.T) {
|
t.Run(tc.Name, func(st *testing.T) {
|
||||||
var filter gomatrixserverlib.RoomEventFilter
|
var filter gomatrixserverlib.RoomEventFilter
|
||||||
var gotEvents []types.StreamEvent
|
var gotEvents map[string]types.RecentEvents
|
||||||
var limited bool
|
var limited bool
|
||||||
filter.Limit = tc.Limit
|
filter.Limit = tc.Limit
|
||||||
WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) {
|
WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) {
|
||||||
var err error
|
var err error
|
||||||
gotEvents, limited, err = snapshot.RecentEvents(ctx, r.ID, types.Range{
|
gotEvents, err = snapshot.RecentEvents(ctx, []string{r.ID}, types.Range{
|
||||||
From: tc.From,
|
From: tc.From,
|
||||||
To: tc.To,
|
To: tc.To,
|
||||||
}, &filter, !tc.ReverseOrder, true)
|
}, &filter, !tc.ReverseOrder, true)
|
||||||
|
|
@ -169,15 +169,18 @@ func TestRecentEventsPDU(t *testing.T) {
|
||||||
st.Fatalf("failed to do sync: %s", err)
|
st.Fatalf("failed to do sync: %s", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
streamEvents := gotEvents[r.ID]
|
||||||
|
limited = streamEvents.Limited
|
||||||
if limited != tc.WantLimited {
|
if limited != tc.WantLimited {
|
||||||
st.Errorf("got limited=%v want %v", limited, tc.WantLimited)
|
st.Errorf("got limited=%v want %v", limited, tc.WantLimited)
|
||||||
}
|
}
|
||||||
if len(gotEvents) != len(tc.WantEvents) {
|
if len(streamEvents.Events) != len(tc.WantEvents) {
|
||||||
st.Errorf("got %d events, want %d", len(gotEvents), len(tc.WantEvents))
|
st.Errorf("got %d events, want %d", len(gotEvents), len(tc.WantEvents))
|
||||||
}
|
}
|
||||||
for j := range gotEvents {
|
|
||||||
if !reflect.DeepEqual(gotEvents[j].JSON(), tc.WantEvents[j].JSON()) {
|
for j := range streamEvents.Events {
|
||||||
st.Errorf("event %d got %s want %s", j, string(gotEvents[j].JSON()), string(tc.WantEvents[j].JSON()))
|
if !reflect.DeepEqual(streamEvents.Events[j].JSON(), tc.WantEvents[j].JSON()) {
|
||||||
|
st.Errorf("event %d got %s want %s", j, string(streamEvents.Events[j].JSON()), string(tc.WantEvents[j].JSON()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -66,7 +66,7 @@ type Events interface {
|
||||||
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
|
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
|
||||||
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
||||||
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
|
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
|
||||||
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
|
||||||
// SelectEarlyEvents returns the earliest events in the given room.
|
// SelectEarlyEvents returns the earliest events in the given room.
|
||||||
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter) ([]types.StreamEvent, error)
|
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter) ([]types.StreamEvent, error)
|
||||||
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *gomatrixserverlib.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
|
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *gomatrixserverlib.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
|
||||||
|
|
|
||||||
|
|
@ -82,19 +82,30 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars
|
s := time.Now()
|
||||||
// TODO: This might be inefficient, when joined to many and/or large rooms.
|
recentEvents, err := snapshot.RecentEvents(ctx, joinedRoomIDs, r, &eventFilter, true, true)
|
||||||
|
if err != nil {
|
||||||
|
return from
|
||||||
|
}
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"duration": time.Since(s),
|
||||||
|
"rooms": len(joinedRoomIDs)}).
|
||||||
|
Debugf("got recent events for all rooms")
|
||||||
|
// Build up a /sync response. Add joined rooms.
|
||||||
|
s = time.Now()
|
||||||
for _, roomID := range joinedRoomIDs {
|
for _, roomID := range joinedRoomIDs {
|
||||||
|
events := recentEvents[roomID]
|
||||||
|
// Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars
|
||||||
|
// TODO: This might be inefficient, when joined to many and/or large rooms.
|
||||||
joinedUsers := p.notifier.JoinedUsers(roomID)
|
joinedUsers := p.notifier.JoinedUsers(roomID)
|
||||||
for _, sharedUser := range joinedUsers {
|
for _, sharedUser := range joinedUsers {
|
||||||
p.lazyLoadCache.InvalidateLazyLoadedUser(req.Device, roomID, sharedUser)
|
p.lazyLoadCache.InvalidateLazyLoadedUser(req.Device, roomID, sharedUser)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Build up a /sync response. Add joined rooms.
|
// get the join response for each room
|
||||||
for _, roomID := range joinedRoomIDs {
|
|
||||||
jr, jerr := p.getJoinResponseForCompleteSync(
|
jr, jerr := p.getJoinResponseForCompleteSync(
|
||||||
ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
|
ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, false,
|
||||||
|
events.Events, events.Limited,
|
||||||
)
|
)
|
||||||
if jerr != nil {
|
if jerr != nil {
|
||||||
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
|
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
|
||||||
|
|
@ -106,6 +117,10 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
req.Response.Rooms.Join[roomID] = jr
|
req.Response.Rooms.Join[roomID] = jr
|
||||||
req.Rooms[roomID] = gomatrixserverlib.Join
|
req.Rooms[roomID] = gomatrixserverlib.Join
|
||||||
}
|
}
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"duration": time.Since(s),
|
||||||
|
"rooms": len(joinedRoomIDs)}).
|
||||||
|
Debugf("built join response for all rooms")
|
||||||
|
|
||||||
// Add peeked rooms.
|
// Add peeked rooms.
|
||||||
peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
|
peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
|
||||||
|
|
@ -113,11 +128,25 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
|
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
|
||||||
return from
|
return from
|
||||||
}
|
}
|
||||||
for _, peek := range peeks {
|
if len(peeks) > 0 {
|
||||||
if !peek.Deleted {
|
peekRooms := make([]string, 0, len(peeks))
|
||||||
|
for _, peek := range peeks {
|
||||||
|
if !peek.Deleted {
|
||||||
|
peekRooms = append(peekRooms, peek.RoomID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
recentEvents, err = snapshot.RecentEvents(ctx, peekRooms, r, &eventFilter, true, true)
|
||||||
|
if err != nil {
|
||||||
|
return from
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, roomID := range peekRooms {
|
||||||
var jr *types.JoinResponse
|
var jr *types.JoinResponse
|
||||||
|
events := recentEvents[roomID]
|
||||||
jr, err = p.getJoinResponseForCompleteSync(
|
jr, err = p.getJoinResponseForCompleteSync(
|
||||||
ctx, snapshot, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true,
|
ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, true,
|
||||||
|
events.Events, events.Limited,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||||
|
|
@ -126,7 +155,7 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
req.Response.Rooms.Peek[peek.RoomID] = jr
|
req.Response.Rooms.Peek[roomID] = jr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -227,7 +256,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
stateFilter *gomatrixserverlib.StateFilter,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
req *types.SyncRequest,
|
req *types.SyncRequest,
|
||||||
) (types.StreamPosition, error) {
|
) (types.StreamPosition, error) {
|
||||||
|
var err error
|
||||||
originalLimit := eventFilter.Limit
|
originalLimit := eventFilter.Limit
|
||||||
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
|
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
|
||||||
if r.Backwards && originalLimit < recentEventBackwardsLimit {
|
if r.Backwards && originalLimit < recentEventBackwardsLimit {
|
||||||
|
|
@ -238,8 +267,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
recentStreamEvents, limited, err := snapshot.RecentEvents(
|
dbEvents, err := snapshot.RecentEvents(
|
||||||
ctx, delta.RoomID, r,
|
ctx, []string{delta.RoomID}, r,
|
||||||
eventFilter, true, true,
|
eventFilter, true, true,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -248,6 +277,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
}
|
}
|
||||||
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
|
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
recentStreamEvents := dbEvents[delta.RoomID].Events
|
||||||
|
limited := dbEvents[delta.RoomID].Limited
|
||||||
|
|
||||||
recentEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
|
recentEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
|
||||||
snapshot.StreamEventsToEvents(device, recentStreamEvents),
|
snapshot.StreamEventsToEvents(device, recentStreamEvents),
|
||||||
gomatrixserverlib.TopologicalOrderByPrevEvents,
|
gomatrixserverlib.TopologicalOrderByPrevEvents,
|
||||||
|
|
@ -428,25 +461,16 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
snapshot storage.DatabaseTransaction,
|
snapshot storage.DatabaseTransaction,
|
||||||
roomID string,
|
roomID string,
|
||||||
r types.Range,
|
|
||||||
stateFilter *gomatrixserverlib.StateFilter,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
eventFilter *gomatrixserverlib.RoomEventFilter,
|
|
||||||
wantFullState bool,
|
wantFullState bool,
|
||||||
device *userapi.Device,
|
device *userapi.Device,
|
||||||
isPeek bool,
|
isPeek bool,
|
||||||
|
recentStreamEvents []types.StreamEvent,
|
||||||
|
limited bool,
|
||||||
) (jr *types.JoinResponse, err error) {
|
) (jr *types.JoinResponse, err error) {
|
||||||
jr = types.NewJoinResponse()
|
jr = types.NewJoinResponse()
|
||||||
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
||||||
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
||||||
recentStreamEvents, limited, err := snapshot.RecentEvents(
|
|
||||||
ctx, roomID, r, eventFilter, true, true,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return jr, nil
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Work our way through the timeline events and pick out the event IDs
|
// Work our way through the timeline events and pick out the event IDs
|
||||||
// of any state events that appear in the timeline. We'll specifically
|
// of any state events that appear in the timeline. We'll specifically
|
||||||
|
|
|
||||||
|
|
@ -63,6 +63,11 @@ type StreamEvent struct {
|
||||||
ExcludeFromSync bool
|
ExcludeFromSync bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RecentEvents struct {
|
||||||
|
Limited bool
|
||||||
|
Events []StreamEvent
|
||||||
|
}
|
||||||
|
|
||||||
// Range represents a range between two stream positions.
|
// Range represents a range between two stream positions.
|
||||||
type Range struct {
|
type Range struct {
|
||||||
// From is the position the client has already received.
|
// From is the position the client has already received.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue