diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 8ef4bbc71..a7773759a 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -326,6 +327,9 @@ func (m *DendriteMonolith) Start() { // This is different to rsAPI which can be the http client which doesn't need this dependency rsAPI.SetFederationAPI(fsAPI, keyRing) + userProvider := users.NewPineconeUserProvider(m.PineconeRouter, m.PineconeQUIC, m.userAPI, federation) + roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, fsAPI, federation) + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -333,13 +337,14 @@ func (m *DendriteMonolith) Start() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: m.userAPI, - KeyAPI: keyAPI, - ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, fsAPI, federation), + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: m.userAPI, + KeyAPI: keyAPI, + ExtPublicRoomsProvider: roomProvider, + ExtUserDirectoryProvider: userProvider, } monolith.AddAllPublicRoutes( base.ProcessContext, @@ -363,6 +368,7 @@ func (m *DendriteMonolith) Start() { pHTTP := m.PineconeQUIC.HTTP() pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) + pHTTP.Mux().HandleFunc(users.PublicURL, userProvider.FederatedUserProfiles) // Build both ends of a HTTP multiplex. h2s := &http2.Server{} diff --git a/clientapi/auth/authtypes/profile.go b/clientapi/auth/authtypes/profile.go index 902850bc0..29468c168 100644 --- a/clientapi/auth/authtypes/profile.go +++ b/clientapi/auth/authtypes/profile.go @@ -17,6 +17,7 @@ package authtypes // Profile represents the profile for a Matrix account. type Profile struct { Localpart string `json:"local_part"` + ServerName string `json:"server_name,omitempty"` // NOTSPEC: only set by Pinecone user provider DisplayName string `json:"display_name"` AvatarURL string `json:"avatar_url"` } diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 31a53a706..d0ef368d1 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -45,6 +45,7 @@ func AddPublicRoutes( transactionsCache *transactions.Cache, fsAPI federationAPI.FederationInternalAPI, userAPI userapi.UserInternalAPI, + userDirectoryProvider userapi.UserDirectoryProvider, keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, @@ -58,7 +59,7 @@ func AddPublicRoutes( routing.Setup( router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI, - userAPI, federation, + userAPI, userDirectoryProvider, federation, syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider, mscCfg, ) diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index e173831ab..218087bb3 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -51,6 +51,7 @@ func Setup( rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, userAPI userapi.UserInternalAPI, + userDirectoryProvider userapi.UserDirectoryProvider, federation *gomatrixserverlib.FederationClient, syncProducer *producers.SyncAPIProducer, transactionsCache *transactions.Cache, @@ -904,6 +905,7 @@ func Setup( device, userAPI, rsAPI, + userDirectoryProvider, cfg.Matrix.ServerName, postContent.SearchString, postContent.Limit, diff --git a/clientapi/routing/userdirectory.go b/clientapi/routing/userdirectory.go index 2659bc9cc..62e405dd8 100644 --- a/clientapi/routing/userdirectory.go +++ b/clientapi/routing/userdirectory.go @@ -16,6 +16,7 @@ package routing import ( "context" + "database/sql" "fmt" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" @@ -35,6 +36,7 @@ func SearchUserDirectory( device *userapi.Device, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, + provider userapi.UserDirectoryProvider, serverName gomatrixserverlib.ServerName, searchString string, limit int, @@ -50,13 +52,12 @@ func SearchUserDirectory( } // First start searching local users. - userReq := &userapi.QuerySearchProfilesRequest{ SearchString: searchString, Limit: limit, } userRes := &userapi.QuerySearchProfilesResponse{} - if err := userAPI.QuerySearchProfiles(ctx, userReq, userRes); err != nil { + if err := provider.QuerySearchProfiles(ctx, userReq, userRes); err != nil { errRes := util.ErrorResponse(fmt.Errorf("userAPI.QuerySearchProfiles: %w", err)) return &errRes } @@ -67,8 +68,16 @@ func SearchUserDirectory( break } - userID := fmt.Sprintf("@%s:%s", user.Localpart, serverName) + var userID string + if user.ServerName != "" { + userID = fmt.Sprintf("@%s:%s", user.Localpart, user.ServerName) + } else { + userID = fmt.Sprintf("@%s:%s", user.Localpart, serverName) + } if _, ok := results[userID]; !ok { + if userID == "_server" { + continue + } results[userID] = authtypes.FullyQualifiedProfile{ UserID: userID, DisplayName: user.DisplayName, @@ -87,7 +96,7 @@ func SearchUserDirectory( Limit: limit - len(results), } stateRes := &api.QueryKnownUsersResponse{} - if err := rsAPI.QueryKnownUsers(ctx, stateReq, stateRes); err != nil { + if err := rsAPI.QueryKnownUsers(ctx, stateReq, stateRes); err != nil && err != sql.ErrNoRows { errRes := util.ErrorResponse(fmt.Errorf("rsAPI.QueryKnownUsers: %w", err)) return &errRes } diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 45f186985..fab121086 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -35,6 +35,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -198,6 +199,9 @@ func main() { rsComponent.SetFederationAPI(fsAPI, keyRing) + userProvider := users.NewPineconeUserProvider(pRouter, pQUIC, userAPI, federation) + roomProvider := rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation) + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -205,13 +209,14 @@ func main() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, - ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation), + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, + ExtPublicRoomsProvider: roomProvider, + ExtUserDirectoryProvider: userProvider, } monolith.AddAllPublicRoutes( base.ProcessContext, @@ -256,6 +261,7 @@ func main() { pHTTP := pQUIC.HTTP() pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) + pHTTP.Mux().HandleFunc(users.PublicURL, userProvider.FederatedUserProfiles) // Build both ends of a HTTP multiplex. httpServer := &http.Server{ diff --git a/cmd/dendrite-demo-pinecone/users/users.go b/cmd/dendrite-demo-pinecone/users/users.go new file mode 100644 index 000000000..ffbd27ee9 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/users/users.go @@ -0,0 +1,145 @@ +package users + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + clienthttputil "github.com/matrix-org/dendrite/clientapi/httputil" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + + pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeSessions "github.com/matrix-org/pinecone/sessions" +) + +type PineconeUserProvider struct { + r *pineconeRouter.Router + s *pineconeSessions.Sessions + userAPI userapi.UserProfileAPI + fedClient *gomatrixserverlib.FederationClient +} + +const PublicURL = "/_matrix/p2p/profiles" + +func NewPineconeUserProvider( + r *pineconeRouter.Router, + s *pineconeSessions.Sessions, + userAPI userapi.UserProfileAPI, + fedClient *gomatrixserverlib.FederationClient, +) *PineconeUserProvider { + p := &PineconeUserProvider{ + r: r, + s: s, + userAPI: userAPI, + fedClient: fedClient, + } + return p +} + +func (p *PineconeUserProvider) FederatedUserProfiles(w http.ResponseWriter, r *http.Request) { + req := &userapi.QuerySearchProfilesRequest{Limit: 25} + res := &userapi.QuerySearchProfilesResponse{} + if err := clienthttputil.UnmarshalJSONRequest(r, &req); err != nil { + w.WriteHeader(400) + return + } + if err := p.userAPI.QuerySearchProfiles(r.Context(), req, res); err != nil { + w.WriteHeader(400) + return + } + j, err := json.Marshal(res) + if err != nil { + w.WriteHeader(400) + return + } + w.WriteHeader(200) + _, _ = w.Write(j) +} + +func (p *PineconeUserProvider) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error { + list := map[string]struct{}{} + for _, k := range p.r.Peers() { + list[k.PublicKey] = struct{}{} + } + res.Profiles = bulkFetchUserDirectoriesFromServers(context.Background(), req, p.fedClient, list) + return nil +} + +// bulkFetchUserDirectoriesFromServers fetches users from the list of homeservers. +// Returns a list of user profiles. +func bulkFetchUserDirectoriesFromServers( + ctx context.Context, req *userapi.QuerySearchProfilesRequest, + fedClient *gomatrixserverlib.FederationClient, + homeservers map[string]struct{}, +) (profiles []authtypes.Profile) { + jsonBody, err := json.Marshal(req) + if err != nil { + return nil + } + + limit := 200 + // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. + // goroutines send rooms to this channel + profileCh := make(chan authtypes.Profile, int(limit)) + // signalling channel to tell goroutines to stop sending rooms and quit + done := make(chan bool) + // signalling to say when we can close the room channel + var wg sync.WaitGroup + wg.Add(len(homeservers)) + // concurrently query for public rooms + reqctx, reqcancel := context.WithTimeout(ctx, time.Second*5) + for hs := range homeservers { + go func(homeserverDomain string) { + defer wg.Done() + util.GetLogger(reqctx).WithField("hs", homeserverDomain).Info("Querying HS for users") + + jsonBodyReader := bytes.NewBuffer(jsonBody) + httpReq, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("matrix://%s%s", homeserverDomain, PublicURL), jsonBodyReader) + if err != nil { + util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchUserDirectoriesFromServers: failed to create request", + ) + } + res := &userapi.QuerySearchProfilesResponse{} + if err = fedClient.DoRequestAndParseResponse(reqctx, httpReq, res); err != nil { + util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchUserDirectoriesFromServers: failed to query hs", + ) + return + } + for _, profile := range res.Profiles { + profile.ServerName = homeserverDomain + // atomically send a room or stop + select { + case profileCh <- profile: + case <-done: + case <-reqctx.Done(): + util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending profiles") + return + } + } + }(hs) + } + + select { + case <-time.After(5 * time.Second): + default: + wg.Wait() + } + reqcancel() + close(done) + close(profileCh) + + for profile := range profileCh { + profiles = append(profiles, profile) + } + + return profiles +} diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go index a2036de35..978d8b0a4 100644 --- a/cmd/dendrite-polylith-multi/personalities/clientapi.go +++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go @@ -33,7 +33,7 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { clientapi.AddPublicRoutes( base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, - federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, + federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, userAPI, keyAPI, nil, &cfg.MSCs, ) diff --git a/setup/monolith.go b/setup/monolith.go index 8e6240d37..cf6872f9c 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -51,16 +51,21 @@ type Monolith struct { KeyAPI keyAPI.KeyInternalAPI // Optional - ExtPublicRoomsProvider api.ExtraPublicRoomsProvider + ExtPublicRoomsProvider api.ExtraPublicRoomsProvider + ExtUserDirectoryProvider userapi.UserDirectoryProvider } // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux *mux.Router) { + userDirectoryProvider := m.ExtUserDirectoryProvider + if userDirectoryProvider == nil { + userDirectoryProvider = m.UserAPI + } clientapi.AddPublicRoutes( process, csMux, synapseMux, &m.Config.ClientAPI, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), - m.FederationAPI, m.UserAPI, m.KeyAPI, + m.FederationAPI, m.UserAPI, userDirectoryProvider, m.KeyAPI, m.ExtPublicRoomsProvider, &m.Config.MSCs, ) federationapi.AddPublicRoutes( diff --git a/userapi/api/api.go b/userapi/api/api.go index 513a060c4..a9544f00d 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -54,6 +54,10 @@ type UserInternalAPI interface { QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error } +type UserDirectoryProvider interface { + QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error +} + // UserProfileAPI provides functions for getting user profiles type UserProfileAPI interface { QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error