Return state on calls to /message and lazy load members

This commit is contained in:
Till Faelligen 2022-02-22 14:00:13 +01:00
parent bbe7d37928
commit ec86cd362d
4 changed files with 44 additions and 31 deletions

View file

@ -44,7 +44,7 @@ func Context(
syncDB storage.Database, syncDB storage.Database,
roomID, eventID string, roomID, eventID string,
) util.JSONResponse { ) util.JSONResponse {
filter, err := parseContextParams(req) filter, err := parseRoomEventFilter(req)
if err != nil { if err != nil {
errMsg := "" errMsg := ""
switch err.(type) { switch err.(type) {
@ -164,7 +164,7 @@ func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter
return newState return newState
} }
func parseContextParams(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) { func parseRoomEventFilter(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
// Default room filter // Default room filter
filter := &gomatrixserverlib.RoomEventFilter{Limit: 10} filter := &gomatrixserverlib.RoomEventFilter{Limit: 10}

View file

@ -55,13 +55,13 @@ func Test_parseContextParams(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
gotFilter, err := parseContextParams(tt.req) gotFilter, err := parseRoomEventFilter(tt.req)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("parseContextParams() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("parseRoomEventFilter() error = %v, wantErr %v", err, tt.wantErr)
return return
} }
if !reflect.DeepEqual(gotFilter, tt.wantFilter) { if !reflect.DeepEqual(gotFilter, tt.wantFilter) {
t.Errorf("parseContextParams() gotFilter = %v, want %v", gotFilter, tt.wantFilter) t.Errorf("parseRoomEventFilter() gotFilter = %v, want %v", gotFilter, tt.wantFilter)
} }
}) })
} }

View file

