mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-28 17:23:09 -06:00
Implement filtering for roomId/messages
Signed-off-by: Joakim Hulthe <joakim@hulthe.net>
This commit is contained in:
parent
af41f6d454
commit
709db905ca
|
|
@ -16,6 +16,7 @@ package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
@ -45,7 +46,7 @@ type messagesReq struct {
|
||||||
fromStream *types.StreamingToken
|
fromStream *types.StreamingToken
|
||||||
device *userapi.Device
|
device *userapi.Device
|
||||||
wasToProvided bool
|
wasToProvided bool
|
||||||
limit int
|
filter gomatrixserverlib.RoomEventFilter
|
||||||
backwardOrdering bool
|
backwardOrdering bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -143,9 +144,21 @@ func OnIncomingMessagesRequest(
|
||||||
wasToProvided = false
|
wasToProvided = false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Maximum number of events to return; defaults to 10.
|
// RoomEventFilter
|
||||||
limit := defaultMessagesLimit
|
filter := gomatrixserverlib.DefaultRoomEventFilter()
|
||||||
|
filter.Limit = defaultMessagesLimit
|
||||||
|
if s := req.URL.Query().Get("filter"); len(s) > 0 {
|
||||||
|
if err = json.Unmarshal([]byte(s), &filter); err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.BadJSON("The filter object could not be decoded into valid JSON: " + err.Error()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Maximum number of events to return.
|
||||||
if len(req.URL.Query().Get("limit")) > 0 {
|
if len(req.URL.Query().Get("limit")) > 0 {
|
||||||
|
var limit int
|
||||||
limit, err = strconv.Atoi(req.URL.Query().Get("limit"))
|
limit, err = strconv.Atoi(req.URL.Query().Get("limit"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -154,8 +167,10 @@ func OnIncomingMessagesRequest(
|
||||||
JSON: jsonerror.InvalidArgumentValue("limit could not be parsed into an integer: " + err.Error()),
|
JSON: jsonerror.InvalidArgumentValue("limit could not be parsed into an integer: " + err.Error()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Overwrite the limit set by the filter
|
||||||
|
filter.Limit = limit
|
||||||
}
|
}
|
||||||
// TODO: Implement filtering (#587)
|
|
||||||
|
|
||||||
// Check the room ID's format.
|
// Check the room ID's format.
|
||||||
if _, _, err = gomatrixserverlib.SplitID('!', roomID); err != nil {
|
if _, _, err = gomatrixserverlib.SplitID('!', roomID); err != nil {
|
||||||
|
|
@ -176,7 +191,7 @@ func OnIncomingMessagesRequest(
|
||||||
to: &to,
|
to: &to,
|
||||||
fromStream: fromStream,
|
fromStream: fromStream,
|
||||||
wasToProvided: wasToProvided,
|
wasToProvided: wasToProvided,
|
||||||
limit: limit,
|
filter: filter,
|
||||||
backwardOrdering: backwardOrdering,
|
backwardOrdering: backwardOrdering,
|
||||||
device: device,
|
device: device,
|
||||||
}
|
}
|
||||||
|
|
@ -190,7 +205,7 @@ func OnIncomingMessagesRequest(
|
||||||
util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
||||||
"from": from.String(),
|
"from": from.String(),
|
||||||
"to": to.String(),
|
"to": to.String(),
|
||||||
"limit": limit,
|
"limit": filter.Limit,
|
||||||
"backwards": backwardOrdering,
|
"backwards": backwardOrdering,
|
||||||
"return_start": start.String(),
|
"return_start": start.String(),
|
||||||
"return_end": end.String(),
|
"return_end": end.String(),
|
||||||
|
|
@ -234,19 +249,17 @@ func (r *messagesReq) retrieveEvents() (
|
||||||
clientEvents []gomatrixserverlib.ClientEvent, start,
|
clientEvents []gomatrixserverlib.ClientEvent, start,
|
||||||
end types.TopologyToken, err error,
|
end types.TopologyToken, err error,
|
||||||
) {
|
) {
|
||||||
eventFilter := gomatrixserverlib.DefaultRoomEventFilter()
|
|
||||||
eventFilter.Limit = r.limit
|
|
||||||
|
|
||||||
// Retrieve the events from the local database.
|
// Retrieve the events from the local database.
|
||||||
var streamEvents []types.StreamEvent
|
var streamEvents []types.StreamEvent
|
||||||
if r.fromStream != nil {
|
if r.fromStream != nil {
|
||||||
toStream := r.to.StreamToken()
|
toStream := r.to.StreamToken()
|
||||||
streamEvents, err = r.db.GetEventsInStreamingRange(
|
streamEvents, err = r.db.GetEventsInStreamingRange(
|
||||||
r.ctx, r.fromStream, &toStream, r.roomID, &eventFilter, r.backwardOrdering,
|
r.ctx, r.fromStream, &toStream, r.roomID, &r.filter, r.backwardOrdering,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
streamEvents, err = r.db.GetEventsInTopologicalRange(
|
streamEvents, err = r.db.GetEventsInTopologicalRange(
|
||||||
r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering,
|
r.ctx, r.from, r.to, r.roomID, r.filter.Limit, r.backwardOrdering,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -434,7 +447,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
|
||||||
// Check if we have backward extremities for this room.
|
// Check if we have backward extremities for this room.
|
||||||
if len(backwardExtremities) > 0 {
|
if len(backwardExtremities) > 0 {
|
||||||
// If so, retrieve as much events as needed through backfilling.
|
// If so, retrieve as much events as needed through backfilling.
|
||||||
events, err = r.backfill(r.roomID, backwardExtremities, r.limit)
|
events, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -456,7 +469,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
||||||
events []*gomatrixserverlib.HeaderedEvent, err error,
|
events []*gomatrixserverlib.HeaderedEvent, err error,
|
||||||
) {
|
) {
|
||||||
// Check if we have enough events.
|
// Check if we have enough events.
|
||||||
isSetLargeEnough := len(streamEvents) >= r.limit
|
isSetLargeEnough := len(streamEvents) >= r.filter.Limit
|
||||||
if !isSetLargeEnough {
|
if !isSetLargeEnough {
|
||||||
// it might be fine we don't have up to 'limit' events, let's find out
|
// it might be fine we don't have up to 'limit' events, let's find out
|
||||||
if r.backwardOrdering {
|
if r.backwardOrdering {
|
||||||
|
|
@ -483,7 +496,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
||||||
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
|
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
|
||||||
var pdus []*gomatrixserverlib.HeaderedEvent
|
var pdus []*gomatrixserverlib.HeaderedEvent
|
||||||
// Only ask the remote server for enough events to reach the limit.
|
// Only ask the remote server for enough events to reach the limit.
|
||||||
pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents))
|
pdus, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit-len(streamEvents))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue