User directory for nearby Pinecone peers

This commit is contained in:
Neil Alexander 2022-03-28 14:28:03 +01:00
parent 0692be44d9
commit f5357fa8b0
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
10 changed files with 201 additions and 22 deletions

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
@ -326,6 +327,9 @@ func (m *DendriteMonolith) Start() {
// This is different to rsAPI which can be the http client which doesn't need this dependency // This is different to rsAPI which can be the http client which doesn't need this dependency
rsAPI.SetFederationAPI(fsAPI, keyRing) rsAPI.SetFederationAPI(fsAPI, keyRing)
userProvider := users.NewPineconeUserProvider(m.PineconeRouter, m.PineconeQUIC, m.userAPI, federation)
roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, fsAPI, federation)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
@ -333,13 +337,14 @@ func (m *DendriteMonolith) Start() {
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,
FederationAPI: fsAPI, FederationAPI: fsAPI,
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
UserAPI: m.userAPI, UserAPI: m.userAPI,
KeyAPI: keyAPI, KeyAPI: keyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, fsAPI, federation), ExtPublicRoomsProvider: roomProvider,
ExtUserDirectoryProvider: userProvider,
} }
monolith.AddAllPublicRoutes( monolith.AddAllPublicRoutes(
base.ProcessContext, base.ProcessContext,
@ -363,6 +368,7 @@ func (m *DendriteMonolith) Start() {
pHTTP := m.PineconeQUIC.HTTP() pHTTP := m.PineconeQUIC.HTTP()
pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux)
pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux)
pHTTP.Mux().HandleFunc(users.PublicURL, userProvider.FederatedUserProfiles)
// Build both ends of a HTTP multiplex. // Build both ends of a HTTP multiplex.
h2s := &http2.Server{} h2s := &http2.Server{}

View file

@ -17,6 +17,7 @@ package authtypes
// Profile represents the profile for a Matrix account. // Profile represents the profile for a Matrix account.
type Profile struct { type Profile struct {
Localpart string `json:"local_part"` Localpart string `json:"local_part"`
ServerName string `json:"server_name,omitempty"` // NOTSPEC: only set by Pinecone user provider
DisplayName string `json:"display_name"` DisplayName string `json:"display_name"`
AvatarURL string `json:"avatar_url"` AvatarURL string `json:"avatar_url"`
} }

View file

@ -45,6 +45,7 @@ func AddPublicRoutes(
transactionsCache *transactions.Cache, transactionsCache *transactions.Cache,
fsAPI federationAPI.FederationInternalAPI, fsAPI federationAPI.FederationInternalAPI,
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
userDirectoryProvider userapi.UserDirectoryProvider,
keyAPI keyserverAPI.KeyInternalAPI, keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider, extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs, mscCfg *config.MSCs,
@ -58,7 +59,7 @@ func AddPublicRoutes(
routing.Setup( routing.Setup(
router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI, router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI,
userAPI, federation, userAPI, userDirectoryProvider, federation,
syncProducer, transactionsCache, fsAPI, keyAPI, syncProducer, transactionsCache, fsAPI, keyAPI,
extRoomsProvider, mscCfg, extRoomsProvider, mscCfg,
) )

View file

@ -51,6 +51,7 @@ func Setup(
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
userDirectoryProvider userapi.UserDirectoryProvider,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
syncProducer *producers.SyncAPIProducer, syncProducer *producers.SyncAPIProducer,
transactionsCache *transactions.Cache, transactionsCache *transactions.Cache,
@ -904,6 +905,7 @@ func Setup(
device, device,
userAPI, userAPI,
rsAPI, rsAPI,
userDirectoryProvider,
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
postContent.SearchString, postContent.SearchString,
postContent.Limit, postContent.Limit,

View file

@ -16,6 +16,7 @@ package routing
import ( import (
"context" "context"
"database/sql"
"fmt" "fmt"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -35,6 +36,7 @@ func SearchUserDirectory(
device *userapi.Device, device *userapi.Device,
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
provider userapi.UserDirectoryProvider,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
searchString string, searchString string,
limit int, limit int,
@ -50,13 +52,12 @@ func SearchUserDirectory(
} }
// First start searching local users. // First start searching local users.
userReq := &userapi.QuerySearchProfilesRequest{ userReq := &userapi.QuerySearchProfilesRequest{
SearchString: searchString, SearchString: searchString,
Limit: limit, Limit: limit,
} }
userRes := &userapi.QuerySearchProfilesResponse{} userRes := &userapi.QuerySearchProfilesResponse{}
if err := userAPI.QuerySearchProfiles(ctx, userReq, userRes); err != nil { if err := provider.QuerySearchProfiles(ctx, userReq, userRes); err != nil {
errRes := util.ErrorResponse(fmt.Errorf("userAPI.QuerySearchProfiles: %w", err)) errRes := util.ErrorResponse(fmt.Errorf("userAPI.QuerySearchProfiles: %w", err))
return &errRes return &errRes
} }
@ -67,8 +68,16 @@ func SearchUserDirectory(
break break
} }
userID := fmt.Sprintf("@%s:%s", user.Localpart, serverName) var userID string
if user.ServerName != "" {
userID = fmt.Sprintf("@%s:%s", user.Localpart, user.ServerName)
} else {
userID = fmt.Sprintf("@%s:%s", user.Localpart, serverName)
}
if _, ok := results[userID]; !ok { if _, ok := results[userID]; !ok {
if userID == "_server" {
continue
}
results[userID] = authtypes.FullyQualifiedProfile{ results[userID] = authtypes.FullyQualifiedProfile{
UserID: userID, UserID: userID,
DisplayName: user.DisplayName, DisplayName: user.DisplayName,
@ -87,7 +96,7 @@ func SearchUserDirectory(
Limit: limit - len(results), Limit: limit - len(results),
} }
stateRes := &api.QueryKnownUsersResponse{} stateRes := &api.QueryKnownUsersResponse{}
if err := rsAPI.QueryKnownUsers(ctx, stateReq, stateRes); err != nil { if err := rsAPI.QueryKnownUsers(ctx, stateReq, stateRes); err != nil && err != sql.ErrNoRows {
errRes := util.ErrorResponse(fmt.Errorf("rsAPI.QueryKnownUsers: %w", err)) errRes := util.ErrorResponse(fmt.Errorf("rsAPI.QueryKnownUsers: %w", err))
return &errRes return &errRes
} }

View file

@ -35,6 +35,7 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
@ -198,6 +199,9 @@ func main() {
rsComponent.SetFederationAPI(fsAPI, keyRing) rsComponent.SetFederationAPI(fsAPI, keyRing)
userProvider := users.NewPineconeUserProvider(pRouter, pQUIC, userAPI, federation)
roomProvider := rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
@ -205,13 +209,14 @@ func main() {
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,
FederationAPI: fsAPI, FederationAPI: fsAPI,
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
UserAPI: userAPI, UserAPI: userAPI,
KeyAPI: keyAPI, KeyAPI: keyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation), ExtPublicRoomsProvider: roomProvider,
ExtUserDirectoryProvider: userProvider,
} }
monolith.AddAllPublicRoutes( monolith.AddAllPublicRoutes(
base.ProcessContext, base.ProcessContext,
@ -256,6 +261,7 @@ func main() {
pHTTP := pQUIC.HTTP() pHTTP := pQUIC.HTTP()
pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux)
pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux)
pHTTP.Mux().HandleFunc(users.PublicURL, userProvider.FederatedUserProfiles)
// Build both ends of a HTTP multiplex. // Build both ends of a HTTP multiplex.
httpServer := &http.Server{ httpServer := &http.Server{

View file

@ -0,0 +1,145 @@
package users
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
clienthttputil "github.com/matrix-org/dendrite/clientapi/httputil"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
)
type PineconeUserProvider struct {
r *pineconeRouter.Router
s *pineconeSessions.Sessions
userAPI userapi.UserProfileAPI
fedClient *gomatrixserverlib.FederationClient
}
const PublicURL = "/_matrix/p2p/profiles"
func NewPineconeUserProvider(
r *pineconeRouter.Router,
s *pineconeSessions.Sessions,
userAPI userapi.UserProfileAPI,
fedClient *gomatrixserverlib.FederationClient,
) *PineconeUserProvider {
p := &PineconeUserProvider{
r: r,
s: s,
userAPI: userAPI,
fedClient: fedClient,
}
return p
}
func (p *PineconeUserProvider) FederatedUserProfiles(w http.ResponseWriter, r *http.Request) {
req := &userapi.QuerySearchProfilesRequest{Limit: 25}
res := &userapi.QuerySearchProfilesResponse{}
if err := clienthttputil.UnmarshalJSONRequest(r, &req); err != nil {
w.WriteHeader(400)
return
}
if err := p.userAPI.QuerySearchProfiles(r.Context(), req, res); err != nil {
w.WriteHeader(400)
return
}
j, err := json.Marshal(res)
if err != nil {
w.WriteHeader(400)
return
}
w.WriteHeader(200)
_, _ = w.Write(j)
}
func (p *PineconeUserProvider) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error {
list := map[string]struct{}{}
for _, k := range p.r.Peers() {
list[k.PublicKey] = struct{}{}
}
res.Profiles = bulkFetchUserDirectoriesFromServers(context.Background(), req, p.fedClient, list)
return nil
}
// bulkFetchUserDirectoriesFromServers fetches users from the list of homeservers.
// Returns a list of user profiles.
func bulkFetchUserDirectoriesFromServers(
ctx context.Context, req *userapi.QuerySearchProfilesRequest,
fedClient *gomatrixserverlib.FederationClient,
homeservers map[string]struct{},
) (profiles []authtypes.Profile) {
jsonBody, err := json.Marshal(req)
if err != nil {
return nil
}
limit := 200
// follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
// goroutines send rooms to this channel
profileCh := make(chan authtypes.Profile, int(limit))
// signalling channel to tell goroutines to stop sending rooms and quit
done := make(chan bool)
// signalling to say when we can close the room channel
var wg sync.WaitGroup
wg.Add(len(homeservers))
// concurrently query for public rooms
reqctx, reqcancel := context.WithTimeout(ctx, time.Second*5)
for hs := range homeservers {
go func(homeserverDomain string) {
defer wg.Done()
util.GetLogger(reqctx).WithField("hs", homeserverDomain).Info("Querying HS for users")
jsonBodyReader := bytes.NewBuffer(jsonBody)
httpReq, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("matrix://%s%s", homeserverDomain, PublicURL), jsonBodyReader)
if err != nil {
util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn(
"bulkFetchUserDirectoriesFromServers: failed to create request",
)
}
res := &userapi.QuerySearchProfilesResponse{}
if err = fedClient.DoRequestAndParseResponse(reqctx, httpReq, res); err != nil {
util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn(
"bulkFetchUserDirectoriesFromServers: failed to query hs",
)
return
}
for _, profile := range res.Profiles {
profile.ServerName = homeserverDomain
// atomically send a room or stop
select {
case profileCh <- profile:
case <-done:
case <-reqctx.Done():
util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending profiles")
return
}
}
}(hs)
}
select {
case <-time.After(5 * time.Second):
default:
wg.Wait()
}
reqcancel()
close(done)
close(profileCh)
for profile := range profileCh {
profiles = append(profiles, profile)
}
return profiles
}

View file

@ -33,7 +33,7 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
clientapi.AddPublicRoutes( clientapi.AddPublicRoutes(
base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI,
federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, userAPI,
keyAPI, nil, &cfg.MSCs, keyAPI, nil, &cfg.MSCs,
) )

View file

@ -51,16 +51,21 @@ type Monolith struct {
KeyAPI keyAPI.KeyInternalAPI KeyAPI keyAPI.KeyInternalAPI
// Optional // Optional
ExtPublicRoomsProvider api.ExtraPublicRoomsProvider ExtPublicRoomsProvider api.ExtraPublicRoomsProvider
ExtUserDirectoryProvider userapi.UserDirectoryProvider
} }
// AddAllPublicRoutes attaches all public paths to the given router // AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux *mux.Router) { func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux *mux.Router) {
userDirectoryProvider := m.ExtUserDirectoryProvider
if userDirectoryProvider == nil {
userDirectoryProvider = m.UserAPI
}
clientapi.AddPublicRoutes( clientapi.AddPublicRoutes(
process, csMux, synapseMux, &m.Config.ClientAPI, process, csMux, synapseMux, &m.Config.ClientAPI,
m.FedClient, m.RoomserverAPI, m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
m.FederationAPI, m.UserAPI, m.KeyAPI, m.FederationAPI, m.UserAPI, userDirectoryProvider, m.KeyAPI,
m.ExtPublicRoomsProvider, &m.Config.MSCs, m.ExtPublicRoomsProvider, &m.Config.MSCs,
) )
federationapi.AddPublicRoutes( federationapi.AddPublicRoutes(

View file

@ -54,6 +54,10 @@ type UserInternalAPI interface {
QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error
} }
type UserDirectoryProvider interface {
QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error
}
// UserProfileAPI provides functions for getting user profiles // UserProfileAPI provides functions for getting user profiles
type UserProfileAPI interface { type UserProfileAPI interface {
QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error