@ -19,7 +19,6 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"sort" "sort"
"strconv"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -45,8 +44,8 @@ type messagesReq struct {
fromStream *types.StreamingToken fromStream *types.StreamingToken
device *userapi.Device device *userapi.Device
wasToProvided bool wasToProvided bool
limit int
backwardOrdering bool backwardOrdering bool
filter *gomatrixserverlib.RoomEventFilter
} }
type messagesResp struct { type messagesResp struct {
@ -54,10 +53,9 @@ type messagesResp struct {
StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token
End string `json:"end"` End string `json:"end"`
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
State []gomatrixserverlib.ClientEvent `json:"state"`
} }
const defaultMessagesLimit = 10
// OnIncomingMessagesRequest implements the /messages endpoint from the // OnIncomingMessagesRequest implements the /messages endpoint from the
// client-server API. // client-server API.
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages // See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
@ -83,6 +81,14 @@ func OnIncomingMessagesRequest(
} }
} }
filter, err := parseRoomEventFilter(req)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue("unable to parse filter"),
}
}
// Extract parameters from the request's URL. // Extract parameters from the request's URL.
// Pagination tokens. // Pagination tokens.
var fromStream *types.StreamingToken var fromStream *types.StreamingToken
@ -143,18 +149,6 @@ func OnIncomingMessagesRequest(
wasToProvided = false wasToProvided = false
} }
// Maximum number of events to return; defaults to 10.
limit := defaultMessagesLimit
if len(req.URL.Query().Get("limit")) > 0 {
limit, err = strconv.Atoi(req.URL.Query().Get("limit"))
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue("limit could not be parsed into an integer: " + err.Error()),
}
}
}
// TODO: Implement filtering (#587) // TODO: Implement filtering (#587)
// Check the room ID's format. // Check the room ID's format.
@ -176,7 +170,7 @@ func OnIncomingMessagesRequest(
to: &to, to: &to,
fromStream: fromStream, fromStream: fromStream,
wasToProvided: wasToProvided, wasToProvided: wasToProvided,
limit: limit, filter: filter,
backwardOrdering: backwardOrdering, backwardOrdering: backwardOrdering,
device: device, device: device,
} }
@ -187,10 +181,27 @@ func OnIncomingMessagesRequest(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
// at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set
state := []gomatrixserverlib.ClientEvent{}
if filter.LazyLoadMembers {
memberShipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
for _, evt := range clientEvents {
memberShip, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user")
continue
}
memberShipToUser[evt.Sender] = memberShip
}
for _, evt := range memberShipToUser {
state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll))
}
}
util.GetLogger(req.Context()).WithFields(logrus.Fields{ util.GetLogger(req.Context()).WithFields(logrus.Fields{
"from": from.String(), "from": from.String(),
"to": to.String(), "to": to.String(),
"limit": limit, "limit": filter.Limit,
"backwards": backwardOrdering, "backwards": backwardOrdering,
"return_start": start.String(), "return_start": start.String(),
"return_end": end.String(), "return_end": end.String(),
@ -200,6 +211,7 @@ func OnIncomingMessagesRequest(
Chunk: clientEvents, Chunk: clientEvents,
Start: start.String(), Start: start.String(),
End: end.String(), End: end.String(),
State: state,
} }
if emptyFromSupplied { if emptyFromSupplied {
res.StartStream = fromStream.String() res.StartStream = fromStream.String()
@ -234,19 +246,18 @@ func (r *messagesReq) retrieveEvents() (
clientEvents []gomatrixserverlib.ClientEvent, start, clientEvents []gomatrixserverlib.ClientEvent, start,
end types.TopologyToken, err error, end types.TopologyToken, err error,
) { ) {
eventFilter := gomatrixserverlib.DefaultRoomEventFilter() eventFilter := r.filter
eventFilter.Limit = r.limit
// Retrieve the events from the local database. // Retrieve the events from the local database.
var streamEvents []types.StreamEvent var streamEvents []types.StreamEvent
if r.fromStream != nil { if r.fromStream != nil {
toStream := r.to.StreamToken() toStream := r.to.StreamToken()
streamEvents, err = r.db.GetEventsInStreamingRange( streamEvents, err = r.db.GetEventsInStreamingRange(
r.ctx, r.fromStream, &toStream, r.roomID, &eventFilter, r.backwardOrdering, r.ctx, r.fromStream, &toStream, r.roomID, eventFilter, r.backwardOrdering,
) )
} else { } else {
streamEvents, err = r.db.GetEventsInTopologicalRange( streamEvents, err = r.db.GetEventsInTopologicalRange(
r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering, r.ctx, r.from, r.to, r.roomID, eventFilter.Limit, r.backwardOrdering,
) )
} }
if err != nil { if err != nil {
@ -434,7 +445,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// Check if we have backward extremities for this room. // Check if we have backward extremities for this room.
if len(backwardExtremities) > 0 { if len(backwardExtremities) > 0 {
// If so, retrieve as much events as needed through backfilling. // If so, retrieve as much events as needed through backfilling.
events, err = r.backfill(r.roomID, backwardExtremities, r.limit) events, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit)
if err != nil { if err != nil {
return return
} }
@ -456,7 +467,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
events []*gomatrixserverlib.HeaderedEvent, err error, events []*gomatrixserverlib.HeaderedEvent, err error,
) { ) {
// Check if we have enough events. // Check if we have enough events.
isSetLargeEnough := len(streamEvents) >= r.limit isSetLargeEnough := len(streamEvents) >= r.filter.Limit
if !isSetLargeEnough { if !isSetLargeEnough {
// it might be fine we don't have up to 'limit' events, let's find out // it might be fine we don't have up to 'limit' events, let's find out
if r.backwardOrdering { if r.backwardOrdering {
@ -483,7 +494,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering { if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []*gomatrixserverlib.HeaderedEvent var pdus []*gomatrixserverlib.HeaderedEvent
// Only ask the remote server for enough events to reach the limit. // Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents)) pdus, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit-len(streamEvents))
if err != nil { if err != nil {
return return
} }

View file

@ -597,3 +597,5 @@ Device list doesn't change if remote server is down
/context/ on non world readable room does not work /context/ on non world readable room does not work
/context/ returns correct number of events /context/ returns correct number of events
/context/ with lazy_load_members filter works /context/ with lazy_load_members filter works
GET /rooms/:room_id/messages lazy loads members correctly