Timeline filtering

Signed-off-by: Thibaut CHARLES cromfr@gmail.com
This commit is contained in:
Crom (Thibaut CHARLES) 2018-01-05 14:25:36 +01:00
parent cdd6b0a3d3
commit da0658dc67
No known key found for this signature in database
GPG key ID: 45A3D5F880B9E6D0
6 changed files with 161 additions and 71 deletions

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -39,10 +40,10 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- The event ID for the event -- The event ID for the event
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
-- The 'room_id' key for the event. -- The 'room_id' key for the event. TODO Duplicate of (event_json->>'room_id')
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8. -- The JSON for the event. Stored as JSON
event_json TEXT NOT NULL, event_json JSON NOT NULL,
-- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
-- if there is no delta. -- if there is no delta.
add_state_ids TEXT[], add_state_ids TEXT[],
@ -51,7 +52,11 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
transaction_id TEXT -- The transaction id used to send the event, if any transaction_id TEXT -- The transaction id used to send the event, if any
); );
-- for event selection -- for event selection
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(
event_id,
room_id,
(event_json->>'type'),
(event_json->>'sender'));
` `
const insertEventSQL = "" + const insertEventSQL = "" +
@ -62,10 +67,15 @@ const insertEventSQL = "" +
const selectEventsSQL = "" + const selectEventsSQL = "" +
"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
const selectRecentEventsSQL = "" + const selectRoomRecentEventsSQL = "" +
"SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" + " WHERE room_id=$1 " +
" ORDER BY id ASC LIMIT $4" " AND id > $2 AND id <= $3" +
" AND ( $4::text[] IS NULL OR (event_json->>'sender') = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT((event_json->>'sender') = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR (event_json->>'type') = ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT((event_json->>'type') = ANY($7)) )" +
" ORDER BY id DESC LIMIT $8"
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
@ -78,11 +88,11 @@ const selectStateInRangeSQL = "" +
" ORDER BY id ASC" " ORDER BY id ASC"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt selectRoomRecentEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt
} }
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
@ -99,7 +109,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
return return
} }
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { if s.selectRoomRecentEventsStmt, err = db.Prepare(selectRoomRecentEventsSQL); err != nil {
return return
} }
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
@ -219,22 +229,29 @@ func (s *outputRoomEventsStatements) insertEvent(
return return
} }
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. func (s *outputRoomEventsStatements) selectRoomRecentEvents(
func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int, roomID string, fromPos, toPos types.StreamPosition, timelineFilter *gomatrix.FilterPart,
) ([]streamEvent, error) { ) ([]streamEvent, bool, error) {
stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) stmt := common.TxStmt(txn, s.selectRoomRecentEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos,
pq.StringArray(timelineFilter.Senders),
pq.StringArray(timelineFilter.NotSenders),
pq.StringArray(timelineFilter.Types),
pq.StringArray(timelineFilter.NotTypes),
timelineFilter.Limit, // TODO: limit abusive values?
)
if err != nil { if err != nil {
return nil, err return nil, false, err
} }
defer rows.Close() // nolint: errcheck defer rows.Close() // nolint: errcheck
events, err := rowsToStreamEvents(rows) events, err := rowsToStreamEvents(rows)
if err != nil { if err != nil {
return nil, err return nil, false, err
} }
return events, nil
return events, len(events) == timelineFilter.Limit, nil // TODO: len(events) == timelineFilter.Limit not accurate
} }
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -224,7 +225,7 @@ func (d *SyncServerDatabase) IncrementalSync(
ctx context.Context, ctx context.Context,
device authtypes.Device, device authtypes.Device,
fromPos, toPos types.StreamPosition, fromPos, toPos types.StreamPosition,
numRecentEventsPerRoom int, filter *gomatrix.Filter,
) (*types.Response, error) { ) (*types.Response, error) {
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil { if err != nil {
@ -244,7 +245,7 @@ func (d *SyncServerDatabase) IncrementalSync(
res := types.NewResponse(toPos) res := types.NewResponse(toPos)
for _, delta := range deltas { for _, delta := range deltas {
err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, filter, res)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -261,7 +262,7 @@ func (d *SyncServerDatabase) IncrementalSync(
// CompleteSync a complete /sync API response for the given user. // CompleteSync a complete /sync API response for the given user.
func (d *SyncServerDatabase) CompleteSync( func (d *SyncServerDatabase) CompleteSync(
ctx context.Context, userID string, numRecentEventsPerRoom int, ctx context.Context, userID string, filter *gomatrix.Filter,
) (*types.Response, error) { ) (*types.Response, error) {
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
// a consistent view of the database throughout. This includes extracting the sync stream position. // a consistent view of the database throughout. This includes extracting the sync stream position.
@ -275,7 +276,8 @@ func (d *SyncServerDatabase) CompleteSync(
defer common.EndTransaction(txn, &succeeded) defer common.EndTransaction(txn, &succeeded)
// Get the current stream position which we will base the sync response on. // Get the current stream position which we will base the sync response on.
pos, err := d.syncStreamPositionTx(ctx, txn) posFrom := types.StreamPosition(0)
posTo, err := d.syncStreamPositionTx(ctx, txn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -287,39 +289,53 @@ func (d *SyncServerDatabase) CompleteSync(
} }
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
res := types.NewResponse(pos) res := types.NewResponse(posTo)
for _, roomID := range roomIDs { for _, roomID := range roomIDs {
var stateEvents []gomatrixserverlib.Event
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil {
return nil, err
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []streamEvent
recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
)
if err != nil {
return nil, err
}
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs
recentEvents := streamEventsToEvents(nil, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true //Join response should contain events only if room isn't filtered
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) if !isRoomFiltered(filter, roomID) {
// Timeline events
var recentStreamEvents []streamEvent
var limited bool
recentStreamEvents, limited, err = d.events.selectRoomRecentEvents(
ctx, txn, roomID, posFrom, posTo, &filter.Room.Timeline)
if err != nil {
return nil, err
}
jr.Timeline.Limited = limited
recentEvents := streamEventsToEvents(nil, recentStreamEvents)
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(
recentEvents,
gomatrixserverlib.FormatSync)
// State events
var stateEvents []gomatrixserverlib.Event
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil {
return nil, err
}
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
//TODO AccountData events
//TODO Ephemeral events
}
//TODO Handle jr.Timeline.prev_batch
res.Rooms.Join[roomID] = *jr res.Rooms.Join[roomID] = *jr
} }
if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil { //TODO: filter Invite events
if err = d.addInvitesToResponse(ctx, txn, userID, 0, posTo, res); err != nil {
return nil, err return nil, err
} }
//TODO handle res.Room[roomID].Leave
succeeded = true succeeded = true
return res, err return res, err
} }
@ -409,7 +425,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
txn *sql.Tx, txn *sql.Tx,
fromPos, toPos types.StreamPosition, fromPos, toPos types.StreamPosition,
delta stateDelta, delta stateDelta,
numRecentEventsPerRoom int, filter *gomatrix.Filter,
res *types.Response, res *types.Response,
) error { ) error {
endPos := toPos endPos := toPos
@ -422,8 +438,8 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
// This is all "okay" assuming history_visibility == "shared" which it is by default. // This is all "okay" assuming history_visibility == "shared" which it is by default.
endPos = delta.membershipPos endPos = delta.membershipPos
} }
recentStreamEvents, err := d.events.selectRecentEvents( recentStreamEvents, limited, err := d.events.selectRoomRecentEvents(
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, ctx, txn, delta.roomID, fromPos, endPos, &filter.Room.Timeline,
) )
if err != nil { if err != nil {
return err return err
@ -440,7 +456,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
case "join": case "join":
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr res.Rooms.Join[delta.roomID] = *jr
case "leave": case "leave":
@ -450,7 +466,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
// no longer in the room. // no longer in the room.
lr := types.NewLeaveResponse() lr := types.NewLeaveResponse()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.Timeline.Limited = limited
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr res.Rooms.Leave[delta.roomID] = *lr
} }

View file

@ -22,6 +22,7 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -286,8 +287,8 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
timeout: 1 * time.Minute, timeout: 1 * time.Minute,
since: &since, since: &since,
wantFullState: false, wantFullState: false,
limit: defaultTimelineLimit,
log: util.GetLogger(context.TODO()), log: util.GetLogger(context.TODO()),
ctx: context.TODO(), ctx: context.TODO(),
filter: gomatrix.DefaultFilter(),
} }
} }

View file

@ -16,11 +16,14 @@ package sync
import ( import (
"context" "context"
"encoding/json"
"errors"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -34,11 +37,11 @@ const defaultTimelineLimit = 20
type syncRequest struct { type syncRequest struct {
ctx context.Context ctx context.Context
device authtypes.Device device authtypes.Device
limit int
timeout time.Duration timeout time.Duration
since *types.StreamPosition // nil means that no since token was supplied since *types.StreamPosition // nil means that no since token was supplied
wantFullState bool wantFullState bool
log *log.Entry log *log.Entry
filter gomatrix.Filter
} }
func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, error) { func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, error) {
@ -49,15 +52,37 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO: Additional query params: set_presence, filter
filterStr := req.URL.Query().Get("filter")
var filter gomatrix.Filter
if filterStr != "" {
if filterStr[0] == '{' {
// Inline filter
filter = gomatrix.DefaultFilter()
err = json.Unmarshal([]byte(filterStr), &filter)
if err != nil {
return nil, err
}
err = filter.Validate()
if err != nil {
return nil, err
}
} else {
// Filter ID
// TODO retrieve filter from DB
return nil, errors.New("Filter ID retrieval not implemented")
}
}
// TODO: Additional query params: set_presence
return &syncRequest{ return &syncRequest{
ctx: req.Context(), ctx: req.Context(),
device: device, device: device,
timeout: timeout, timeout: timeout,
since: since, since: since,
wantFullState: wantFullState, wantFullState: wantFullState,
limit: defaultTimelineLimit, // TODO: read from filter
log: util.GetLogger(req.Context()), log: util.GetLogger(req.Context()),
filter: filter,
}, nil }, nil
} }

View file

@ -122,9 +122,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) { func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) {
// TODO: handle ignored users // TODO: handle ignored users
if req.since == nil { if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, &req.filter)
} else { } else {
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit) res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, &req.filter)
} }
if err != nil { if err != nil {

View file

@ -23,21 +23,23 @@ type Filter struct {
EventFields []string `json:"event_fields,omitempty"` EventFields []string `json:"event_fields,omitempty"`
EventFormat string `json:"event_format,omitempty"` EventFormat string `json:"event_format,omitempty"`
Presence FilterPart `json:"presence,omitempty"` Presence FilterPart `json:"presence,omitempty"`
Room struct { Room FilterRoom `json:"room,omitempty"`
AccountData FilterPart `json:"account_data,omitempty"` }
Ephemeral FilterPart `json:"ephemeral,omitempty"`
IncludeLeave bool `json:"include_leave,omitempty"` type FilterRoom struct {
NotRooms []string `json:"not_rooms,omitempty"` AccountData FilterPart `json:"account_data,omitempty"`
Rooms []string `json:"rooms,omitempty"` Ephemeral FilterPart `json:"ephemeral,omitempty"`
State FilterPart `json:"state,omitempty"` IncludeLeave bool `json:"include_leave,omitempty"`
Timeline FilterPart `json:"timeline,omitempty"` NotRooms []string `json:"not_rooms,omitempty"`
} `json:"room,omitempty"` Rooms []string `json:"rooms,omitempty"`
State FilterPart `json:"state,omitempty"`
Timeline FilterPart `json:"timeline,omitempty"`
} }
type FilterPart struct { type FilterPart struct {
NotRooms []string `json:"not_rooms,omitempty"` NotRooms []string `json:"not_rooms,omitempty"`
Rooms []string `json:"rooms,omitempty"` Rooms []string `json:"rooms,omitempty"`
Limit *int `json:"limit,omitempty"` Limit int `json:"limit,omitempty"`
NotSenders []string `json:"not_senders,omitempty"` NotSenders []string `json:"not_senders,omitempty"`
NotTypes []string `json:"not_types,omitempty"` NotTypes []string `json:"not_types,omitempty"`
Senders []string `json:"senders,omitempty"` Senders []string `json:"senders,omitempty"`
@ -50,3 +52,32 @@ func (filter *Filter) Validate() error {
} }
return nil return nil
} }
func DefaultFilter() Filter {
return Filter{
AccountData: defaultFilterPart(),
EventFields: nil,
EventFormat: "client",
Presence: defaultFilterPart(),
Room: FilterRoom{
AccountData: defaultFilterPart(),
Ephemeral: defaultFilterPart(),
IncludeLeave: false, //TODO check default value on synapse
NotRooms: nil,
Rooms: nil,
State: defaultFilterPart(),
Timeline: defaultFilterPart(),
},
}
}
func defaultFilterPart() FilterPart {
return FilterPart{
NotRooms: nil,
Rooms: nil,
Limit: 100, //TODO check this on synapse
NotSenders: nil,
NotTypes: nil,
Senders: nil,
Types: nil,
}
}