From f5d8f0e72fbec0cdc5f61e6fed098f3d71c6f4d7 Mon Sep 17 00:00:00 2001 From: Tommie Gannert Date: Tue, 19 Oct 2021 11:46:26 +0200 Subject: [PATCH] Add push rules query/put API in Pushserver. This manipulates account data over User API, and fires sync messages for changes. Those sync messages should, according to an existing TODO in clientapi, be moved to userapi. Forks clientapi/producers/syncapi.go to pushserver/ for later extension. --- cmd/dendrite-monolith-server/main.go | 2 +- .../personalities/pushserver.go | 2 +- pushserver/api/api.go | 17 +++++ pushserver/internal/api.go | 72 +++++++++++++++++-- pushserver/inthttp/client.go | 20 ++++++ pushserver/inthttp/server.go | 41 +++++++++++ pushserver/producers/syncapi.go | 41 +++++++++++ pushserver/pushserver.go | 16 ++++- 8 files changed, 203 insertions(+), 8 deletions(-) create mode 100644 pushserver/producers/syncapi.go diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index af6cb1f6e..89168301c 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -148,7 +148,7 @@ func main() { eduInputAPI = base.EDUServerClient() } - psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, rsAPI) + psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, rsAPI, userAPI) if base.UseHTTPAPIs { pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI) psAPI = base.PushServerHTTPClient() diff --git a/cmd/dendrite-polylith-multi/personalities/pushserver.go b/cmd/dendrite-polylith-multi/personalities/pushserver.go index 97b6e19f1..414450af3 100644 --- a/cmd/dendrite-polylith-multi/personalities/pushserver.go +++ b/cmd/dendrite-polylith-multi/personalities/pushserver.go @@ -22,7 +22,7 @@ import ( ) func PushServer(base *basepkg.BaseDendrite, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI) { - intAPI := pushserver.NewInternalAPI(&cfg.PushServer, rsAPI) + intAPI := pushserver.NewInternalAPI(&cfg.PushServer, rsAPI, base.UserAPIClient()) pushserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/pushserver/api/api.go b/pushserver/api/api.go index b4aa8235d..67fa582c0 100644 --- a/pushserver/api/api.go +++ b/pushserver/api/api.go @@ -3,6 +3,7 @@ package api import ( "context" + "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/gomatrixserverlib" ) @@ -10,6 +11,9 @@ type PushserverInternalAPI interface { PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error PerformPusherDeletion(ctx context.Context, req *PerformPusherDeletionRequest, res *struct{}) error QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error + + PerformPushRulesPut(ctx context.Context, req *PerformPushRulesPutRequest, res *struct{}) error + QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error } type QueryPushersRequest struct { @@ -51,3 +55,16 @@ const ( EmailKind PusherKind = "email" HTTPKind PusherKind = "http" ) + +type PerformPushRulesPutRequest struct { + UserID string `json:"user_id"` + RuleSets *pushrules.AccountRuleSets `json:"rule_sets"` +} + +type QueryPushRulesRequest struct { + UserID string `json:"user_id"` +} + +type QueryPushRulesResponse struct { + RuleSets *pushrules.AccountRuleSets `json:"rule_sets"` +} diff --git a/pushserver/internal/api.go b/pushserver/internal/api.go index 183d2a54e..2362e3877 100644 --- a/pushserver/internal/api.go +++ b/pushserver/internal/api.go @@ -2,11 +2,16 @@ package internal import ( "context" + "encoding/json" + "fmt" "time" + "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/producers" "github.com/matrix-org/dendrite/pushserver/storage" "github.com/matrix-org/dendrite/setup/config" + uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/sirupsen/logrus" @@ -14,16 +19,20 @@ import ( // PushserverInternalAPI implements api.PushserverInternalAPI type PushserverInternalAPI struct { - Cfg *config.PushServer - DB storage.Database + Cfg *config.PushServer + DB storage.Database + userAPI uapi.UserInternalAPI + syncProducer *producers.SyncAPIProducer } func NewPushserverAPI( - cfg *config.PushServer, pushserverDB storage.Database, + cfg *config.PushServer, pushserverDB storage.Database, userAPI uapi.UserInternalAPI, syncProducer *producers.SyncAPIProducer, ) *PushserverInternalAPI { a := &PushserverInternalAPI{ - Cfg: cfg, - DB: pushserverDB, + Cfg: cfg, + DB: pushserverDB, + userAPI: userAPI, + syncProducer: syncProducer, } return a } @@ -71,3 +80,56 @@ func (a *PushserverInternalAPI) QueryPushers(ctx context.Context, req *api.Query res.Pushers, err = a.DB.GetPushers(ctx, req.Localpart) return err } + +func (a *PushserverInternalAPI) PerformPushRulesPut( + ctx context.Context, + req *api.PerformPushRulesPutRequest, + _ *struct{}, +) error { + bs, err := json.Marshal(&req.RuleSets) + if err != nil { + return err + } + userReq := uapi.InputAccountDataRequest{ + UserID: req.UserID, + DataType: pushRulesAccountDataType, + AccountData: json.RawMessage(bs), + } + var userRes uapi.InputAccountDataResponse // empty + if err := a.userAPI.InputAccountData(ctx, &userReq, &userRes); err != nil { + return err + } + + if err := a.syncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType); err != nil { + util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed") + } + + return nil +} + +func (a *PushserverInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error { + userReq := uapi.QueryAccountDataRequest{ + UserID: req.UserID, + DataType: pushRulesAccountDataType, + } + var userRes uapi.QueryAccountDataResponse + if err := a.userAPI.QueryAccountData(ctx, &userReq, &userRes); err != nil { + return err + } + bs, ok := userRes.GlobalAccountData[pushRulesAccountDataType] + if !ok { + // TODO: should this return the default rules? The default + // rules are written to accounts DB on account creation, so + // this error is unexpected. + return fmt.Errorf("push rules account data not found") + } + var data pushrules.AccountRuleSets + if err := json.Unmarshal([]byte(bs), &data); err != nil { + util.GetLogger(ctx).WithError(err).Error("json.Unmarshal of push rules failed") + return err + } + res.RuleSets = &data + return nil +} + +const pushRulesAccountDataType = "m.push_rules" diff --git a/pushserver/inthttp/client.go b/pushserver/inthttp/client.go index 47e05b100..1de62fbe8 100644 --- a/pushserver/inthttp/client.go +++ b/pushserver/inthttp/client.go @@ -68,3 +68,23 @@ func (h *httpPushserverInternalAPI) QueryPushers(ctx context.Context, req *api.Q apiURL := h.pushserverURL + QueryPushersPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) } + +func (h *httpPushserverInternalAPI) PerformPushRulesPut( + ctx context.Context, + request *api.PerformPushRulesPutRequest, + response *struct{}, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPushRulesPut") + defer span.Finish() + + apiURL := h.pushserverURL + PerformPushRulesPutPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +func (h *httpPushserverInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPushRules") + defer span.Finish() + + apiURL := h.pushserverURL + QueryPushRulesPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) +} diff --git a/pushserver/inthttp/server.go b/pushserver/inthttp/server.go index 876d71914..9482d1955 100644 --- a/pushserver/inthttp/server.go +++ b/pushserver/inthttp/server.go @@ -41,4 +41,45 @@ func AddRoutes(r api.PushserverInternalAPI, internalAPIMux *mux.Router) { }), ) + internalAPIMux.Handle(QueryPushRulesPath, + httputil.MakeInternalAPI("queryPushRules", func(req *http.Request) util.JSONResponse { + request := api.QueryPushRulesRequest{} + response := api.QueryPushRulesResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.QueryPushRules(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + + internalAPIMux.Handle(PerformPusherDeletionPath, + httputil.MakeInternalAPI("performPusherDeletion", func(req *http.Request) util.JSONResponse { + request := api.PerformPushRulesPutRequest{} + response := struct{}{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.PerformPushRulesPut(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + + internalAPIMux.Handle(QueryPushRulesPath, + httputil.MakeInternalAPI("queryPushRules", func(req *http.Request) util.JSONResponse { + request := api.QueryPushRulesRequest{} + response := api.QueryPushRulesResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.QueryPushRules(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/pushserver/producers/syncapi.go b/pushserver/producers/syncapi.go new file mode 100644 index 000000000..279d08674 --- /dev/null +++ b/pushserver/producers/syncapi.go @@ -0,0 +1,41 @@ +package producers + +import ( + "encoding/json" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/eventutil" + log "github.com/sirupsen/logrus" +) + +// SyncAPIProducer produces messages for the Sync API server to consume. +type SyncAPIProducer struct { + Producer sarama.SyncProducer + ClientDataTopic string +} + +// SendAccountData sends account data to the Sync API server. +func (p *SyncAPIProducer) SendAccountData(userID string, roomID string, dataType string) error { + var m sarama.ProducerMessage + + data := eventutil.AccountData{ + RoomID: roomID, + Type: dataType, + } + value, err := json.Marshal(data) + if err != nil { + return err + } + + m.Topic = string(p.ClientDataTopic) + m.Key = sarama.StringEncoder(userID) + m.Value = sarama.ByteEncoder(value) + log.WithFields(log.Fields{ + "user_id": userID, + "room_id": roomID, + "data_type": dataType, + }).Infof("Producing to topic '%s'", m.Topic) + + _, _, err = p.Producer.SendMessage(&m) + return err +} diff --git a/pushserver/pushserver.go b/pushserver/pushserver.go index 675d502c4..4fb8098d4 100644 --- a/pushserver/pushserver.go +++ b/pushserver/pushserver.go @@ -5,9 +5,12 @@ import ( "github.com/matrix-org/dendrite/pushserver/api" "github.com/matrix-org/dendrite/pushserver/internal" "github.com/matrix-org/dendrite/pushserver/inthttp" + "github.com/matrix-org/dendrite/pushserver/producers" "github.com/matrix-org/dendrite/pushserver/storage" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/kafka" + uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" ) @@ -22,14 +25,25 @@ func AddInternalRoutes(router *mux.Router, intAPI api.PushserverInternalAPI) { func NewInternalAPI( cfg *config.PushServer, rsAPI roomserverAPI.RoomserverInternalAPI, + userAPI uapi.UserInternalAPI, ) api.PushserverInternalAPI { db, err := storage.Open(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to push server db") } + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncProducer := &producers.SyncAPIProducer{ + Producer: producer, + // TODO: user API should handle syncs for account data. Right now, + // it's handled by clientapi, and hence uses its topic. When user + // API handles it for all account data, we can remove it from + // here. + ClientDataTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), + } + psAPI := internal.NewPushserverAPI( - cfg, db, + cfg, db, userAPI, syncProducer, ) return psAPI