From ceb3874469eb60bc0ffe816acfddb2b368a48a4f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 28 Mar 2022 10:47:35 +0100 Subject: [PATCH 1/5] Allow stored session parameters to be overwritten in the registration request (#2309) * Allow stored session parameters to be overwritten in the registration request * Remove logging * Close request body * Use `httputil.UnmarshalJSON` as that should enforce UTF-8 correctness * Return `M_NOT_JSON` on read error * Whoops, return the value of `httputil.UnmarshalJSON` * Remove redundant comment --- clientapi/routing/register.go | 51 ++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/clientapi/routing/register.go b/clientapi/routing/register.go index af2e99ed0..7d84f2494 100644 --- a/clientapi/routing/register.go +++ b/clientapi/routing/register.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/config" + "github.com/tidwall/gjson" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/tokens" @@ -525,22 +526,37 @@ func Register( userAPI userapi.UserRegisterAPI, cfg *config.ClientAPI, ) util.JSONResponse { + defer req.Body.Close() // nolint: errcheck + reqBody, err := ioutil.ReadAll(req.Body) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.NotJSON("Unable to read request body"), + } + } + var r registerRequest - resErr := httputil.UnmarshalJSONRequest(req, &r) - if resErr != nil { + sessionID := gjson.GetBytes(reqBody, "auth.session").String() + if sessionID == "" { + // Generate a new, random session ID + sessionID = util.RandomString(sessionIDLength) + } else if data, ok := sessions.getParams(sessionID); ok { + // Use the parameters from the session as our defaults. + // Some of these might end up being overwritten if the + // values are specified again in the request body. + r.Username = data.Username + r.Password = data.Password + r.DeviceID = data.DeviceID + r.InitialDisplayName = data.InitialDisplayName + r.InhibitLogin = data.InhibitLogin + } + if resErr := httputil.UnmarshalJSON(reqBody, &r); resErr != nil { return *resErr } if req.URL.Query().Get("kind") == "guest" { return handleGuestRegistration(req, r, cfg, userAPI) } - // Retrieve or generate the sessionID - sessionID := r.Auth.Session - if sessionID == "" { - // Generate a new, random session ID - sessionID = util.RandomString(sessionIDLength) - } - // Don't allow numeric usernames less than MAX_INT64. if _, err := strconv.ParseInt(r.Username, 10, 64); err == nil { return util.JSONResponse{ @@ -568,7 +584,7 @@ func Register( case r.Type == authtypes.LoginTypeApplicationService && accessTokenErr == nil: // Spec-compliant case (the access_token is specified and the login type // is correctly set, so it's an appservice registration) - if resErr = validateApplicationServiceUsername(r.Username); resErr != nil { + if resErr := validateApplicationServiceUsername(r.Username); resErr != nil { return *resErr } case accessTokenErr == nil: @@ -581,11 +597,11 @@ func Register( default: // Spec-compliant case (neither the access_token nor the login type are // specified, so it's a normal user registration) - if resErr = validateUsername(r.Username); resErr != nil { + if resErr := validateUsername(r.Username); resErr != nil { return *resErr } } - if resErr = validatePassword(r.Password); resErr != nil { + if resErr := validatePassword(r.Password); resErr != nil { return *resErr } @@ -835,24 +851,17 @@ func completeRegistration( } }() - if data, ok := sessions.getParams(sessionID); ok { - username = data.Username - password = data.Password - deviceID = data.DeviceID - displayName = data.InitialDisplayName - inhibitLogin = data.InhibitLogin - } if username == "" { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.BadJSON("missing username"), + JSON: jsonerror.MissingArgument("Missing username"), } } // Blank passwords are only allowed by registered application services if password == "" && appserviceID == "" { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.BadJSON("missing password"), + JSON: jsonerror.MissingArgument("Missing password"), } } var accRes userapi.PerformAccountCreationResponse From 34b9c8c67000bd27db73c987e629c9df0d2c0f28 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 28 Mar 2022 10:55:53 +0100 Subject: [PATCH 2/5] Ensure Dendrite has stopped in Pinecone demo `Stop()` --- build/gobind-pinecone/monolith.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 865457010..ea75d5275 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -401,11 +401,12 @@ func (m *DendriteMonolith) Start() { } func (m *DendriteMonolith) Stop() { + m.processContext.ShutdownDendrite() _ = m.listener.Close() m.PineconeMulticast.Stop() _ = m.PineconeQUIC.Close() - m.processContext.ShutdownDendrite() _ = m.PineconeRouter.Close() + m.processContext.WaitForComponentsToFinish() } const MaxFrameSize = types.MaxFrameSize From 8099bcbc8b19298dc3f392571e53fe86839d6277 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 28 Mar 2022 11:44:25 +0100 Subject: [PATCH 3/5] P2P demo tweaks --- build/gobind-pinecone/monolith.go | 1 + build/gobind-yggdrasil/monolith.go | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index ea75d5275..8ef4bbc71 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -281,6 +281,7 @@ func (m *DendriteMonolith) Start() { cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) cfg.Global.PrivateKey = sk cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) + cfg.Global.JetStream.InMemory = true cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/%s", m.StorageDirectory, prefix)) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-account.db", m.StorageDirectory, prefix)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-mediaapi.db", m.StorageDirectory)) diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index 3329485aa..a2c9bcff6 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -36,6 +37,7 @@ type DendriteMonolith struct { StorageDirectory string listener net.Listener httpServer *http.Server + processContext *process.ProcessContext } func (m *DendriteMonolith) BaseURL() string { @@ -87,6 +89,7 @@ func (m *DendriteMonolith) Start() { cfg.Global.PrivateKey = ygg.PrivateKey() cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", m.StorageDirectory)) + cfg.Global.JetStream.InMemory = true cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-account.db", m.StorageDirectory)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-mediaapi.db", m.StorageDirectory)) cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-syncapi.db", m.StorageDirectory)) @@ -101,6 +104,7 @@ func (m *DendriteMonolith) Start() { } base := base.NewBaseDendrite(cfg, "Monolith") + m.processContext = base.ProcessContext defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() @@ -197,9 +201,12 @@ func (m *DendriteMonolith) Start() { }() } -func (m *DendriteMonolith) Suspend() { - m.logger.Info("Suspending monolith") +func (m *DendriteMonolith) Stop() { if err := m.httpServer.Close(); err != nil { m.logger.Warn("Error stopping HTTP server:", err) } + if m.processContext != nil { + m.processContext.ShutdownDendrite() + m.processContext.WaitForComponentsToFinish() + } } From 0692be44d91a42945dde0eab3e2f7481cc2e0896 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 28 Mar 2022 13:31:17 +0100 Subject: [PATCH 4/5] Fix account availability on register --- userapi/internal/api.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/userapi/internal/api.go b/userapi/internal/api.go index afe57da25..206c6f7de 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -776,12 +776,8 @@ func (a *UserInternalAPI) QueryNumericLocalpart(ctx context.Context, res *api.Qu } func (a *UserInternalAPI) QueryAccountAvailability(ctx context.Context, req *api.QueryAccountAvailabilityRequest, res *api.QueryAccountAvailabilityResponse) error { - _, err := a.DB.CheckAccountAvailability(ctx, req.Localpart) - if err == sql.ErrNoRows { - res.Available = true - return nil - } - res.Available = false + var err error + res.Available, err = a.DB.CheckAccountAvailability(ctx, req.Localpart) return err } From 7972915806348847ecd9a9b8a1b1ff0609cb883c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 28 Mar 2022 16:25:26 +0100 Subject: [PATCH 5/5] User directory for nearby Pinecone peers (P2P demo) (#2311) * User directory for nearby Pinecone peers * Fix mux routing * Use config to determine which server notices user to exclude --- build/gobind-pinecone/monolith.go | 21 ++- clientapi/auth/authtypes/profile.go | 1 + clientapi/clientapi.go | 3 +- clientapi/routing/routing.go | 2 + clientapi/routing/userdirectory.go | 14 +- cmd/dendrite-demo-pinecone/main.go | 21 ++- cmd/dendrite-demo-pinecone/users/users.go | 145 ++++++++++++++++++ .../personalities/clientapi.go | 2 +- setup/base/base.go | 1 + setup/monolith.go | 9 +- userapi/api/api.go | 4 + userapi/storage/postgres/profile_table.go | 11 +- userapi/storage/postgres/storage.go | 4 +- userapi/storage/sqlite3/profile_table.go | 10 +- userapi/storage/sqlite3/storage.go | 4 +- userapi/storage/storage.go | 6 +- userapi/storage/storage_wasm.go | 3 +- userapi/userapi_test.go | 2 +- 18 files changed, 226 insertions(+), 37 deletions(-) create mode 100644 cmd/dendrite-demo-pinecone/users/users.go diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 8ef4bbc71..346c5d1ec 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -326,6 +327,9 @@ func (m *DendriteMonolith) Start() { // This is different to rsAPI which can be the http client which doesn't need this dependency rsAPI.SetFederationAPI(fsAPI, keyRing) + userProvider := users.NewPineconeUserProvider(m.PineconeRouter, m.PineconeQUIC, m.userAPI, federation) + roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, fsAPI, federation) + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -333,13 +337,14 @@ func (m *DendriteMonolith) Start() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: m.userAPI, - KeyAPI: keyAPI, - ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, fsAPI, federation), + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: m.userAPI, + KeyAPI: keyAPI, + ExtPublicRoomsProvider: roomProvider, + ExtUserDirectoryProvider: userProvider, } monolith.AddAllPublicRoutes( base.ProcessContext, @@ -357,10 +362,12 @@ func (m *DendriteMonolith) Start() { httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) pMux := mux.NewRouter().SkipClean(true).UseEncodedPath() + pMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles) pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) pHTTP := m.PineconeQUIC.HTTP() + pHTTP.Mux().Handle(users.PublicURL, pMux) pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) diff --git a/clientapi/auth/authtypes/profile.go b/clientapi/auth/authtypes/profile.go index 902850bc0..29468c168 100644 --- a/clientapi/auth/authtypes/profile.go +++ b/clientapi/auth/authtypes/profile.go @@ -17,6 +17,7 @@ package authtypes // Profile represents the profile for a Matrix account. type Profile struct { Localpart string `json:"local_part"` + ServerName string `json:"server_name,omitempty"` // NOTSPEC: only set by Pinecone user provider DisplayName string `json:"display_name"` AvatarURL string `json:"avatar_url"` } diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 31a53a706..d0ef368d1 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -45,6 +45,7 @@ func AddPublicRoutes( transactionsCache *transactions.Cache, fsAPI federationAPI.FederationInternalAPI, userAPI userapi.UserInternalAPI, + userDirectoryProvider userapi.UserDirectoryProvider, keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, @@ -58,7 +59,7 @@ func AddPublicRoutes( routing.Setup( router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI, - userAPI, federation, + userAPI, userDirectoryProvider, federation, syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider, mscCfg, ) diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index e173831ab..218087bb3 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -51,6 +51,7 @@ func Setup( rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, userAPI userapi.UserInternalAPI, + userDirectoryProvider userapi.UserDirectoryProvider, federation *gomatrixserverlib.FederationClient, syncProducer *producers.SyncAPIProducer, transactionsCache *transactions.Cache, @@ -904,6 +905,7 @@ func Setup( device, userAPI, rsAPI, + userDirectoryProvider, cfg.Matrix.ServerName, postContent.SearchString, postContent.Limit, diff --git a/clientapi/routing/userdirectory.go b/clientapi/routing/userdirectory.go index 2659bc9cc..ab73cf430 100644 --- a/clientapi/routing/userdirectory.go +++ b/clientapi/routing/userdirectory.go @@ -16,6 +16,7 @@ package routing import ( "context" + "database/sql" "fmt" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" @@ -35,6 +36,7 @@ func SearchUserDirectory( device *userapi.Device, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, + provider userapi.UserDirectoryProvider, serverName gomatrixserverlib.ServerName, searchString string, limit int, @@ -50,13 +52,12 @@ func SearchUserDirectory( } // First start searching local users. - userReq := &userapi.QuerySearchProfilesRequest{ SearchString: searchString, Limit: limit, } userRes := &userapi.QuerySearchProfilesResponse{} - if err := userAPI.QuerySearchProfiles(ctx, userReq, userRes); err != nil { + if err := provider.QuerySearchProfiles(ctx, userReq, userRes); err != nil { errRes := util.ErrorResponse(fmt.Errorf("userAPI.QuerySearchProfiles: %w", err)) return &errRes } @@ -67,7 +68,12 @@ func SearchUserDirectory( break } - userID := fmt.Sprintf("@%s:%s", user.Localpart, serverName) + var userID string + if user.ServerName != "" { + userID = fmt.Sprintf("@%s:%s", user.Localpart, user.ServerName) + } else { + userID = fmt.Sprintf("@%s:%s", user.Localpart, serverName) + } if _, ok := results[userID]; !ok { results[userID] = authtypes.FullyQualifiedProfile{ UserID: userID, @@ -87,7 +93,7 @@ func SearchUserDirectory( Limit: limit - len(results), } stateRes := &api.QueryKnownUsersResponse{} - if err := rsAPI.QueryKnownUsers(ctx, stateReq, stateRes); err != nil { + if err := rsAPI.QueryKnownUsers(ctx, stateReq, stateRes); err != nil && err != sql.ErrNoRows { errRes := util.ErrorResponse(fmt.Errorf("rsAPI.QueryKnownUsers: %w", err)) return &errRes } diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 45f186985..87054dc82 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -35,6 +35,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -198,6 +199,9 @@ func main() { rsComponent.SetFederationAPI(fsAPI, keyRing) + userProvider := users.NewPineconeUserProvider(pRouter, pQUIC, userAPI, federation) + roomProvider := rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation) + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -205,13 +209,14 @@ func main() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, - ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation), + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, + ExtPublicRoomsProvider: roomProvider, + ExtUserDirectoryProvider: userProvider, } monolith.AddAllPublicRoutes( base.ProcessContext, @@ -250,10 +255,12 @@ func main() { embed.Embed(httpRouter, *instancePort, "Pinecone Demo") pMux := mux.NewRouter().SkipClean(true).UseEncodedPath() + pMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles) pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) pHTTP := pQUIC.HTTP() + pHTTP.Mux().Handle(users.PublicURL, pMux) pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) diff --git a/cmd/dendrite-demo-pinecone/users/users.go b/cmd/dendrite-demo-pinecone/users/users.go new file mode 100644 index 000000000..ffbd27ee9 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/users/users.go @@ -0,0 +1,145 @@ +package users + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + clienthttputil "github.com/matrix-org/dendrite/clientapi/httputil" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + + pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeSessions "github.com/matrix-org/pinecone/sessions" +) + +type PineconeUserProvider struct { + r *pineconeRouter.Router + s *pineconeSessions.Sessions + userAPI userapi.UserProfileAPI + fedClient *gomatrixserverlib.FederationClient +} + +const PublicURL = "/_matrix/p2p/profiles" + +func NewPineconeUserProvider( + r *pineconeRouter.Router, + s *pineconeSessions.Sessions, + userAPI userapi.UserProfileAPI, + fedClient *gomatrixserverlib.FederationClient, +) *PineconeUserProvider { + p := &PineconeUserProvider{ + r: r, + s: s, + userAPI: userAPI, + fedClient: fedClient, + } + return p +} + +func (p *PineconeUserProvider) FederatedUserProfiles(w http.ResponseWriter, r *http.Request) { + req := &userapi.QuerySearchProfilesRequest{Limit: 25} + res := &userapi.QuerySearchProfilesResponse{} + if err := clienthttputil.UnmarshalJSONRequest(r, &req); err != nil { + w.WriteHeader(400) + return + } + if err := p.userAPI.QuerySearchProfiles(r.Context(), req, res); err != nil { + w.WriteHeader(400) + return + } + j, err := json.Marshal(res) + if err != nil { + w.WriteHeader(400) + return + } + w.WriteHeader(200) + _, _ = w.Write(j) +} + +func (p *PineconeUserProvider) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error { + list := map[string]struct{}{} + for _, k := range p.r.Peers() { + list[k.PublicKey] = struct{}{} + } + res.Profiles = bulkFetchUserDirectoriesFromServers(context.Background(), req, p.fedClient, list) + return nil +} + +// bulkFetchUserDirectoriesFromServers fetches users from the list of homeservers. +// Returns a list of user profiles. +func bulkFetchUserDirectoriesFromServers( + ctx context.Context, req *userapi.QuerySearchProfilesRequest, + fedClient *gomatrixserverlib.FederationClient, + homeservers map[string]struct{}, +) (profiles []authtypes.Profile) { + jsonBody, err := json.Marshal(req) + if err != nil { + return nil + } + + limit := 200 + // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. + // goroutines send rooms to this channel + profileCh := make(chan authtypes.Profile, int(limit)) + // signalling channel to tell goroutines to stop sending rooms and quit + done := make(chan bool) + // signalling to say when we can close the room channel + var wg sync.WaitGroup + wg.Add(len(homeservers)) + // concurrently query for public rooms + reqctx, reqcancel := context.WithTimeout(ctx, time.Second*5) + for hs := range homeservers { + go func(homeserverDomain string) { + defer wg.Done() + util.GetLogger(reqctx).WithField("hs", homeserverDomain).Info("Querying HS for users") + + jsonBodyReader := bytes.NewBuffer(jsonBody) + httpReq, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("matrix://%s%s", homeserverDomain, PublicURL), jsonBodyReader) + if err != nil { + util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchUserDirectoriesFromServers: failed to create request", + ) + } + res := &userapi.QuerySearchProfilesResponse{} + if err = fedClient.DoRequestAndParseResponse(reqctx, httpReq, res); err != nil { + util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchUserDirectoriesFromServers: failed to query hs", + ) + return + } + for _, profile := range res.Profiles { + profile.ServerName = homeserverDomain + // atomically send a room or stop + select { + case profileCh <- profile: + case <-done: + case <-reqctx.Done(): + util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending profiles") + return + } + } + }(hs) + } + + select { + case <-time.After(5 * time.Second): + default: + wg.Wait() + } + reqcancel() + close(done) + close(profileCh) + + for profile := range profileCh { + profiles = append(profiles, profile) + } + + return profiles +} diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go index a2036de35..978d8b0a4 100644 --- a/cmd/dendrite-polylith-multi/personalities/clientapi.go +++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go @@ -33,7 +33,7 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { clientapi.AddPublicRoutes( base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, - federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, + federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, userAPI, keyAPI, nil, &cfg.MSCs, ) diff --git a/setup/base/base.go b/setup/base/base.go index c3ed05416..6135e080e 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -289,6 +289,7 @@ func (b *BaseDendrite) CreateAccountsDB() userdb.Database { b.Cfg.UserAPI.BCryptCost, b.Cfg.UserAPI.OpenIDTokenLifetimeMS, userapi.DefaultLoginTokenLifetime, + b.Cfg.Global.ServerNotices.LocalPart, ) if err != nil { logrus.WithError(err).Panicf("failed to connect to accounts db") diff --git a/setup/monolith.go b/setup/monolith.go index 8e6240d37..cf6872f9c 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -51,16 +51,21 @@ type Monolith struct { KeyAPI keyAPI.KeyInternalAPI // Optional - ExtPublicRoomsProvider api.ExtraPublicRoomsProvider + ExtPublicRoomsProvider api.ExtraPublicRoomsProvider + ExtUserDirectoryProvider userapi.UserDirectoryProvider } // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux *mux.Router) { + userDirectoryProvider := m.ExtUserDirectoryProvider + if userDirectoryProvider == nil { + userDirectoryProvider = m.UserAPI + } clientapi.AddPublicRoutes( process, csMux, synapseMux, &m.Config.ClientAPI, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), - m.FederationAPI, m.UserAPI, m.KeyAPI, + m.FederationAPI, m.UserAPI, userDirectoryProvider, m.KeyAPI, m.ExtPublicRoomsProvider, &m.Config.MSCs, ) federationapi.AddPublicRoutes( diff --git a/userapi/api/api.go b/userapi/api/api.go index 513a060c4..a9544f00d 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -54,6 +54,10 @@ type UserInternalAPI interface { QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error } +type UserDirectoryProvider interface { + QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error +} + // UserProfileAPI provides functions for getting user profiles type UserProfileAPI interface { QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error diff --git a/userapi/storage/postgres/profile_table.go b/userapi/storage/postgres/profile_table.go index 32a4b5506..6d336eb8e 100644 --- a/userapi/storage/postgres/profile_table.go +++ b/userapi/storage/postgres/profile_table.go @@ -53,6 +53,7 @@ const selectProfilesBySearchSQL = "" + "SELECT localpart, display_name, avatar_url FROM account_profiles WHERE localpart LIKE $1 OR display_name LIKE $1 LIMIT $2" type profilesStatements struct { + serverNoticesLocalpart string insertProfileStmt *sql.Stmt selectProfileByLocalpartStmt *sql.Stmt setAvatarURLStmt *sql.Stmt @@ -60,8 +61,10 @@ type profilesStatements struct { selectProfilesBySearchStmt *sql.Stmt } -func NewPostgresProfilesTable(db *sql.DB) (tables.ProfileTable, error) { - s := &profilesStatements{} +func NewPostgresProfilesTable(db *sql.DB, serverNoticesLocalpart string) (tables.ProfileTable, error) { + s := &profilesStatements{ + serverNoticesLocalpart: serverNoticesLocalpart, + } _, err := db.Exec(profilesSchema) if err != nil { return nil, err @@ -126,7 +129,9 @@ func (s *profilesStatements) SelectProfilesBySearch( if err := rows.Scan(&profile.Localpart, &profile.DisplayName, &profile.AvatarURL); err != nil { return nil, err } - profiles = append(profiles, profile) + if profile.Localpart != s.serverNoticesLocalpart { + profiles = append(profiles, profile) + } } return profiles, nil } diff --git a/userapi/storage/postgres/storage.go b/userapi/storage/postgres/storage.go index c74a999f4..b2a517605 100644 --- a/userapi/storage/postgres/storage.go +++ b/userapi/storage/postgres/storage.go @@ -30,7 +30,7 @@ import ( ) // NewDatabase creates a new accounts and profiles database -func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration) (*shared.Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { db, err := sqlutil.Open(dbProperties) if err != nil { return nil, err @@ -77,7 +77,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver if err != nil { return nil, fmt.Errorf("NewPostgresOpenIDTable: %w", err) } - profilesTable, err := NewPostgresProfilesTable(db) + profilesTable, err := NewPostgresProfilesTable(db, serverNoticesLocalpart) if err != nil { return nil, fmt.Errorf("NewPostgresProfilesTable: %w", err) } diff --git a/userapi/storage/sqlite3/profile_table.go b/userapi/storage/sqlite3/profile_table.go index d85b19c7b..3050ff4b5 100644 --- a/userapi/storage/sqlite3/profile_table.go +++ b/userapi/storage/sqlite3/profile_table.go @@ -54,6 +54,7 @@ const selectProfilesBySearchSQL = "" + type profilesStatements struct { db *sql.DB + serverNoticesLocalpart string insertProfileStmt *sql.Stmt selectProfileByLocalpartStmt *sql.Stmt setAvatarURLStmt *sql.Stmt @@ -61,9 +62,10 @@ type profilesStatements struct { selectProfilesBySearchStmt *sql.Stmt } -func NewSQLiteProfilesTable(db *sql.DB) (tables.ProfileTable, error) { +func NewSQLiteProfilesTable(db *sql.DB, serverNoticesLocalpart string) (tables.ProfileTable, error) { s := &profilesStatements{ - db: db, + db: db, + serverNoticesLocalpart: serverNoticesLocalpart, } _, err := db.Exec(profilesSchema) if err != nil { @@ -131,7 +133,9 @@ func (s *profilesStatements) SelectProfilesBySearch( if err := rows.Scan(&profile.Localpart, &profile.DisplayName, &profile.AvatarURL); err != nil { return nil, err } - profiles = append(profiles, profile) + if profile.Localpart != s.serverNoticesLocalpart { + profiles = append(profiles, profile) + } } return profiles, nil } diff --git a/userapi/storage/sqlite3/storage.go b/userapi/storage/sqlite3/storage.go index b5bb96c42..03c013f00 100644 --- a/userapi/storage/sqlite3/storage.go +++ b/userapi/storage/sqlite3/storage.go @@ -31,7 +31,7 @@ import ( ) // NewDatabase creates a new accounts and profiles database -func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration) (*shared.Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { db, err := sqlutil.Open(dbProperties) if err != nil { return nil, err @@ -78,7 +78,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver if err != nil { return nil, fmt.Errorf("NewSQLiteOpenIDTable: %w", err) } - profilesTable, err := NewSQLiteProfilesTable(db) + profilesTable, err := NewSQLiteProfilesTable(db, serverNoticesLocalpart) if err != nil { return nil, fmt.Errorf("NewSQLiteProfilesTable: %w", err) } diff --git a/userapi/storage/storage.go b/userapi/storage/storage.go index 4711439af..f372fe7dc 100644 --- a/userapi/storage/storage.go +++ b/userapi/storage/storage.go @@ -30,12 +30,12 @@ import ( // NewDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme) // and sets postgres connection parameters -func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration) (Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime) + return sqlite3.NewDatabase(dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime) + return postgres.NewDatabase(dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/userapi/storage/storage_wasm.go b/userapi/storage/storage_wasm.go index 701dcd833..779f77568 100644 --- a/userapi/storage/storage_wasm.go +++ b/userapi/storage/storage_wasm.go @@ -29,10 +29,11 @@ func NewDatabase( bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, + serverNoticesLocalpart string, ) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime) + return sqlite3.NewDatabase(dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/userapi/userapi_test.go b/userapi/userapi_test.go index 25319c4bf..8c3608bd8 100644 --- a/userapi/userapi_test.go +++ b/userapi/userapi_test.go @@ -52,7 +52,7 @@ func MustMakeInternalAPI(t *testing.T, opts apiTestOpts) (api.UserInternalAPI, s MaxOpenConnections: 1, MaxIdleConnections: 1, } - accountDB, err := storage.NewDatabase(dbopts, serverName, bcrypt.MinCost, config.DefaultOpenIDTokenLifetimeMS, opts.loginTokenLifetime) + accountDB, err := storage.NewDatabase(dbopts, serverName, bcrypt.MinCost, config.DefaultOpenIDTokenLifetimeMS, opts.loginTokenLifetime, "") if err != nil { t.Fatalf("failed to create account DB: %s", err) }