diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 4a46a7676..570b6f957 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -280,13 +280,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res continue } - // Clear our local user profile cache, if this is a membership event - if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil { - if err = t.userAPI.PerformDeleteProfile(ctx, &userapi.PerformDeleteProfileRequest{UserID: event.Sender()}, &struct{}{}); err != nil { - // non-fatal error, log and continue - util.GetLogger(ctx).WithError(err).Warnf("Transaction: couldn't delete user profile for %s", event.Sender()) - } - } // pass the event to the roomserver which will do auth checks // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently // discarded by the caller of this function diff --git a/userapi/api/api.go b/userapi/api/api.go index e64c77e70..4b0a82436 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -61,7 +61,6 @@ type MediaUserAPI interface { type FederationUserAPI interface { QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error - PerformDeleteProfile(ctx context.Context, req *PerformDeleteProfileRequest, res *struct{}) error } // api functions required by the sync api @@ -630,9 +629,3 @@ type PerformForgetThreePIDRequest QueryLocalpartForThreePIDRequest type PerformSaveThreePIDAssociationRequest struct { ThreePID, Localpart, Medium string } - -type PerformDeleteProfileRequest struct { - UserID string -} - -type PerformDeleteProfileResponse struct{} diff --git a/userapi/api/api_trace.go b/userapi/api/api_trace.go index 153f37a5f..7e2f69615 100644 --- a/userapi/api/api_trace.go +++ b/userapi/api/api_trace.go @@ -204,12 +204,6 @@ func (t *UserInternalAPITrace) PerformSaveThreePIDAssociation(ctx context.Contex return err } -func (t *UserInternalAPITrace) PerformDeleteProfile(ctx context.Context, req *PerformDeleteProfileRequest, res *struct{}) error { - err := t.Impl.PerformDeleteProfile(ctx, req, res) - util.GetLogger(ctx).Infof("PerformDeleteProfile req=%+v res=%+v", js(req), js(res)) - return err -} - func js(thing interface{}) string { b, err := json.Marshal(thing) if err != nil { diff --git a/userapi/consumers/roomserver_outputroomevent.go b/userapi/consumers/roomserver_outputroomevent.go new file mode 100644 index 000000000..f257dc9de --- /dev/null +++ b/userapi/consumers/roomserver_outputroomevent.go @@ -0,0 +1,89 @@ +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/userapi/storage" +) + +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + userDB storage.Database + serverName gomatrixserverlib.ServerName +} + +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call +// Start() to begin consuming from room servers. +func NewOutputRoomEventConsumer( + base *base.BaseDendrite, + js nats.JetStreamContext, + userDB storage.Database, +) *OutputRoomEventConsumer { + return &OutputRoomEventConsumer{ + ctx: base.Context(), + jetstream: js, + durable: base.Cfg.Global.JetStream.Durable("UserAPIRoomserverConsumer"), + topic: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), + userDB: userDB, + serverName: base.Cfg.Global.ServerName, + } +} + +// Start consuming from room servers +func (s *OutputRoomEventConsumer) Start() error { + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) +} + +// onMessage is called when the userapi component receives a new event from +// the room server output log. +func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return true + } + + log.WithFields(log.Fields{ + "type": output.Type, + }).Debug("Got a message in OutputRoomEventConsumer") + + if output.Type == api.OutputTypeNewRoomEvent && output.NewRoomEvent != nil { + ev := output.NewRoomEvent.Event + // Only handle membership events + if ev.Type() != gomatrixserverlib.MRoomMember || ev.StateKey() == nil { + return true + } + localPart, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) + if err != nil { + return true + } + // Profiles from ourselves are updated by API calls, don't delete them. + if domain == s.serverName { + return true + } + log.WithField("user_id", *ev.StateKey()).Debug("Deleting remote user profile") + if err := s.userDB.DeleteProfile(ctx, localPart, domain); err != nil { + // non-fatal error, log and continue + log.WithError(err).WithField("user_id", *ev.StateKey()).Warn("failed to delete user profile") + } + } + + return true +} diff --git a/userapi/internal/api.go b/userapi/internal/api.go index 31202ae7f..9d9c5b2d7 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -873,12 +873,4 @@ func (a *UserInternalAPI) PerformSaveThreePIDAssociation(ctx context.Context, re return a.DB.SaveThreePIDAssociation(ctx, req.ThreePID, req.Localpart, req.Medium) } -func (a *UserInternalAPI) PerformDeleteProfile(ctx context.Context, req *api.PerformDeleteProfileRequest, res *struct{}) error { - localpart, serverName, err := gomatrixserverlib.SplitID('@', req.UserID) - if err != nil { - return err - } - return a.DB.DeleteProfile(ctx, localpart, serverName) -} - const pushRulesAccountDataType = "m.push_rules" diff --git a/userapi/inthttp/client.go b/userapi/inthttp/client.go index f2670e348..a375d6caa 100644 --- a/userapi/inthttp/client.go +++ b/userapi/inthttp/client.go @@ -43,7 +43,6 @@ const ( PerformSetDisplayNamePath = "/userapi/performSetDisplayName" PerformForgetThreePIDPath = "/userapi/performForgetThreePID" PerformSaveThreePIDAssociationPath = "/userapi/performSaveThreePIDAssociation" - PerformDeleteUserProfilePath = "/userapi/performDeleteUserProfile" QueryKeyBackupPath = "/userapi/queryKeyBackup" QueryProfilePath = "/userapi/queryProfile" @@ -440,10 +439,3 @@ func (h *httpUserInternalAPI) PerformSaveThreePIDAssociation( h.httpClient, ctx, request, response, ) } - -func (h *httpUserInternalAPI) PerformDeleteProfile(ctx context.Context, request *api.PerformDeleteProfileRequest, response *struct{}) error { - return httputil.CallInternalRPCAPI( - "PerformDeleteProfile", h.apiURL+PerformDeleteUserProfilePath, - h.httpClient, ctx, request, response, - ) -} diff --git a/userapi/inthttp/server.go b/userapi/inthttp/server.go index 19078de11..e9d89ccf1 100644 --- a/userapi/inthttp/server.go +++ b/userapi/inthttp/server.go @@ -198,8 +198,4 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) { PerformSaveThreePIDAssociationPath, httputil.MakeInternalRPCAPI("UserAPIPerformSaveThreePIDAssociation", s.PerformSaveThreePIDAssociation), ) - internalAPIMux.Handle( - PerformDeleteUserProfilePath, - httputil.MakeInternalRPCAPI("UserAPIPerformDeleteUserProfilePath", s.PerformDeleteProfile), - ) } diff --git a/userapi/userapi.go b/userapi/userapi.go index 2e86d6aa7..95d38ef98 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -18,6 +18,8 @@ import ( "time" "github.com/gorilla/mux" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/internal/pushgateway" keyapi "github.com/matrix-org/dendrite/keyserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" @@ -31,7 +33,6 @@ import ( "github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/util" - "github.com/sirupsen/logrus" ) // AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions @@ -96,6 +97,13 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start user API streamed event consumer") } + roomserverConsumer := consumers.NewOutputRoomEventConsumer( + base, js, db, + ) + if err := roomserverConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start user API roomserver event consumer") + } + var cleanOldNotifs func() cleanOldNotifs = func() { logrus.Infof("Cleaning old notifications")