mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-22 22:31:56 -06:00
Reuse the existing lazyload cache on /context and /messages (#2367)
This commit is contained in:
parent
0eb5bd1e13
commit
c07f347f00
|
@ -22,6 +22,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
roomserver "github.com/matrix-org/dendrite/roomserver/api"
|
roomserver "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
@ -44,6 +45,7 @@ func Context(
|
||||||
rsAPI roomserver.RoomserverInternalAPI,
|
rsAPI roomserver.RoomserverInternalAPI,
|
||||||
syncDB storage.Database,
|
syncDB storage.Database,
|
||||||
roomID, eventID string,
|
roomID, eventID string,
|
||||||
|
lazyLoadCache *caching.LazyLoadCache,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
filter, err := parseRoomEventFilter(req)
|
filter, err := parseRoomEventFilter(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -129,7 +131,7 @@ func Context(
|
||||||
|
|
||||||
eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatAll)
|
eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatAll)
|
||||||
eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll)
|
eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll)
|
||||||
newState := applyLazyLoadMembers(filter, eventsAfterClient, eventsBeforeClient, state)
|
newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache)
|
||||||
|
|
||||||
response := ContextRespsonse{
|
response := ContextRespsonse{
|
||||||
Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll),
|
Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll),
|
||||||
|
@ -148,15 +150,25 @@ func Context(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent, state []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
|
func applyLazyLoadMembers(
|
||||||
|
device *userapi.Device,
|
||||||
|
filter *gomatrixserverlib.RoomEventFilter,
|
||||||
|
eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent,
|
||||||
|
state []*gomatrixserverlib.HeaderedEvent,
|
||||||
|
lazyLoadCache *caching.LazyLoadCache,
|
||||||
|
) []*gomatrixserverlib.HeaderedEvent {
|
||||||
if filter == nil || !filter.LazyLoadMembers {
|
if filter == nil || !filter.LazyLoadMembers {
|
||||||
return state
|
return state
|
||||||
}
|
}
|
||||||
allEvents := append(eventsBefore, eventsAfter...)
|
allEvents := append(eventsBefore, eventsAfter...)
|
||||||
x := make(map[string]bool)
|
x := make(map[string]struct{})
|
||||||
// get members who actually send an event
|
// get members who actually send an event
|
||||||
for _, e := range allEvents {
|
for _, e := range allEvents {
|
||||||
x[e.Sender] = true
|
// Don't add membership events the client should already know about
|
||||||
|
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, e.RoomID, e.Sender); cached {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
x[e.Sender] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
newState := []*gomatrixserverlib.HeaderedEvent{}
|
newState := []*gomatrixserverlib.HeaderedEvent{}
|
||||||
|
@ -166,8 +178,9 @@ func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter
|
||||||
newState = append(newState, event)
|
newState = append(newState, event)
|
||||||
} else {
|
} else {
|
||||||
// did the user send an event?
|
// did the user send an event?
|
||||||
if x[event.Sender()] {
|
if _, ok := x[event.Sender()]; ok {
|
||||||
membershipEvents = append(membershipEvents, event)
|
membershipEvents = append(membershipEvents, event)
|
||||||
|
lazyLoadCache.StoreLazyLoadedUser(device, event.RoomID(), event.Sender(), event.EventID())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
@ -64,6 +65,7 @@ func OnIncomingMessagesRequest(
|
||||||
rsAPI api.RoomserverInternalAPI,
|
rsAPI api.RoomserverInternalAPI,
|
||||||
cfg *config.SyncAPI,
|
cfg *config.SyncAPI,
|
||||||
srp *sync.RequestPool,
|
srp *sync.RequestPool,
|
||||||
|
lazyLoadCache *caching.LazyLoadCache,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -200,6 +202,10 @@ func OnIncomingMessagesRequest(
|
||||||
if filter.LazyLoadMembers {
|
if filter.LazyLoadMembers {
|
||||||
membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
|
membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
|
||||||
for _, evt := range clientEvents {
|
for _, evt := range clientEvents {
|
||||||
|
// Don't add membership events the client should already know about
|
||||||
|
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached {
|
||||||
|
continue
|
||||||
|
}
|
||||||
membership, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender)
|
membership, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user")
|
util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user")
|
||||||
|
@ -207,10 +213,11 @@ func OnIncomingMessagesRequest(
|
||||||
}
|
}
|
||||||
if membership != nil {
|
if membership != nil {
|
||||||
membershipToUser[evt.Sender] = membership
|
membershipToUser[evt.Sender] = membership
|
||||||
|
lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, evt := range membershipToUser {
|
for _, evt := range membershipToUser {
|
||||||
state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll))
|
state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
@ -38,6 +39,7 @@ func Setup(
|
||||||
userAPI userapi.UserInternalAPI, federation *gomatrixserverlib.FederationClient,
|
userAPI userapi.UserInternalAPI, federation *gomatrixserverlib.FederationClient,
|
||||||
rsAPI api.RoomserverInternalAPI,
|
rsAPI api.RoomserverInternalAPI,
|
||||||
cfg *config.SyncAPI,
|
cfg *config.SyncAPI,
|
||||||
|
lazyLoadCache *caching.LazyLoadCache,
|
||||||
) {
|
) {
|
||||||
v3mux := csMux.PathPrefix("/{apiversion:(?:r0|v3)}/").Subrouter()
|
v3mux := csMux.PathPrefix("/{apiversion:(?:r0|v3)}/").Subrouter()
|
||||||
|
|
||||||
|
@ -51,7 +53,7 @@ func Setup(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, federation, rsAPI, cfg, srp)
|
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, federation, rsAPI, cfg, srp, lazyLoadCache)
|
||||||
})).Methods(http.MethodGet, http.MethodOptions)
|
})).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
||||||
v3mux.Handle("/user/{userId}/filter",
|
v3mux.Handle("/user/{userId}/filter",
|
||||||
|
@ -89,6 +91,7 @@ func Setup(
|
||||||
req, device,
|
req, device,
|
||||||
rsAPI, syncDB,
|
rsAPI, syncDB,
|
||||||
vars["roomId"], vars["eventId"],
|
vars["roomId"], vars["eventId"],
|
||||||
|
lazyLoadCache,
|
||||||
)
|
)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodGet, http.MethodOptions)
|
).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
|
@ -148,5 +148,5 @@ func AddPublicRoutes(
|
||||||
logrus.WithError(err).Panicf("failed to start presence consumer")
|
logrus.WithError(err).Panicf("failed to start presence consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
|
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg, lazyLoadCache)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue