From dc3a9b826d761629ed002b036d9ee466ff18aeee Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 25 Jul 2022 16:49:54 +0100 Subject: [PATCH] Make the User API responsible for sending account data output events --- clientapi/routing/account_data.go | 13 ------------- userapi/internal/api.go | 22 +++++++++++++++++----- userapi/producers/syncapi.go | 9 ++++++--- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go index a5a3014ab..8075af478 100644 --- a/clientapi/routing/account_data.go +++ b/clientapi/routing/account_data.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/eventutil" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" @@ -127,18 +126,6 @@ func SaveAccountData( return util.ErrorResponse(err) } - var ignoredUsers *types.IgnoredUsers - if dataType == "m.ignored_user_list" { - ignoredUsers = &types.IgnoredUsers{} - _ = json.Unmarshal(body, ignoredUsers) - } - - // TODO: user API should do this since it's account data - if err := syncProducer.SendData(userID, roomID, dataType, nil, ignoredUsers); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed") - return jsonerror.InternalServerError() - } - return util.JSONResponse{ Code: http.StatusOK, JSON: struct{}{}, diff --git a/userapi/internal/api.go b/userapi/internal/api.go index 27ed15a01..ea8aad684 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -35,6 +35,7 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + synctypes "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/storage" @@ -64,7 +65,20 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc if req.DataType == "" { return fmt.Errorf("data type must not be empty") } - return a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData) + if err := a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData); err != nil { + util.GetLogger(ctx).WithError(err).Error("a.DB.SaveAccountData failed") + return fmt.Errorf("failed to save account data: %w", err) + } + var ignoredUsers *synctypes.IgnoredUsers + if req.DataType == "m.ignored_user_list" { + ignoredUsers = &synctypes.IgnoredUsers{} + _ = json.Unmarshal(req.AccountData, ignoredUsers) + } + if err := a.SyncProducer.SendAccountData(req.UserID, req.RoomID, req.DataType, nil, ignoredUsers); err != nil { + util.GetLogger(ctx).WithError(err).Error("a.SyncProducer.SendAccountData failed") + return fmt.Errorf("failed to send account data to output: %w", err) + } + return nil } func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error { @@ -93,7 +107,7 @@ func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.P } // Inform the SyncAPI about the newly created push_rules - if err = a.SyncProducer.SendAccountData(acc.UserID, "", "m.push_rules"); err != nil { + if err = a.SyncProducer.SendAccountData(acc.UserID, "", "m.push_rules", nil, nil); err != nil { util.GetLogger(ctx).WithFields(logrus.Fields{ "user_id": acc.UserID, }).WithError(err).Warn("failed to send account data to the SyncAPI") @@ -732,11 +746,9 @@ func (a *UserInternalAPI) PerformPushRulesPut( if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil { return err } - - if err := a.SyncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType); err != nil { + if err := a.SyncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType, nil, nil); err != nil { util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed") } - return nil } diff --git a/userapi/producers/syncapi.go b/userapi/producers/syncapi.go index 4a206f333..00cb1aa1f 100644 --- a/userapi/producers/syncapi.go +++ b/userapi/producers/syncapi.go @@ -6,6 +6,7 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" @@ -34,7 +35,7 @@ func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic stri } // SendAccountData sends account data to the Sync API server. -func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error { +func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON, ignoredUsers *types.IgnoredUsers) error { m := &nats.Msg{ Subject: p.clientDataTopic, Header: nats.Header{}, @@ -43,8 +44,10 @@ func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) var err error m.Data, err = json.Marshal(eventutil.AccountData{ - RoomID: roomID, - Type: dataType, + RoomID: roomID, + Type: dataType, + ReadMarker: readMarker, + IgnoredUsers: ignoredUsers, }) if err != nil { return err