msc2946: Make TestClientSpacesSummary pass

This commit is contained in:
Kegan Dougal 2022-02-28 17:41:15 +00:00
parent dd21572f6d
commit a9fc1ab0f2
2 changed files with 106 additions and 60 deletions

1
go.mod
View file

@ -18,6 +18,7 @@ require (
github.com/frankban/quicktest v1.14.0 // indirect github.com/frankban/quicktest v1.14.0 // indirect
github.com/getsentry/sentry-go v0.12.0 github.com/getsentry/sentry-go v0.12.0
github.com/gologme/log v1.3.0 github.com/gologme/log v1.3.0
github.com/google/uuid v1.2.0 // indirect
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/h2non/filetype v1.1.3 // indirect github.com/h2non/filetype v1.1.3 // indirect

View file

@ -26,6 +26,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/google/uuid"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
fs "github.com/matrix-org/dendrite/federationapi/api" fs "github.com/matrix-org/dendrite/federationapi/api"
@ -86,7 +87,6 @@ func federatedSpacesHandler(
rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI,
thisServer gomatrixserverlib.ServerName, thisServer gomatrixserverlib.ServerName,
) util.JSONResponse { ) util.JSONResponse {
inMemoryBatchCache := make(map[string]set)
u, err := url.Parse(fedReq.RequestURI()) u, err := url.Parse(fedReq.RequestURI())
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
@ -108,7 +108,8 @@ func federatedSpacesHandler(
rsAPI: rsAPI, rsAPI: rsAPI,
fsAPI: fsAPI, fsAPI: fsAPI,
inMemoryBatchCache: inMemoryBatchCache, // inline cache as we don't have pagination in federation mode
paginationCache: make(map[string]paginationInfo),
} }
return w.walk() return w.walk()
} }
@ -117,8 +118,11 @@ func spacesHandler(
rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI,
thisServer gomatrixserverlib.ServerName, thisServer gomatrixserverlib.ServerName,
) func(*http.Request, *userapi.Device) util.JSONResponse { ) func(*http.Request, *userapi.Device) util.JSONResponse {
// declared outside the returned handler so it persists between calls
// TODO: clear based on... time?
paginationCache := make(map[string]paginationInfo)
return func(req *http.Request, device *userapi.Device) util.JSONResponse { return func(req *http.Request, device *userapi.Device) util.JSONResponse {
inMemoryBatchCache := make(map[string]set)
// Extract the room ID from the request. Sanity check request data. // Extract the room ID from the request. Sanity check request data.
params, err := httputil.URLDecodeMapValues(mux.Vars(req)) params, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil { if err != nil {
@ -129,6 +133,7 @@ func spacesHandler(
suggestedOnly: req.URL.Query().Get("suggested_only") == "true", suggestedOnly: req.URL.Query().Get("suggested_only") == "true",
limit: parseInt(req.URL.Query().Get("limit"), 1000), limit: parseInt(req.URL.Query().Get("limit"), 1000),
maxDepth: parseInt(req.URL.Query().Get("max_depth"), -1), maxDepth: parseInt(req.URL.Query().Get("max_depth"), -1),
paginationToken: req.URL.Query().Get("from"),
rootRoomID: roomID, rootRoomID: roomID,
caller: device, caller: device,
thisServer: thisServer, thisServer: thisServer,
@ -136,12 +141,17 @@ func spacesHandler(
rsAPI: rsAPI, rsAPI: rsAPI,
fsAPI: fsAPI, fsAPI: fsAPI,
inMemoryBatchCache: inMemoryBatchCache, paginationCache: paginationCache,
} }
return w.walk() return w.walk()
} }
} }
type paginationInfo struct {
processed set
unvisited []roomVisit
}
type walker struct { type walker struct {
rootRoomID string rootRoomID string
caller *userapi.Device caller *userapi.Device
@ -153,9 +163,9 @@ type walker struct {
suggestedOnly bool suggestedOnly bool
limit int limit int
maxDepth int maxDepth int
paginationToken string
// user ID|device ID|batch_num => event/room IDs sent to client paginationCache map[string]paginationInfo
inMemoryBatchCache map[string]set
mu sync.Mutex mu sync.Mutex
} }
@ -166,25 +176,26 @@ func (w *walker) callerID() string {
return string(w.serverName) return string(w.serverName)
} }
func (w *walker) alreadySent(id string) bool { func (w *walker) newPaginationCache() (string, paginationInfo) {
w.mu.Lock() p := paginationInfo{
defer w.mu.Unlock() processed: make(set),
m, ok := w.inMemoryBatchCache[w.callerID()] unvisited: nil,
if !ok {
return false
} }
return m[id] tok := uuid.NewString()
return tok, p
} }
func (w *walker) markSent(id string) { func (w *walker) loadPaginationCache(paginationToken string) *paginationInfo {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
m := w.inMemoryBatchCache[w.callerID()] p := w.paginationCache[paginationToken]
if m == nil { return &p
m = make(set)
} }
m[id] = true
w.inMemoryBatchCache[w.callerID()] = m func (w *walker) storePaginationCache(paginationToken string, cache paginationInfo) {
w.mu.Lock()
defer w.mu.Unlock()
w.paginationCache[paginationToken] = cache
} }
type roomVisit struct { type roomVisit struct {
@ -212,13 +223,30 @@ func (w *walker) walk() util.JSONResponse {
var discoveredRooms []gomatrixserverlib.MSC2946Room var discoveredRooms []gomatrixserverlib.MSC2946Room
var cache *paginationInfo
if w.paginationToken != "" {
cache = w.loadPaginationCache(w.paginationToken)
if cache == nil {
return util.JSONResponse{
Code: 400,
JSON: jsonerror.InvalidArgumentValue("invalid from"),
}
}
} else {
tok, c := w.newPaginationCache()
cache = &c
w.paginationToken = tok
// Begin walking the graph starting with the room ID in the request in a queue of unvisited rooms // Begin walking the graph starting with the room ID in the request in a queue of unvisited rooms
// Depth first -> stack data structure c.unvisited = append(c.unvisited, roomVisit{
unvisited := []roomVisit{{
roomID: w.rootRoomID, roomID: w.rootRoomID,
depth: 0, depth: 0,
}} })
processed := make(set) }
processed := cache.processed
unvisited := cache.unvisited
// Depth first -> stack data structure
for len(unvisited) > 0 { for len(unvisited) > 0 {
if len(discoveredRooms) >= w.limit { if len(discoveredRooms) >= w.limit {
break break
@ -229,11 +257,12 @@ func (w *walker) walk() util.JSONResponse {
unvisited = unvisited[:len(unvisited)-1] unvisited = unvisited[:len(unvisited)-1]
// If this room has already been processed, skip. // If this room has already been processed, skip.
// If this room exceeds the specified depth, skip. // If this room exceeds the specified depth, skip.
if processed[rv.roomID] || rv.roomID == "" || (w.maxDepth > 0 && rv.depth > w.maxDepth) { if processed.isSet(rv.roomID) || rv.roomID == "" || (w.maxDepth > 0 && rv.depth > w.maxDepth) {
continue continue
} }
// Mark this room as processed. // Mark this room as processed.
processed[rv.roomID] = true
processed.set(rv.roomID)
// if this room is not a space room, skip. // if this room is not a space room, skip.
var roomType string var roomType string
@ -278,13 +307,6 @@ func (w *walker) walk() util.JSONResponse {
} }
} }
// mark processed rooms for pagination purposes
for _, room := range discoveredRooms {
if !w.alreadySent(room.RoomID) {
w.markSent(room.RoomID)
}
}
// don't walk the children // don't walk the children
// if the parent is not a space room // if the parent is not a space room
if roomType != ConstCreateEventContentValueSpace { if roomType != ConstCreateEventContentValueSpace {
@ -309,12 +331,27 @@ func (w *walker) walk() util.JSONResponse {
}) })
} }
} }
if len(unvisited) > 0 {
// we still have more rooms so we need to send back a pagination token,
// we probably hit a room limit
cache.processed = processed
cache.unvisited = unvisited
w.storePaginationCache(w.paginationToken, *cache)
} else {
// clear the pagination token so we don't send it back to the client
// Note we do NOT nuke the cache just in case this response is lost
// and the client retries it.
w.paginationToken = ""
}
if w.caller != nil { if w.caller != nil {
// return CS API format // return CS API format
return util.JSONResponse{ return util.JSONResponse{
Code: 200, Code: 200,
JSON: MSC2946ClientResponse{ JSON: MSC2946ClientResponse{
Rooms: discoveredRooms, Rooms: discoveredRooms,
NextBatch: w.paginationToken,
}, },
} }
} }
@ -548,7 +585,15 @@ func (w *walker) childReferences(roomID string) ([]gomatrixserverlib.MSC2946Stri
return el, nil return el, nil
} }
type set map[string]bool type set map[string]struct{}
func (s set) set(val string) {
s[val] = struct{}{}
}
func (s set) isSet(val string) bool {
_, ok := s[val]
return ok
}
func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEvent { func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEvent {
if ev.StateKey() == nil { if ev.StateKey() == nil {