Make the User API responsible for sending account data output events (#2592)

* Make the User API responsible for sending account data output events

* Clean up producer

* Review comments
This commit is contained in:
Neil Alexander 2022-07-25 17:30:07 +01:00 committed by GitHub
parent 497ab4e1b7
commit 962b76da44
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 30 additions and 74 deletions

View file

@ -48,7 +48,6 @@ func AddPublicRoutes(
syncProducer := &producers.SyncAPIProducer{ syncProducer := &producers.SyncAPIProducer{
JetStream: js, JetStream: js,
TopicClientData: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),

View file

@ -21,7 +21,6 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
@ -32,7 +31,6 @@ import (
// SyncAPIProducer produces events for the sync API server to consume // SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct { type SyncAPIProducer struct {
TopicClientData string
TopicReceiptEvent string TopicReceiptEvent string
TopicSendToDeviceEvent string TopicSendToDeviceEvent string
TopicTypingEvent string TopicTypingEvent string
@ -42,36 +40,6 @@ type SyncAPIProducer struct {
UserAPI userapi.ClientUserAPI UserAPI userapi.ClientUserAPI
} }
// SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON, ignoredUsers *types.IgnoredUsers) error {
m := &nats.Msg{
Subject: p.TopicClientData,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)
data := eventutil.AccountData{
RoomID: roomID,
Type: dataType,
ReadMarker: readMarker,
IgnoredUsers: ignoredUsers,
}
var err error
m.Data, err = json.Marshal(data)
if err != nil {
return err
}
log.WithFields(log.Fields{
"user_id": userID,
"room_id": roomID,
"data_type": dataType,
}).Tracef("Producing to topic '%s'", p.TopicClientData)
_, err = p.JetStream.PublishMsg(m)
return err
}
func (p *SyncAPIProducer) SendReceipt( func (p *SyncAPIProducer) SendReceipt(
ctx context.Context, ctx context.Context,
userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp, userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp,

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -127,18 +126,6 @@ func SaveAccountData(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
var ignoredUsers *types.IgnoredUsers
if dataType == "m.ignored_user_list" {
ignoredUsers = &types.IgnoredUsers{}
_ = json.Unmarshal(body, ignoredUsers)
}
// TODO: user API should do this since it's account data
if err := syncProducer.SendData(userID, roomID, dataType, nil, ignoredUsers); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError()
}
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: struct{}{}, JSON: struct{}{},
@ -191,11 +178,6 @@ func SaveReadMarker(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError()
}
// Handle the read receipt that may be included in the read marker // Handle the read receipt that may be included in the read marker
if r.Read != "" { if r.Read != "" {
return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read) return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read)

View file

@ -18,8 +18,6 @@ import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
@ -98,10 +96,6 @@ func PutTag(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
if err = syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
}
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: struct{}{}, JSON: struct{}{},
@ -150,11 +144,6 @@ func DeleteTag(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
// TODO: user API should do this since it's account data
if err := syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
}
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: struct{}{}, JSON: struct{}{},

View file

@ -30,11 +30,13 @@ import (
"github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
synctypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage"
@ -64,7 +66,24 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
if req.DataType == "" { if req.DataType == "" {
return fmt.Errorf("data type must not be empty") return fmt.Errorf("data type must not be empty")
} }
return a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData) if err := a.DB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData); err != nil {
util.GetLogger(ctx).WithError(err).Error("a.DB.SaveAccountData failed")
return fmt.Errorf("failed to save account data: %w", err)
}
var ignoredUsers *synctypes.IgnoredUsers
if req.DataType == "m.ignored_user_list" {
ignoredUsers = &synctypes.IgnoredUsers{}
_ = json.Unmarshal(req.AccountData, ignoredUsers)
}
if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
RoomID: req.RoomID,
Type: req.DataType,
IgnoredUsers: ignoredUsers,
}); err != nil {
util.GetLogger(ctx).WithError(err).Error("a.SyncProducer.SendAccountData failed")
return fmt.Errorf("failed to send account data to output: %w", err)
}
return nil
} }
func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error { func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error {
@ -93,7 +112,9 @@ func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.P
} }
// Inform the SyncAPI about the newly created push_rules // Inform the SyncAPI about the newly created push_rules
if err = a.SyncProducer.SendAccountData(acc.UserID, "", "m.push_rules"); err != nil { if err = a.SyncProducer.SendAccountData(acc.UserID, eventutil.AccountData{
Type: "m.push_rules",
}); err != nil {
util.GetLogger(ctx).WithFields(logrus.Fields{ util.GetLogger(ctx).WithFields(logrus.Fields{
"user_id": acc.UserID, "user_id": acc.UserID,
}).WithError(err).Warn("failed to send account data to the SyncAPI") }).WithError(err).Warn("failed to send account data to the SyncAPI")
@ -732,11 +753,11 @@ func (a *UserInternalAPI) PerformPushRulesPut(
if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil { if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil {
return err return err
} }
if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
if err := a.SyncProducer.SendAccountData(req.UserID, "" /* roomID */, pushRulesAccountDataType); err != nil { Type: pushRulesAccountDataType,
}); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed") util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed")
} }
return nil return nil
} }

View file

@ -34,7 +34,7 @@ func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic stri
} }
// SendAccountData sends account data to the Sync API server. // SendAccountData sends account data to the Sync API server.
func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error { func (p *SyncAPI) SendAccountData(userID string, data eventutil.AccountData) error {
m := &nats.Msg{ m := &nats.Msg{
Subject: p.clientDataTopic, Subject: p.clientDataTopic,
Header: nats.Header{}, Header: nats.Header{},
@ -42,18 +42,15 @@ func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string)
m.Header.Set(jetstream.UserID, userID) m.Header.Set(jetstream.UserID, userID)
var err error var err error
m.Data, err = json.Marshal(eventutil.AccountData{ m.Data, err = json.Marshal(data)
RoomID: roomID,
Type: dataType,
})
if err != nil { if err != nil {
return err return err
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"user_id": userID, "user_id": userID,
"room_id": roomID, "room_id": data.RoomID,
"data_type": dataType, "data_type": data.Type,
}).Tracef("Producing to topic '%s'", p.clientDataTopic) }).Tracef("Producing to topic '%s'", p.clientDataTopic)
_, err = p.producer.PublishMsg(m) _, err = p.producer.PublishMsg(m)