diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index af6cb1f6e..89168301c 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -148,7 +148,7 @@ func main() { eduInputAPI = base.EDUServerClient() } - psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, rsAPI) + psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, rsAPI, userAPI) if base.UseHTTPAPIs { pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI) psAPI = base.PushServerHTTPClient() diff --git a/cmd/dendrite-polylith-multi/personalities/pushserver.go b/cmd/dendrite-polylith-multi/personalities/pushserver.go index 97b6e19f1..414450af3 100644 --- a/cmd/dendrite-polylith-multi/personalities/pushserver.go +++ b/cmd/dendrite-polylith-multi/personalities/pushserver.go @@ -22,7 +22,7 @@ import ( ) func PushServer(base *basepkg.BaseDendrite, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI) { - intAPI := pushserver.NewInternalAPI(&cfg.PushServer, rsAPI) + intAPI := pushserver.NewInternalAPI(&cfg.PushServer, rsAPI, base.UserAPIClient()) pushserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/pushserver/api/api.go b/pushserver/api/api.go index b4aa8235d..67fa582c0 100644 --- a/pushserver/api/api.go +++ b/pushserver/api/api.go @@ -3,6 +3,7 @@ package api import ( "context" + "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/gomatrixserverlib" ) @@ -10,6 +11,9 @@ type PushserverInternalAPI interface { PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error PerformPusherDeletion(ctx context.Context, req *PerformPusherDeletionRequest, res *struct{}) error QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error + + PerformPushRulesPut(ctx context.Context, req *PerformPushRulesPutRequest, res *struct{}) error + QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error } type QueryPushersRequest struct { @@ -51,3 +55,16 @@ const ( EmailKind PusherKind = "email" HTTPKind PusherKind = "http" ) + +type PerformPushRulesPutRequest struct { + UserID string `json:"user_id"` + RuleSets *pushrules.AccountRuleSets `json:"rule_sets"` +} + +type QueryPushRulesRequest struct { + UserID string `json:"user_id"` +} + +type QueryPushRulesResponse struct { + RuleSets *pushrules.AccountRuleSets `json:"rule_sets"` +} diff --git a/pushserver/internal/api.go b/pushserver/internal/api.go index 183d2a54e..2362e3877 100644 --- a/pushserver/internal/api.go +++ b/pushserver/internal/api.go @@ -2,11 +2,16 @@ package internal import ( "context" + "encoding/json" + "fmt" "time" + "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/producers" "github.com/matrix-org/dendrite/pushserver/storage" "github.com/matrix-org/dendrite/setup/config" + uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/sirupsen/logrus" @@ -14,16 +19,20 @@ import ( // PushserverInternalAPI implements api.PushserverInternalAPI type PushserverInternalAPI struct { - Cfg *config.PushServer - DB storage.Database + Cfg *config.PushServer + DB storage.Database + userAPI uapi.UserInternalAPI + syncProducer *producers.SyncAPIProducer } func NewPushserverAPI( - cfg *config.PushServer, pushserverDB storage.Database, + cfg *config.PushServer, pushserverDB storage.Database, userAPI uapi.UserInternalAPI, syncProducer *producers.SyncAPIProducer, ) *PushserverInternalAPI { a := &PushserverInternalAPI{ - Cfg: cfg, - DB: pushserverDB, + Cfg: cfg, + DB: pushserverDB, + userAPI: userAPI, + syncProducer: syncProducer, } return a } @@ -71,3 +80,56 @@ func (a *PushserverInternalAPI) QueryPushers(ctx context.Context, req *api.Query res.Pushers, err = a.DB.GetPushers(ctx, req.Localpart) return err } + +func (a *PushserverInternalAPI) PerformPushRulesPut( + ctx context.Context, + req *api.PerformPushRulesPutRequest, + _ *struct{}, +) error { + bs, err := json.Marshal(&req.RuleSets) + if err != nil { + return err + } + userReq := uapi.InputAccountDataRequest{ + UserID: req.UserID, + DataType: pushRulesAccountDataType, + AccountData: json.RawMessage(bs), + } + var userRes uapi.InputAccountDataResponse // empty + if err := a.userAPI.InputAccountData(ctx, &userReq, &userRes); err != nil { + return err + } + + if err := a.syncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType); err != nil { + util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed") + } + + return nil +} + +func (a *PushserverInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error { + userReq := uapi.QueryAccountDataRequest{ + UserID: req.UserID, + DataType: pushRulesAccountDataType, + } + var userRes uapi.QueryAccountDataResponse + if err := a.userAPI.QueryAccountData(ctx, &userReq, &userRes); err != nil { + return err + } + bs, ok := userRes.GlobalAccountData[pushRulesAccountDataType] + if !ok { + // TODO: should this return the default rules? The default + // rules are written to accounts DB on account creation, so + // this error is unexpected. + return fmt.Errorf("push rules account data not found") + } + var data pushrules.AccountRuleSets + if err := json.Unmarshal([]byte(bs), &data); err != nil { + util.GetLogger(ctx).WithError(err).Error("json.Unmarshal of push rules failed") + return err + } + res.RuleSets = &data + return nil +} + +const pushRulesAccountDataType = "m.push_rules" diff --git a/pushserver/inthttp/client.go b/pushserver/inthttp/client.go index 47e05b100..1de62fbe8 100644 --- a/pushserver/inthttp/client.go +++ b/pushserver/inthttp/client.go @@ -68,3 +68,23 @@ func (h *httpPushserverInternalAPI) QueryPushers(ctx context.Context, req *api.Q apiURL := h.pushserverURL + QueryPushersPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) } + +func (h *httpPushserverInternalAPI) PerformPushRulesPut( + ctx context.Context, + request *api.PerformPushRulesPutRequest, + response *struct{}, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPushRulesPut") + defer span.Finish() + + apiURL := h.pushserverURL + PerformPushRulesPutPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +func (h *httpPushserverInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPushRules") + defer span.Finish() + + apiURL := h.pushserverURL + QueryPushRulesPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) +} diff --git a/pushserver/inthttp/server.go b/pushserver/inthttp/server.go index 876d71914..9482d1955 100644 --- a/pushserver/inthttp/server.go +++ b/pushserver/inthttp/server.go @@ -41,4 +41,45 @@ func AddRoutes(r api.PushserverInternalAPI, internalAPIMux *mux.Router) { }), ) + internalAPIMux.Handle(QueryPushRulesPath, + httputil.MakeInternalAPI("queryPushRules", func(req *http.Request) util.JSONResponse { + request := api.QueryPushRulesRequest{} + response := api.QueryPushRulesResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.QueryPushRules(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + + internalAPIMux.Handle(PerformPusherDeletionPath, + httputil.MakeInternalAPI("performPusherDeletion", func(req *http.Request) util.JSONResponse { + request := api.PerformPushRulesPutRequest{} + response := struct{}{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.PerformPushRulesPut(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + + internalAPIMux.Handle(QueryPushRulesPath, + httputil.MakeInternalAPI("queryPushRules", func(req *http.Request) util.JSONResponse { + request := api.QueryPushRulesRequest{} + response := api.QueryPushRulesResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.QueryPushRules(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/pushserver/producers/syncapi.go b/pushserver/producers/syncapi.go new file mode 100644 index 000000000..279d08674 --- /dev/null +++ b/pushserver/producers/syncapi.go @@ -0,0 +1,41 @@ +package producers + +import ( + "encoding/json" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/eventutil" + log "github.com/sirupsen/logrus" +) + +// SyncAPIProducer produces messages for the Sync API server to consume. +type SyncAPIProducer struct { + Producer sarama.SyncProducer + ClientDataTopic string +} + +// SendAccountData sends account data to the Sync API server. +func (p *SyncAPIProducer) SendAccountData(userID string, roomID string, dataType string) error { + var m sarama.ProducerMessage + + data := eventutil.AccountData{ + RoomID: roomID, + Type: dataType, + } + value, err := json.Marshal(data) + if err != nil { + return err + } + + m.Topic = string(p.ClientDataTopic) + m.Key = sarama.StringEncoder(userID) + m.Value = sarama.ByteEncoder(value) + log.WithFields(log.Fields{ + "user_id": userID, + "room_id": roomID, + "data_type": dataType, + }).Infof("Producing to topic '%s'", m.Topic) + + _, _, err = p.Producer.SendMessage(&m) + return err +} diff --git a/pushserver/pushserver.go b/pushserver/pushserver.go index 675d502c4..4fb8098d4 100644 --- a/pushserver/pushserver.go +++ b/pushserver/pushserver.go @@ -5,9 +5,12 @@ import ( "github.com/matrix-org/dendrite/pushserver/api" "github.com/matrix-org/dendrite/pushserver/internal" "github.com/matrix-org/dendrite/pushserver/inthttp" + "github.com/matrix-org/dendrite/pushserver/producers" "github.com/matrix-org/dendrite/pushserver/storage" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/kafka" + uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" ) @@ -22,14 +25,25 @@ func AddInternalRoutes(router *mux.Router, intAPI api.PushserverInternalAPI) { func NewInternalAPI( cfg *config.PushServer, rsAPI roomserverAPI.RoomserverInternalAPI, + userAPI uapi.UserInternalAPI, ) api.PushserverInternalAPI { db, err := storage.Open(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to push server db") } + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncProducer := &producers.SyncAPIProducer{ + Producer: producer, + // TODO: user API should handle syncs for account data. Right now, + // it's handled by clientapi, and hence uses its topic. When user + // API handles it for all account data, we can remove it from + // here. + ClientDataTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), + } + psAPI := internal.NewPushserverAPI( - cfg, db, + cfg, db, userAPI, syncProducer, ) return psAPI