From 40f65563a03562c5fabcd3375a8826afeaad3752 Mon Sep 17 00:00:00 2001 From: S7evinK Date: Thu, 31 Mar 2022 08:07:21 +0200 Subject: [PATCH] Clientapi http presence handler --- clientapi/clientapi.go | 5 +++-- clientapi/producers/syncapi.go | 20 +++++++++++++++++++ clientapi/routing/routing.go | 36 ++++++++++++++++++++-------------- 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index d4b417a31..e2f8d3f32 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -48,7 +48,7 @@ func AddPublicRoutes( extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, ) { - js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) + js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ JetStream: js, @@ -56,6 +56,7 @@ func AddPublicRoutes( TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), UserAPI: userAPI, ServerName: cfg.Matrix.ServerName, } @@ -64,6 +65,6 @@ func AddPublicRoutes( router, synapseAdminRouter, cfg, rsAPI, asAPI, userAPI, userDirectoryProvider, federation, syncProducer, transactionsCache, fsAPI, keyAPI, - extRoomsProvider, mscCfg, + extRoomsProvider, mscCfg, natsClient, ) } diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 2dee04e3a..814dd70a6 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "strconv" + "time" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/jetstream" @@ -34,6 +35,7 @@ type SyncAPIProducer struct { TopicReceiptEvent string TopicSendToDeviceEvent string TopicTypingEvent string + TopicPresenceEvent string JetStream nats.JetStreamContext ServerName gomatrixserverlib.ServerName UserAPI userapi.UserInternalAPI @@ -173,3 +175,21 @@ func (p *SyncAPIProducer) SendTyping( _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } + +func (p *SyncAPIProducer) SendPresence( + ctx context.Context, userID, presence string, statusMsg *string, +) error { + m := nats.NewMsg(p.TopicPresenceEvent) + m.Header.Set(jetstream.UserID, userID) + m.Header.Set("presence", presence) + nilMsg := statusMsg == nil + if !nilMsg { + m.Header.Set("status_msg", *statusMsg) + } + m.Header.Set("status_msg_nil", strconv.FormatBool(nilMsg)) + + m.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now())))) + + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 8afaba560..93d6ea01c 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -32,9 +32,11 @@ import ( keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" ) @@ -56,7 +58,7 @@ func Setup( federationSender federationAPI.FederationInternalAPI, keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, - mscCfg *config.MSCs, + mscCfg *config.MSCs, natsClient *nats.Conn, ) { rateLimits := httputil.NewRateLimits(&cfg.RateLimiting) userInteractiveAuth := auth.NewUserInteractive(userAPI, cfg) @@ -779,20 +781,6 @@ func Setup( }), ).Methods(http.MethodPost, http.MethodOptions) - // Element logs get flooded unless this is handled - v3mux.Handle("/presence/{userID}/status", - httputil.MakeExternalAPI("presence", func(req *http.Request) util.JSONResponse { - if r := rateLimits.Limit(req); r != nil { - return *r - } - // TODO: Set presence (probably the responsibility of a presence server not clientapi) - return util.JSONResponse{ - Code: http.StatusOK, - JSON: struct{}{}, - } - }), - ).Methods(http.MethodPut, http.MethodOptions) - v3mux.Handle("/voip/turnServer", httputil.MakeAuthAPI("turn_server", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { if r := rateLimits.Limit(req); r != nil { @@ -1298,4 +1286,22 @@ func Setup( return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"]) }), ).Methods(http.MethodPost, http.MethodOptions) + v3mux.Handle("/presence/{userId}/status", + httputil.MakeAuthAPI("set_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + return SetPresence(req, device, syncProducer, vars["userId"]) + }), + ).Methods(http.MethodPut, http.MethodOptions) + v3mux.Handle("/presence/{userId}/status", + httputil.MakeAuthAPI("get_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + return GetPresence(req, device, natsClient, cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence), vars["userId"]) + }), + ).Methods(http.MethodGet, http.MethodOptions) }