Get state deltas without filters (#2810)

This makes the following changes:
- get state deltas without the user supplied filter, so we can actually
"calculate" state transitions
- closes `stmt` when using SQLite
- Adds presence for users who newly joined a room, even if the syncing
user already knows about the presence status (should fix
https://github.com/matrix-org/complement/pull/516)
This commit is contained in:
Till 2022-10-19 14:05:39 +02:00 committed by GitHub
parent 8cbe14bd6d
commit e79bfd8fd5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 144 additions and 65 deletions

View file

@ -28,8 +28,9 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
const outputRoomEventsSchema = `
@ -133,7 +134,7 @@ const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
const selectStateInRangeFilteredSQL = "" +
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
@ -146,6 +147,15 @@ const selectStateInRangeSQL = "" +
" ORDER BY id ASC" +
" LIMIT $9"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND room_id = ANY($3)" +
" ORDER BY id ASC" +
" LIMIT $4"
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
@ -171,20 +181,21 @@ const selectContextAfterEventSQL = "" +
const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectEventsWitFilterStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
selectSearchStmt *sql.Stmt
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectEventsWitFilterStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeFilteredStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
selectSearchStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -214,6 +225,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectRecentEventsStmt, selectRecentEventsSQL},
{&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL},
{&s.selectEarlyEventsStmt, selectEarlyEventsSQL},
{&s.selectStateInRangeFilteredStmt, selectStateInRangeFilteredSQL},
{&s.selectStateInRangeStmt, selectStateInRangeSQL},
{&s.updateEventJSONStmt, updateEventJSONSQL},
{&s.deleteEventsForRoomStmt, deleteEventsForRoomSQL},
@ -240,17 +252,28 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
ctx context.Context, txn *sql.Tx, r types.Range,
stateFilter *gomatrixserverlib.StateFilter, roomIDs []string,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt)
senders, notSenders := getSendersStateFilterFilter(stateFilter)
rows, err := stmt.QueryContext(
ctx, r.Low(), r.High(), pq.StringArray(roomIDs),
pq.StringArray(senders),
pq.StringArray(notSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
stateFilter.ContainsURL,
stateFilter.Limit,
)
var rows *sql.Rows
var err error
if stateFilter != nil {
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeFilteredStmt)
senders, notSenders := getSendersStateFilterFilter(stateFilter)
rows, err = stmt.QueryContext(
ctx, r.Low(), r.High(), pq.StringArray(roomIDs),
pq.StringArray(senders),
pq.StringArray(notSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
stateFilter.ContainsURL,
stateFilter.Limit,
)
} else {
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt)
rows, err = stmt.QueryContext(
ctx, r.Low(), r.High(), pq.StringArray(roomIDs),
r.High()-r.Low(),
)
}
if err != nil {
return nil, nil, err
}

View file

@ -5,10 +5,11 @@ import (
"database/sql"
"fmt"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
type DatabaseTransaction struct {
@ -277,6 +278,7 @@ func (d *DatabaseTransaction) GetBackwardTopologyPos(
// exclusive of oldPos, inclusive of newPos, for the rooms in which
// the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it.
// nolint:gocyclo
func (d *DatabaseTransaction) GetStateDeltas(
ctx context.Context, device *userapi.Device,
r types.Range, userID string,
@ -311,7 +313,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
}
// get all the state events ever (i.e. for all available rooms) between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, d.txn, r, stateFilter, allRoomIDs)
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, d.txn, r, nil, allRoomIDs)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil, nil
@ -326,6 +328,22 @@ func (d *DatabaseTransaction) GetStateDeltas(
return nil, nil, err
}
// get all the state events ever (i.e. for all available rooms) between these two positions
stateNeededFiltered, eventMapFiltered, err := d.OutputEvents.SelectStateInRange(ctx, d.txn, r, stateFilter, allRoomIDs)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil, nil
}
return nil, nil, err
}
stateFiltered, err := d.fetchStateEvents(ctx, d.txn, stateNeededFiltered, eventMapFiltered)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil, nil
}
return nil, nil, err
}
// find out which rooms this user is peeking, if any.
// We do this before joins so any peeks get overwritten
peeks, err := d.Peeks.SelectPeeksInRange(ctx, d.txn, userID, device.ID, r)
@ -371,6 +389,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
// If our membership is now join but the previous membership wasn't
// then this is a "join transition", so we'll insert this room.
if prevMembership != membership {
newlyJoinedRooms[roomID] = true
// Get the full room state, as we'll send that down for a newly
// joined room instead of a delta.
var s []types.StreamEvent
@ -383,8 +402,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
// Add the information for this room into the state so that
// it will get added with all of the rest of the joined rooms.
state[roomID] = s
newlyJoinedRooms[roomID] = true
stateFiltered[roomID] = s
}
// We won't add joined rooms into the delta at this point as they
@ -395,7 +413,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
deltas = append(deltas, types.StateDelta{
Membership: membership,
MembershipPos: ev.StreamPosition,
StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
StateEvents: d.StreamEventsToEvents(device, stateFiltered[roomID]),
RoomID: roomID,
})
break
@ -407,7 +425,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, types.StateDelta{
Membership: gomatrixserverlib.Join,
StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
StateEvents: d.StreamEventsToEvents(device, stateFiltered[joinedRoomID]),
RoomID: joinedRoomID,
NewlyJoined: newlyJoinedRooms[joinedRoomID],
})

View file

@ -29,8 +29,9 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
const outputRoomEventsSchema = `
@ -189,21 +190,36 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
for _, roomID := range roomIDs {
inputParams = append(inputParams, roomID)
}
stmt, params, err := prepareWithFilters(
s.db, txn, stmtSQL, inputParams,
stateFilter.Senders, stateFilter.NotSenders,
stateFilter.Types, stateFilter.NotTypes,
nil, stateFilter.ContainsURL, stateFilter.Limit, FilterOrderAsc,
var (
stmt *sql.Stmt
params []any
err error
)
if stateFilter != nil {
stmt, params, err = prepareWithFilters(
s.db, txn, stmtSQL, inputParams,
stateFilter.Senders, stateFilter.NotSenders,
stateFilter.Types, stateFilter.NotTypes,
nil, stateFilter.ContainsURL, stateFilter.Limit, FilterOrderAsc,
)
} else {
stmt, params, err = prepareWithFilters(
s.db, txn, stmtSQL, inputParams,
nil, nil,
nil, nil,
nil, nil, int(r.High()-r.Low()), FilterOrderAsc,
)
}
if err != nil {
return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
}
defer internal.CloseAndLogIfError(ctx, stmt, "selectStateInRange: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return nil, nil, err
}
defer rows.Close() // nolint: errcheck
defer internal.CloseAndLogIfError(ctx, rows, "selectStateInRange: rows.close() failed")
// Fetch all the state change events for all rooms between the two positions then loop each event and:
// - Keep a cache of the event by ID (99% of state change events are for the event itself)
// - For each room ID, build up an array of event IDs which represents cumulative adds/removes
@ -269,6 +285,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
) (id int64, err error) {
var nullableID sql.NullInt64
stmt := sqlutil.TxStmt(txn, s.selectMaxEventIDStmt)
defer internal.CloseAndLogIfError(ctx, stmt, "SelectMaxEventID: stmt.close() failed")
err = stmt.QueryRowContext(ctx).Scan(&nullableID)
if nullableID.Valid {
id = nullableID.Int64
@ -323,6 +340,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
return 0, err
}
insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt)
defer internal.CloseAndLogIfError(ctx, insertStmt, "InsertEvent: stmt.close() failed")
_, err = insertStmt.ExecContext(
ctx,
streamPos,
@ -367,6 +385,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
if err != nil {
return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err)
}
defer internal.CloseAndLogIfError(ctx, stmt, "selectRecentEvents: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
@ -415,6 +434,8 @@ func (s *outputRoomEventsStatements) SelectEarlyEvents(
if err != nil {
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
}
defer internal.CloseAndLogIfError(ctx, stmt, "SelectEarlyEvents: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return nil, err
@ -456,6 +477,8 @@ func (s *outputRoomEventsStatements) SelectEvents(
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, stmt, "SelectEvents: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return nil, err
@ -558,6 +581,10 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
filter.Types, filter.NotTypes,
nil, filter.ContainsURL, filter.Limit, FilterOrderDesc,
)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, stmt, "SelectContextBeforeEvent: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
@ -596,6 +623,10 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
filter.Types, filter.NotTypes,
nil, filter.ContainsURL, filter.Limit, FilterOrderAsc,
)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, stmt, "SelectContextAfterEvent: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {

View file

@ -194,7 +194,7 @@ func (p *PDUStreamProvider) IncrementalSync(
}
}
var pos types.StreamPosition
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
return newPos
@ -225,7 +225,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
delta types.StateDelta,
eventFilter *gomatrixserverlib.RoomEventFilter,
stateFilter *gomatrixserverlib.StateFilter,
res *types.Response,
req *types.SyncRequest,
) (types.StreamPosition, error) {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
@ -290,8 +290,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
hasMembershipChange := false
for _, recentEvent := range recentStreamEvents {
if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil {
if membership, _ := recentEvent.Membership(); membership == gomatrixserverlib.Join {
req.MembershipChanges[*recentEvent.StateKey()] = struct{}{}
}
hasMembershipChange = true
break
}
}
@ -318,9 +320,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.RoomID] = jr
req.Response.Rooms.Join[delta.RoomID] = jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
@ -329,7 +331,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.RoomID] = jr
req.Response.Rooms.Peek[delta.RoomID] = jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
@ -342,7 +344,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// didn't "remove" events, return that the response is limited.
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.RoomID] = lr
req.Response.Rooms.Leave[delta.RoomID] = lr
}
return latestPosition, nil

