mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 03:03:10 -06:00
Apply RoomEventFilter when getting events
This commit is contained in:
parent
2c80586360
commit
8e59b392ea
|
|
@ -30,11 +30,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ContextRespsonse struct {
|
type ContextRespsonse struct {
|
||||||
End string `json:"end,omitempty"`
|
End string `json:"end"`
|
||||||
Event gomatrixserverlib.ClientEvent `json:"event"`
|
Event gomatrixserverlib.ClientEvent `json:"event"`
|
||||||
EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after,omitempty"`
|
EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after,omitempty"`
|
||||||
EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before,omitempty"`
|
EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before,omitempty"`
|
||||||
Start string `json:"start,omitempty"`
|
Start string `json:"start"`
|
||||||
State []gomatrixserverlib.ClientEvent `json:"state"`
|
State []gomatrixserverlib.ClientEvent `json:"state"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -70,7 +70,7 @@ func Context(
|
||||||
}
|
}
|
||||||
|
|
||||||
stateFilter := gomatrixserverlib.StateFilter{
|
stateFilter := gomatrixserverlib.StateFilter{
|
||||||
Limit: filter.Limit,
|
Limit: 100,
|
||||||
NotSenders: filter.NotSenders,
|
NotSenders: filter.NotSenders,
|
||||||
NotTypes: filter.NotTypes,
|
NotTypes: filter.NotTypes,
|
||||||
Senders: filter.Senders,
|
Senders: filter.Senders,
|
||||||
|
|
@ -104,13 +104,13 @@ func Context(
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter.Limit/2)
|
eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter)
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
logrus.WithError(err).Error("unable to fetch before events")
|
logrus.WithError(err).Error("unable to fetch before events")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
_, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, roomID, filter.Limit/2)
|
_, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, roomID, filter)
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
logrus.WithError(err).Error("unable to fetch after events")
|
logrus.WithError(err).Error("unable to fetch after events")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
|
|
@ -121,14 +121,16 @@ func Context(
|
||||||
newState := applyLazyLoadMembers(filter, eventsAfterClient, eventsBeforeClient, state)
|
newState := applyLazyLoadMembers(filter, eventsAfterClient, eventsBeforeClient, state)
|
||||||
|
|
||||||
response := ContextRespsonse{
|
response := ContextRespsonse{
|
||||||
End: "end",
|
|
||||||
Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll),
|
Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll),
|
||||||
EventsAfter: eventsAfterClient,
|
EventsAfter: eventsAfterClient,
|
||||||
EventsBefore: eventsBeforeClient,
|
EventsBefore: eventsBeforeClient,
|
||||||
Start: "start",
|
|
||||||
State: gomatrixserverlib.HeaderedToClientEvents(newState, gomatrixserverlib.FormatAll),
|
State: gomatrixserverlib.HeaderedToClientEvents(newState, gomatrixserverlib.FormatAll),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(response.State) > filter.Limit {
|
||||||
|
response.State = response.State[len(response.State)-filter.Limit:]
|
||||||
|
}
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: response,
|
JSON: response,
|
||||||
|
|
@ -139,7 +141,7 @@ func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter
|
||||||
if filter == nil || !filter.LazyLoadMembers {
|
if filter == nil || !filter.LazyLoadMembers {
|
||||||
return state
|
return state
|
||||||
}
|
}
|
||||||
allEvents := append(eventsAfter, eventsBefore...)
|
allEvents := append(eventsBefore, eventsAfter...)
|
||||||
x := make(map[string]bool)
|
x := make(map[string]bool)
|
||||||
// get members who actually send an event
|
// get members who actually send an event
|
||||||
for _, e := range allEvents {
|
for _, e := range allEvents {
|
||||||
|
|
|
||||||
|
|
@ -139,7 +139,7 @@ type Database interface {
|
||||||
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
|
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
|
||||||
|
|
||||||
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, limit int) ([]*gomatrixserverlib.HeaderedEvent, error)
|
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectContextAfterEvent(ctx context.Context, id int, roomID string, limit int) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectEventIDsAfter(ctx context.Context, roomID string, id int) ([]string, error)
|
SelectEventIDsAfter(ctx context.Context, roomID string, id int) ([]string, error)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -134,10 +134,20 @@ const selectContextEventSQL = "" +
|
||||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||||
|
|
||||||
const selectContextBeforeEventSQL = "" +
|
const selectContextBeforeEventSQL = "" +
|
||||||
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2 ORDER BY id DESC LIMIT $3"
|
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
|
||||||
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
|
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||||
|
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||||
|
" ORDER BY id DESC LIMIT $3"
|
||||||
|
|
||||||
const selectContextAfterEventSQL = "" +
|
const selectContextAfterEventSQL = "" +
|
||||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2 ORDER BY id ASC LIMIT $3"
|
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
|
||||||
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
|
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||||
|
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||||
|
" ORDER BY id ASC LIMIT $3"
|
||||||
|
|
||||||
const selectEventIDsAfterSQL = "" +
|
const selectEventIDsAfterSQL = "" +
|
||||||
"SELECT event_id FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
|
"SELECT event_id FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
|
||||||
|
|
@ -454,9 +464,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
||||||
ctx context.Context, txn *sql.Tx, id int, roomID string, limit int,
|
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
|
||||||
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||||
rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(ctx, roomID, id, limit)
|
rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(
|
||||||
|
ctx, roomID, id, filter.Limit,
|
||||||
|
pq.StringArray(filter.Senders),
|
||||||
|
pq.StringArray(filter.NotSenders),
|
||||||
|
pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)),
|
||||||
|
pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -480,9 +496,15 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
||||||
ctx context.Context, txn *sql.Tx, id int, roomID string, limit int,
|
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
|
||||||
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||||
rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(ctx, roomID, id, limit)
|
rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(
|
||||||
|
ctx, roomID, id, filter.Limit,
|
||||||
|
pq.StringArray(filter.Senders),
|
||||||
|
pq.StringArray(filter.NotSenders),
|
||||||
|
pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)),
|
||||||
|
pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -960,11 +960,11 @@ func (s *Database) SelectContextEvent(ctx context.Context, roomID, eventID strin
|
||||||
return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
|
return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, limit int) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||||
return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, limit)
|
return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
|
||||||
}
|
}
|
||||||
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, limit int) (int, []*gomatrixserverlib.HeaderedEvent, error) {
|
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
|
||||||
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, limit)
|
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Database) SelectEventIDsAfter(ctx context.Context, roomID string, id int) ([]string, error) {
|
func (s *Database) SelectEventIDsAfter(ctx context.Context, roomID string, id int) ([]string, error) {
|
||||||
|
|
|
||||||
|
|
@ -68,7 +68,8 @@ const selectRoomIDsWithMembershipSQL = "" +
|
||||||
|
|
||||||
const selectCurrentStateSQL = "" +
|
const selectCurrentStateSQL = "" +
|
||||||
"SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1"
|
"SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1"
|
||||||
// WHEN, ORDER BY and LIMIT will be added by prepareWithFilter
|
|
||||||
|
// WHEN, ORDER BY and LIMIT will be added by prepareWithFilter
|
||||||
|
|
||||||
const selectJoinedUsersSQL = "" +
|
const selectJoinedUsersSQL = "" +
|
||||||
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -98,10 +99,10 @@ const selectContextEventSQL = "" +
|
||||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||||
|
|
||||||
const selectContextBeforeEventSQL = "" +
|
const selectContextBeforeEventSQL = "" +
|
||||||
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2 ORDER BY id DESC LIMIT $3"
|
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
|
||||||
|
|
||||||
const selectContextAfterEventSQL = "" +
|
const selectContextAfterEventSQL = "" +
|
||||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2 ORDER BY id ASC LIMIT $3"
|
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
|
||||||
|
|
||||||
const selectEventIDsAfterSQL = "" +
|
const selectEventIDsAfterSQL = "" +
|
||||||
"SELECT event_id FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
|
"SELECT event_id FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
|
||||||
|
|
@ -493,9 +494,19 @@ func (s *outputRoomEventsStatements) SelectContextEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
||||||
ctx context.Context, txn *sql.Tx, id int, roomID string, limit int,
|
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
|
||||||
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||||
rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(ctx, roomID, id, limit)
|
stmt, params, err := prepareWithFilters(
|
||||||
|
s.db, txn, selectContextBeforeEventSQL,
|
||||||
|
[]interface{}{
|
||||||
|
roomID, id,
|
||||||
|
},
|
||||||
|
filter.Senders, filter.NotSenders,
|
||||||
|
filter.Types, filter.NotTypes,
|
||||||
|
nil, filter.Limit, FilterOrderDesc,
|
||||||
|
)
|
||||||
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, params...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -519,9 +530,22 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
||||||
ctx context.Context, txn *sql.Tx, id int, roomID string, limit int,
|
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
|
||||||
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||||
rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(ctx, roomID, id, limit)
|
logrus.Debugf("getting events after: %+v", filter)
|
||||||
|
stmt, params, err := prepareWithFilters(
|
||||||
|
s.db, txn, selectContextAfterEventSQL,
|
||||||
|
[]interface{}{
|
||||||
|
roomID, id,
|
||||||
|
},
|
||||||
|
filter.Senders, filter.NotSenders,
|
||||||
|
filter.Types, filter.NotTypes,
|
||||||
|
nil, filter.Limit, FilterOrderAsc,
|
||||||
|
)
|
||||||
|
|
||||||
|
logrus.Debugf("params, %+v", params)
|
||||||
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, params...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -540,7 +564,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
||||||
}
|
}
|
||||||
evts = append(evts, evt)
|
evts = append(evts, evt)
|
||||||
}
|
}
|
||||||
|
logrus.Debugf("returning events: %+v", evts)
|
||||||
return lastID, evts, rows.Err()
|
return lastID, evts, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -65,8 +65,8 @@ type Events interface {
|
||||||
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
|
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
|
||||||
|
|
||||||
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, limit int) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, limit int) ([]*gomatrixserverlib.HeaderedEvent, error)
|
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectEventIDsAfter(ctx context.Context, roomID string, id int) ([]string, error)
|
SelectEventIDsAfter(ctx context.Context, roomID string, id int) ([]string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue