mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 18:43:10 -06:00
Get state deltas without filters
This commit is contained in:
parent
d72d4f8d5d
commit
50a4f51f2f
|
|
@ -28,8 +28,9 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
const outputRoomEventsSchema = `
|
const outputRoomEventsSchema = `
|
||||||
|
|
@ -138,13 +139,8 @@ const selectStateInRangeSQL = "" +
|
||||||
" FROM syncapi_output_room_events" +
|
" FROM syncapi_output_room_events" +
|
||||||
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||||
" AND room_id = ANY($3)" +
|
" AND room_id = ANY($3)" +
|
||||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
|
||||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
|
||||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
|
||||||
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
|
||||||
" AND ( $8::bool IS NULL OR contains_url = $8 )" +
|
|
||||||
" ORDER BY id ASC" +
|
" ORDER BY id ASC" +
|
||||||
" LIMIT $9"
|
" LIMIT $4"
|
||||||
|
|
||||||
const deleteEventsForRoomSQL = "" +
|
const deleteEventsForRoomSQL = "" +
|
||||||
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
||||||
|
|
@ -241,14 +237,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
||||||
stateFilter *gomatrixserverlib.StateFilter, roomIDs []string,
|
stateFilter *gomatrixserverlib.StateFilter, roomIDs []string,
|
||||||
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
|
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
|
||||||
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt)
|
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt)
|
||||||
senders, notSenders := getSendersStateFilterFilter(stateFilter)
|
|
||||||
rows, err := stmt.QueryContext(
|
rows, err := stmt.QueryContext(
|
||||||
ctx, r.Low(), r.High(), pq.StringArray(roomIDs),
|
ctx, r.Low(), r.High(), pq.StringArray(roomIDs),
|
||||||
pq.StringArray(senders),
|
|
||||||
pq.StringArray(notSenders),
|
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
|
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
|
|
||||||
stateFilter.ContainsURL,
|
|
||||||
stateFilter.Limit,
|
stateFilter.Limit,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -5,10 +5,11 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type DatabaseTransaction struct {
|
type DatabaseTransaction struct {
|
||||||
|
|
@ -371,6 +372,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
|
||||||
// If our membership is now join but the previous membership wasn't
|
// If our membership is now join but the previous membership wasn't
|
||||||
// then this is a "join transition", so we'll insert this room.
|
// then this is a "join transition", so we'll insert this room.
|
||||||
if prevMembership != membership {
|
if prevMembership != membership {
|
||||||
|
newlyJoinedRooms[roomID] = true
|
||||||
// Get the full room state, as we'll send that down for a newly
|
// Get the full room state, as we'll send that down for a newly
|
||||||
// joined room instead of a delta.
|
// joined room instead of a delta.
|
||||||
var s []types.StreamEvent
|
var s []types.StreamEvent
|
||||||
|
|
@ -384,7 +386,6 @@ func (d *DatabaseTransaction) GetStateDeltas(
|
||||||
// Add the information for this room into the state so that
|
// Add the information for this room into the state so that
|
||||||
// it will get added with all of the rest of the joined rooms.
|
// it will get added with all of the rest of the joined rooms.
|
||||||
state[roomID] = s
|
state[roomID] = s
|
||||||
newlyJoinedRooms[roomID] = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We won't add joined rooms into the delta at this point as they
|
// We won't add joined rooms into the delta at this point as they
|
||||||
|
|
|
||||||
|
|
@ -29,8 +29,9 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
const outputRoomEventsSchema = `
|
const outputRoomEventsSchema = `
|
||||||
|
|
@ -191,9 +192,9 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
||||||
}
|
}
|
||||||
stmt, params, err := prepareWithFilters(
|
stmt, params, err := prepareWithFilters(
|
||||||
s.db, txn, stmtSQL, inputParams,
|
s.db, txn, stmtSQL, inputParams,
|
||||||
stateFilter.Senders, stateFilter.NotSenders,
|
nil, nil,
|
||||||
stateFilter.Types, stateFilter.NotTypes,
|
nil, nil,
|
||||||
nil, stateFilter.ContainsURL, stateFilter.Limit, FilterOrderAsc,
|
nil, nil, stateFilter.Limit, FilterOrderAsc,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||||
|
|
|
||||||
|
|
@ -318,7 +318,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
|
||||||
// If we are limited by the filter AND the history visibility filter
|
// If we are limited by the filter AND the history visibility filter
|
||||||
// didn't "remove" events, return that the response is limited.
|
// didn't "remove" events, return that the response is limited.
|
||||||
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
|
jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Join[delta.RoomID] = jr
|
res.Rooms.Join[delta.RoomID] = jr
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue