Add push rules query/put API in Pushserver.

This manipulates account data over User API, and fires sync messages
for changes. Those sync messages should, according to an existing TODO
in clientapi, be moved to userapi.

Forks clientapi/producers/syncapi.go to pushserver/ for later extension.
This commit is contained in:
Tommie Gannert 2021-10-19 11:46:26 +02:00
parent d8c8bfc551
commit f5d8f0e72f
8 changed files with 203 additions and 8 deletions

View file

@ -148,7 +148,7 @@ func main() {
eduInputAPI = base.EDUServerClient()
}
psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, rsAPI)
psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, rsAPI, userAPI)
if base.UseHTTPAPIs {
pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI)
psAPI = base.PushServerHTTPClient()

View file

@ -22,7 +22,7 @@ import (
)
func PushServer(base *basepkg.BaseDendrite, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI) {
intAPI := pushserver.NewInternalAPI(&cfg.PushServer, rsAPI)
intAPI := pushserver.NewInternalAPI(&cfg.PushServer, rsAPI, base.UserAPIClient())
pushserver.AddInternalRoutes(base.InternalAPIMux, intAPI)

View file

@ -3,6 +3,7 @@ package api
import (
"context"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/gomatrixserverlib"
)
@ -10,6 +11,9 @@ type PushserverInternalAPI interface {
PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error
PerformPusherDeletion(ctx context.Context, req *PerformPusherDeletionRequest, res *struct{}) error
QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error
PerformPushRulesPut(ctx context.Context, req *PerformPushRulesPutRequest, res *struct{}) error
QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error
}
type QueryPushersRequest struct {
@ -51,3 +55,16 @@ const (
EmailKind PusherKind = "email"
HTTPKind PusherKind = "http"
)
type PerformPushRulesPutRequest struct {
UserID string `json:"user_id"`
RuleSets *pushrules.AccountRuleSets `json:"rule_sets"`
}
type QueryPushRulesRequest struct {
UserID string `json:"user_id"`
}
type QueryPushRulesResponse struct {
RuleSets *pushrules.AccountRuleSets `json:"rule_sets"`
}

View file

@ -2,11 +2,16 @@ package internal
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/producers"
"github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/setup/config"
uapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
@ -14,16 +19,20 @@ import (
// PushserverInternalAPI implements api.PushserverInternalAPI
type PushserverInternalAPI struct {
Cfg *config.PushServer
DB storage.Database
Cfg *config.PushServer
DB storage.Database
userAPI uapi.UserInternalAPI
syncProducer *producers.SyncAPIProducer
}
func NewPushserverAPI(
cfg *config.PushServer, pushserverDB storage.Database,
cfg *config.PushServer, pushserverDB storage.Database, userAPI uapi.UserInternalAPI, syncProducer *producers.SyncAPIProducer,
) *PushserverInternalAPI {
a := &PushserverInternalAPI{
Cfg: cfg,
DB: pushserverDB,
Cfg: cfg,
DB: pushserverDB,
userAPI: userAPI,
syncProducer: syncProducer,
}
return a
}
@ -71,3 +80,56 @@ func (a *PushserverInternalAPI) QueryPushers(ctx context.Context, req *api.Query
res.Pushers, err = a.DB.GetPushers(ctx, req.Localpart)
return err
}
func (a *PushserverInternalAPI) PerformPushRulesPut(
ctx context.Context,
req *api.PerformPushRulesPutRequest,
_ *struct{},
) error {
bs, err := json.Marshal(&req.RuleSets)
if err != nil {
return err
}
userReq := uapi.InputAccountDataRequest{
UserID: req.UserID,
DataType: pushRulesAccountDataType,
AccountData: json.RawMessage(bs),
}
var userRes uapi.InputAccountDataResponse // empty
if err := a.userAPI.InputAccountData(ctx, &userReq, &userRes); err != nil {
return err
}
if err := a.syncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed")
}
return nil
}
func (a *PushserverInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error {
userReq := uapi.QueryAccountDataRequest{
UserID: req.UserID,
DataType: pushRulesAccountDataType,
}
var userRes uapi.QueryAccountDataResponse
if err := a.userAPI.QueryAccountData(ctx, &userReq, &userRes); err != nil {
return err
}
bs, ok := userRes.GlobalAccountData[pushRulesAccountDataType]
if !ok {
// TODO: should this return the default rules? The default
// rules are written to accounts DB on account creation, so
// this error is unexpected.
return fmt.Errorf("push rules account data not found")
}
var data pushrules.AccountRuleSets
if err := json.Unmarshal([]byte(bs), &data); err != nil {
util.GetLogger(ctx).WithError(err).Error("json.Unmarshal of push rules failed")
return err
}
res.RuleSets = &data
return nil
}
const pushRulesAccountDataType = "m.push_rules"

View file

@ -68,3 +68,23 @@ func (h *httpPushserverInternalAPI) QueryPushers(ctx context.Context, req *api.Q
apiURL := h.pushserverURL + QueryPushersPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}
func (h *httpPushserverInternalAPI) PerformPushRulesPut(
ctx context.Context,
request *api.PerformPushRulesPutRequest,
response *struct{},
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPushRulesPut")
defer span.Finish()
apiURL := h.pushserverURL + PerformPushRulesPutPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func (h *httpPushserverInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPushRules")
defer span.Finish()
apiURL := h.pushserverURL + QueryPushRulesPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}

View file

@ -41,4 +41,45 @@ func AddRoutes(r api.PushserverInternalAPI, internalAPIMux *mux.Router) {
}),
)
internalAPIMux.Handle(QueryPushRulesPath,
httputil.MakeInternalAPI("queryPushRules", func(req *http.Request) util.JSONResponse {
request := api.QueryPushRulesRequest{}
response := api.QueryPushRulesResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := r.QueryPushRules(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(PerformPusherDeletionPath,
httputil.MakeInternalAPI("performPusherDeletion", func(req *http.Request) util.JSONResponse {
request := api.PerformPushRulesPutRequest{}
response := struct{}{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := r.PerformPushRulesPut(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(QueryPushRulesPath,
httputil.MakeInternalAPI("queryPushRules", func(req *http.Request) util.JSONResponse {
request := api.QueryPushRulesRequest{}
response := api.QueryPushRulesResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := r.QueryPushRules(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -0,0 +1,41 @@
package producers
import (
"encoding/json"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/eventutil"
log "github.com/sirupsen/logrus"
)
// SyncAPIProducer produces messages for the Sync API server to consume.
type SyncAPIProducer struct {
Producer sarama.SyncProducer
ClientDataTopic string
}
// SendAccountData sends account data to the Sync API server.
func (p *SyncAPIProducer) SendAccountData(userID string, roomID string, dataType string) error {
var m sarama.ProducerMessage
data := eventutil.AccountData{
RoomID: roomID,
Type: dataType,
}
value, err := json.Marshal(data)
if err != nil {
return err
}
m.Topic = string(p.ClientDataTopic)
m.Key = sarama.StringEncoder(userID)
m.Value = sarama.ByteEncoder(value)
log.WithFields(log.Fields{
"user_id": userID,
"room_id": roomID,
"data_type": dataType,
}).Infof("Producing to topic '%s'", m.Topic)
_, _, err = p.Producer.SendMessage(&m)
return err
}

View file

@ -5,9 +5,12 @@ import (
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/internal"
"github.com/matrix-org/dendrite/pushserver/inthttp"
"github.com/matrix-org/dendrite/pushserver/producers"
"github.com/matrix-org/dendrite/pushserver/storage"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/kafka"
uapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
)
@ -22,14 +25,25 @@ func AddInternalRoutes(router *mux.Router, intAPI api.PushserverInternalAPI) {
func NewInternalAPI(
cfg *config.PushServer,
rsAPI roomserverAPI.RoomserverInternalAPI,
userAPI uapi.UserInternalAPI,
) api.PushserverInternalAPI {
db, err := storage.Open(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to push server db")
}
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
syncProducer := &producers.SyncAPIProducer{
Producer: producer,
// TODO: user API should handle syncs for account data. Right now,
// it's handled by clientapi, and hence uses its topic. When user
// API handles it for all account data, we can remove it from
// here.
ClientDataTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData),
}
psAPI := internal.NewPushserverAPI(
cfg, db,
cfg, db, userAPI, syncProducer,
)
return psAPI