Return state on calls to /message and lazy load members (#2218)
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
parent
471fda810a
commit
af610df85a
|
@ -44,7 +44,7 @@ func Context(
|
||||||
syncDB storage.Database,
|
syncDB storage.Database,
|
||||||
roomID, eventID string,
|
roomID, eventID string,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
filter, err := parseContextParams(req)
|
filter, err := parseRoomEventFilter(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := ""
|
errMsg := ""
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
|
@ -164,7 +164,7 @@ func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter
|
||||||
return newState
|
return newState
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseContextParams(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
|
func parseRoomEventFilter(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
|
||||||
// Default room filter
|
// Default room filter
|
||||||
filter := &gomatrixserverlib.RoomEventFilter{Limit: 10}
|
filter := &gomatrixserverlib.RoomEventFilter{Limit: 10}
|
||||||
|
|
||||||
|
|
|
@ -55,13 +55,13 @@ func Test_parseContextParams(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
gotFilter, err := parseContextParams(tt.req)
|
gotFilter, err := parseRoomEventFilter(tt.req)
|
||||||
if (err != nil) != tt.wantErr {
|
if (err != nil) != tt.wantErr {
|
||||||
t.Errorf("parseContextParams() error = %v, wantErr %v", err, tt.wantErr)
|
t.Errorf("parseRoomEventFilter() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(gotFilter, tt.wantFilter) {
|
if !reflect.DeepEqual(gotFilter, tt.wantFilter) {
|
||||||
t.Errorf("parseContextParams() gotFilter = %v, want %v", gotFilter, tt.wantFilter)
|
t.Errorf("parseRoomEventFilter() gotFilter = %v, want %v", gotFilter, tt.wantFilter)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -45,8 +44,8 @@ type messagesReq struct {
|
||||||
fromStream *types.StreamingToken
|
fromStream *types.StreamingToken
|
||||||
device *userapi.Device
|
device *userapi.Device
|
||||||
wasToProvided bool
|
wasToProvided bool
|
||||||
limit int
|
|
||||||
backwardOrdering bool
|
backwardOrdering bool
|
||||||
|
filter *gomatrixserverlib.RoomEventFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
type messagesResp struct {
|
type messagesResp struct {
|
||||||
|
@ -54,10 +53,9 @@ type messagesResp struct {
|
||||||
StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token
|
StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token
|
||||||
End string `json:"end"`
|
End string `json:"end"`
|
||||||
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
|
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
|
||||||
|
State []gomatrixserverlib.ClientEvent `json:"state"`
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultMessagesLimit = 10
|
|
||||||
|
|
||||||
// OnIncomingMessagesRequest implements the /messages endpoint from the
|
// OnIncomingMessagesRequest implements the /messages endpoint from the
|
||||||
// client-server API.
|
// client-server API.
|
||||||
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
|
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
|
||||||
|
@ -83,6 +81,14 @@ func OnIncomingMessagesRequest(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
filter, err := parseRoomEventFilter(req)
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.InvalidArgumentValue("unable to parse filter"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Extract parameters from the request's URL.
|
// Extract parameters from the request's URL.
|
||||||
// Pagination tokens.
|
// Pagination tokens.
|
||||||
var fromStream *types.StreamingToken
|
var fromStream *types.StreamingToken
|
||||||
|
@ -143,18 +149,6 @@ func OnIncomingMessagesRequest(
|
||||||
wasToProvided = false
|
wasToProvided = false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Maximum number of events to return; defaults to 10.
|
|
||||||
limit := defaultMessagesLimit
|
|
||||||
if len(req.URL.Query().Get("limit")) > 0 {
|
|
||||||
limit, err = strconv.Atoi(req.URL.Query().Get("limit"))
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return util.JSONResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
JSON: jsonerror.InvalidArgumentValue("limit could not be parsed into an integer: " + err.Error()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// TODO: Implement filtering (#587)
|
// TODO: Implement filtering (#587)
|
||||||
|
|
||||||
// Check the room ID's format.
|
// Check the room ID's format.
|
||||||
|
@ -176,7 +170,7 @@ func OnIncomingMessagesRequest(
|
||||||
to: &to,
|
to: &to,
|
||||||
fromStream: fromStream,
|
fromStream: fromStream,
|
||||||
wasToProvided: wasToProvided,
|
wasToProvided: wasToProvided,
|
||||||
limit: limit,
|
filter: filter,
|
||||||
backwardOrdering: backwardOrdering,
|
backwardOrdering: backwardOrdering,
|
||||||
device: device,
|
device: device,
|
||||||
}
|
}
|
||||||
|
@ -187,10 +181,27 @@ func OnIncomingMessagesRequest(
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set
|
||||||
|
state := []gomatrixserverlib.ClientEvent{}
|
||||||
|
if filter.LazyLoadMembers {
|
||||||
|
memberShipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
|
||||||
|
for _, evt := range clientEvents {
|
||||||
|
memberShip, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
memberShipToUser[evt.Sender] = memberShip
|
||||||
|
}
|
||||||
|
for _, evt := range memberShipToUser {
|
||||||
|
state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
||||||
"from": from.String(),
|
"from": from.String(),
|
||||||
"to": to.String(),
|
"to": to.String(),
|
||||||
"limit": limit,
|
"limit": filter.Limit,
|
||||||
"backwards": backwardOrdering,
|
"backwards": backwardOrdering,
|
||||||
"return_start": start.String(),
|
"return_start": start.String(),
|
||||||
"return_end": end.String(),
|
"return_end": end.String(),
|
||||||
|
@ -200,6 +211,7 @@ func OnIncomingMessagesRequest(
|
||||||
Chunk: clientEvents,
|
Chunk: clientEvents,
|
||||||
Start: start.String(),
|
Start: start.String(),
|
||||||
End: end.String(),
|
End: end.String(),
|
||||||
|
State: state,
|
||||||
}
|
}
|
||||||
if emptyFromSupplied {
|
if emptyFromSupplied {
|
||||||
res.StartStream = fromStream.String()
|
res.StartStream = fromStream.String()
|
||||||
|
@ -234,19 +246,18 @@ func (r *messagesReq) retrieveEvents() (
|
||||||
clientEvents []gomatrixserverlib.ClientEvent, start,
|
clientEvents []gomatrixserverlib.ClientEvent, start,
|
||||||
end types.TopologyToken, err error,
|
end types.TopologyToken, err error,
|
||||||
) {
|
) {
|
||||||
eventFilter := gomatrixserverlib.DefaultRoomEventFilter()
|
eventFilter := r.filter
|
||||||
eventFilter.Limit = r.limit
|
|
||||||
|
|
||||||
// Retrieve the events from the local database.
|
// Retrieve the events from the local database.
|
||||||
var streamEvents []types.StreamEvent
|
var streamEvents []types.StreamEvent
|
||||||
if r.fromStream != nil {
|
if r.fromStream != nil {
|
||||||
toStream := r.to.StreamToken()
|
toStream := r.to.StreamToken()
|
||||||
streamEvents, err = r.db.GetEventsInStreamingRange(
|
streamEvents, err = r.db.GetEventsInStreamingRange(
|
||||||
r.ctx, r.fromStream, &toStream, r.roomID, &eventFilter, r.backwardOrdering,
|
r.ctx, r.fromStream, &toStream, r.roomID, eventFilter, r.backwardOrdering,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
streamEvents, err = r.db.GetEventsInTopologicalRange(
|
streamEvents, err = r.db.GetEventsInTopologicalRange(
|
||||||
r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering,
|
r.ctx, r.from, r.to, r.roomID, eventFilter.Limit, r.backwardOrdering,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -434,7 +445,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
|
||||||
// Check if we have backward extremities for this room.
|
// Check if we have backward extremities for this room.
|
||||||
if len(backwardExtremities) > 0 {
|
if len(backwardExtremities) > 0 {
|
||||||
// If so, retrieve as much events as needed through backfilling.
|
// If so, retrieve as much events as needed through backfilling.
|
||||||
events, err = r.backfill(r.roomID, backwardExtremities, r.limit)
|
events, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -456,7 +467,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
||||||
events []*gomatrixserverlib.HeaderedEvent, err error,
|
events []*gomatrixserverlib.HeaderedEvent, err error,
|
||||||
) {
|
) {
|
||||||
// Check if we have enough events.
|
// Check if we have enough events.
|
||||||
isSetLargeEnough := len(streamEvents) >= r.limit
|
isSetLargeEnough := len(streamEvents) >= r.filter.Limit
|
||||||
if !isSetLargeEnough {
|
if !isSetLargeEnough {
|
||||||
// it might be fine we don't have up to 'limit' events, let's find out
|
// it might be fine we don't have up to 'limit' events, let's find out
|
||||||
if r.backwardOrdering {
|
if r.backwardOrdering {
|
||||||
|
@ -483,7 +494,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
||||||
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
|
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
|
||||||
var pdus []*gomatrixserverlib.HeaderedEvent
|
var pdus []*gomatrixserverlib.HeaderedEvent
|
||||||
// Only ask the remote server for enough events to reach the limit.
|
// Only ask the remote server for enough events to reach the limit.
|
||||||
pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents))
|
pdus, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit-len(streamEvents))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -597,6 +597,7 @@ Device list doesn't change if remote server is down
|
||||||
/context/ on non world readable room does not work
|
/context/ on non world readable room does not work
|
||||||
/context/ returns correct number of events
|
/context/ returns correct number of events
|
||||||
/context/ with lazy_load_members filter works
|
/context/ with lazy_load_members filter works
|
||||||
|
GET /rooms/:room_id/messages lazy loads members correctly
|
||||||
Can query remote device keys using POST after notification
|
Can query remote device keys using POST after notification
|
||||||
Device deletion propagates over federation
|
Device deletion propagates over federation
|
||||||
Get left notifs in sync and /keys/changes when other user leaves
|
Get left notifs in sync and /keys/changes when other user leaves
|
||||||
|
|
Loading…
Reference in a new issue