diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 557355604..080d4d9fa 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -48,7 +48,6 @@ func AddPublicRoutes( syncProducer := &producers.SyncAPIProducer{ JetStream: js, - TopicClientData: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 54d32f1bb..5933ce1a8 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -31,7 +31,6 @@ import ( // SyncAPIProducer produces events for the sync API server to consume type SyncAPIProducer struct { - TopicClientData string TopicReceiptEvent string TopicSendToDeviceEvent string TopicTypingEvent string diff --git a/userapi/internal/api.go b/userapi/internal/api.go index ea8aad684..422eb076e 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/clientapi/userutil" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/sqlutil" keyapi "github.com/matrix-org/dendrite/keyserver/api" @@ -74,7 +75,11 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc ignoredUsers = &synctypes.IgnoredUsers{} _ = json.Unmarshal(req.AccountData, ignoredUsers) } - if err := a.SyncProducer.SendAccountData(req.UserID, req.RoomID, req.DataType, nil, ignoredUsers); err != nil { + if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{ + RoomID: req.RoomID, + Type: req.DataType, + IgnoredUsers: ignoredUsers, + }); err != nil { util.GetLogger(ctx).WithError(err).Error("a.SyncProducer.SendAccountData failed") return fmt.Errorf("failed to send account data to output: %w", err) } @@ -107,7 +112,9 @@ func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.P } // Inform the SyncAPI about the newly created push_rules - if err = a.SyncProducer.SendAccountData(acc.UserID, "", "m.push_rules", nil, nil); err != nil { + if err = a.SyncProducer.SendAccountData(acc.UserID, eventutil.AccountData{ + Type: "m.push_rules", + }); err != nil { util.GetLogger(ctx).WithFields(logrus.Fields{ "user_id": acc.UserID, }).WithError(err).Warn("failed to send account data to the SyncAPI") @@ -746,7 +753,9 @@ func (a *UserInternalAPI) PerformPushRulesPut( if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil { return err } - if err := a.SyncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType, nil, nil); err != nil { + if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{ + Type: pushRulesAccountDataType, + }); err != nil { util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed") } return nil diff --git a/userapi/producers/syncapi.go b/userapi/producers/syncapi.go index 00cb1aa1f..27cfc2848 100644 --- a/userapi/producers/syncapi.go +++ b/userapi/producers/syncapi.go @@ -6,7 +6,6 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" @@ -35,7 +34,7 @@ func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic stri } // SendAccountData sends account data to the Sync API server. -func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON, ignoredUsers *types.IgnoredUsers) error { +func (p *SyncAPI) SendAccountData(userID string, data eventutil.AccountData) error { m := &nats.Msg{ Subject: p.clientDataTopic, Header: nats.Header{}, @@ -43,20 +42,15 @@ func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string, m.Header.Set(jetstream.UserID, userID) var err error - m.Data, err = json.Marshal(eventutil.AccountData{ - RoomID: roomID, - Type: dataType, - ReadMarker: readMarker, - IgnoredUsers: ignoredUsers, - }) + m.Data, err = json.Marshal(data) if err != nil { return err } log.WithFields(log.Fields{ "user_id": userID, - "room_id": roomID, - "data_type": dataType, + "room_id": data.RoomID, + "data_type": data.Type, }).Tracef("Producing to topic '%s'", p.clientDataTopic) _, err = p.producer.PublishMsg(m)