Review comments

This commit is contained in:
Neil Alexander 2022-07-25 17:18:18 +01:00
parent 254e76efdf
commit c42dabbf69
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 16 additions and 15 deletions

View file

@ -48,7 +48,6 @@ func AddPublicRoutes(
syncProducer := &producers.SyncAPIProducer{ syncProducer := &producers.SyncAPIProducer{
JetStream: js, JetStream: js,
TopicClientData: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),

View file

@ -31,7 +31,6 @@ import (
// SyncAPIProducer produces events for the sync API server to consume // SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct { type SyncAPIProducer struct {
TopicClientData string
TopicReceiptEvent string TopicReceiptEvent string
TopicSendToDeviceEvent string TopicSendToDeviceEvent string
TopicTypingEvent string TopicTypingEvent string

View file

@ -30,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
@ -74,7 +75,11 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
ignoredUsers = &synctypes.IgnoredUsers{} ignoredUsers = &synctypes.IgnoredUsers{}
_ = json.Unmarshal(req.AccountData, ignoredUsers) _ = json.Unmarshal(req.AccountData, ignoredUsers)
} }
if err := a.SyncProducer.SendAccountData(req.UserID, req.RoomID, req.DataType, nil, ignoredUsers); err != nil { if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
RoomID: req.RoomID,
Type: req.DataType,
IgnoredUsers: ignoredUsers,
}); err != nil {
util.GetLogger(ctx).WithError(err).Error("a.SyncProducer.SendAccountData failed") util.GetLogger(ctx).WithError(err).Error("a.SyncProducer.SendAccountData failed")
return fmt.Errorf("failed to send account data to output: %w", err) return fmt.Errorf("failed to send account data to output: %w", err)
} }
@ -107,7 +112,9 @@ func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.P
} }
// Inform the SyncAPI about the newly created push_rules // Inform the SyncAPI about the newly created push_rules
if err = a.SyncProducer.SendAccountData(acc.UserID, "", "m.push_rules", nil, nil); err != nil { if err = a.SyncProducer.SendAccountData(acc.UserID, eventutil.AccountData{
Type: "m.push_rules",
}); err != nil {
util.GetLogger(ctx).WithFields(logrus.Fields{ util.GetLogger(ctx).WithFields(logrus.Fields{
"user_id": acc.UserID, "user_id": acc.UserID,
}).WithError(err).Warn("failed to send account data to the SyncAPI") }).WithError(err).Warn("failed to send account data to the SyncAPI")
@ -746,7 +753,9 @@ func (a *UserInternalAPI) PerformPushRulesPut(
if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil { if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil {
return err return err
} }
if err := a.SyncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType, nil, nil); err != nil { if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
Type: pushRulesAccountDataType,
}); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed") util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed")
} }
return nil return nil

View file

@ -6,7 +6,6 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -35,7 +34,7 @@ func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic stri
} }
// SendAccountData sends account data to the Sync API server. // SendAccountData sends account data to the Sync API server.
func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON, ignoredUsers *types.IgnoredUsers) error { func (p *SyncAPI) SendAccountData(userID string, data eventutil.AccountData) error {
m := &nats.Msg{ m := &nats.Msg{
Subject: p.clientDataTopic, Subject: p.clientDataTopic,
Header: nats.Header{}, Header: nats.Header{},
@ -43,20 +42,15 @@ func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string,
m.Header.Set(jetstream.UserID, userID) m.Header.Set(jetstream.UserID, userID)
var err error var err error
m.Data, err = json.Marshal(eventutil.AccountData{ m.Data, err = json.Marshal(data)
RoomID: roomID,
Type: dataType,
ReadMarker: readMarker,
IgnoredUsers: ignoredUsers,
})
if err != nil { if err != nil {
return err return err
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"user_id": userID, "user_id": userID,
"room_id": roomID, "room_id": data.RoomID,
"data_type": dataType, "data_type": data.Type,
}).Tracef("Producing to topic '%s'", p.clientDataTopic) }).Tracef("Producing to topic '%s'", p.clientDataTopic)
_, err = p.producer.PublishMsg(m) _, err = p.producer.PublishMsg(m)