View file

@ -121,7 +121,8 @@ func (p *PresenceStreamProvider) IncrementalSync(
prevPresence := pres.(*types.PresenceInternal)
currentlyActive := prevPresence.CurrentlyActive()
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
if skip {
_, membershipChange := req.MembershipChanges[presence.UserID]
if skip && !membershipChange {
req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID)
continue
}

View file

@ -91,15 +91,16 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
})
return &types.SyncRequest{
Context: req.Context(), //
Log: logger, //
Device: &device, //
Response: types.NewResponse(), // Populated by all streams
Filter: filter, //
Since: since, //
Timeout: timeout, //
Rooms: make(map[string]string), // Populated by the PDU stream
WantFullState: wantFullState, //
Context: req.Context(), //
Log: logger, //
Device: &device, //
Response: types.NewResponse(), // Populated by all streams
Filter: filter, //
Since: since, //
Timeout: timeout, //
Rooms: make(map[string]string), // Populated by the PDU stream
WantFullState: wantFullState, //
MembershipChanges: make(map[string]struct{}), // Populated by the PDU stream
}, nil
}

View file

@ -4,9 +4,10 @@ import (
"context"
"time"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
type SyncRequest struct {
@ -22,6 +23,8 @@ type SyncRequest struct {
// Updated by the PDU stream.
Rooms map[string]string
// Updated by the PDU stream.
MembershipChanges map[string]struct{}
// Updated by the PDU stream.
IgnoredUsers IgnoredUsers
}

View file

@ -39,12 +39,6 @@ Events in rooms with AS-hosted room aliases are sent to AS server
Inviting an AS-hosted user asks the AS server
Accesing an AS-hosted room alias asks the AS server
# Flakey, need additional investigation
Messages that notify from another user increment notification_count
Messages that highlight from another user increment unread highlight count
Notifications can be viewed with GET /notifications
# More flakey
Guest users can join guest_access rooms

View file

@ -746,4 +746,10 @@ Existing members see new member's presence
Inbound federation can return missing events for joined visibility
outliers whose auth_events are in a different room are correctly rejected
Messages that notify from another user increment notification_count
Messages that highlight from another user increment unread highlight count
Messages that highlight from another user increment unread highlight count
Newly joined room has correct timeline in incremental sync
When user joins a room the state is included in the next sync
When user joins a room the state is included in a gapped sync
Messages that notify from another user increment notification_count
Messages that highlight from another user increment unread highlight count
Notifications can be viewed with GET /notifications