From 9e42b3acdcff513209cf860d1064a492f730b52a Mon Sep 17 00:00:00 2001 From: S7evinK Date: Thu, 31 Mar 2022 09:57:15 +0200 Subject: [PATCH] Add presence producer --- syncapi/producers/federationapi_presence.go | 44 ++++++++++++++ syncapi/sync/requestpool.go | 64 +++++++++------------ syncapi/sync/requestpool_test.go | 9 ++- syncapi/syncapi.go | 9 ++- 4 files changed, 80 insertions(+), 46 deletions(-) create mode 100644 syncapi/producers/federationapi_presence.go diff --git a/syncapi/producers/federationapi_presence.go b/syncapi/producers/federationapi_presence.go new file mode 100644 index 000000000..09b6a9271 --- /dev/null +++ b/syncapi/producers/federationapi_presence.go @@ -0,0 +1,44 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "strconv" + "strings" + "time" + + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" +) + +// FederationAPIPresenceProducer produces events for the federation API server to consume +type FederationAPIPresenceProducer struct { + Topic string + JetStream nats.JetStreamContext +} + +func (f *FederationAPIPresenceProducer) SendPresence( + userID, presence string, +) error { + msg := nats.NewMsg(f.Topic) + msg.Header.Set(jetstream.UserID, userID) + msg.Header.Set("presence", strings.ToLower(presence)) + msg.Header.Set("from_sync", "true") // only update last_active_ts and presence + msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now())))) + + _, err := f.JetStream.PublishMsg(msg) + return err +} diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 7ae001771..63599b7d6 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -19,7 +19,6 @@ package sync import ( "net" "net/http" - "strconv" "strings" "sync" "time" @@ -28,7 +27,6 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -37,27 +35,26 @@ import ( userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db storage.Database - cfg *config.SyncAPI - userAPI userapi.UserInternalAPI - keyAPI keyapi.KeyInternalAPI - rsAPI roomserverAPI.RoomserverInternalAPI - lastseen *sync.Map - presence *sync.Map - streams *streams.Streams - Notifier *notifier.Notifier - jetstream JetstreamPublisher + db storage.Database + cfg *config.SyncAPI + userAPI userapi.UserInternalAPI + keyAPI keyapi.KeyInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI + lastseen *sync.Map + presence *sync.Map + streams *streams.Streams + Notifier *notifier.Notifier + producer PresencePublisher } -type JetstreamPublisher interface { - PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) +type PresencePublisher interface { + SendPresence(userID, presence string) error } // NewRequestPool makes a new RequestPool @@ -66,19 +63,19 @@ func NewRequestPool( userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, streams *streams.Streams, notifier *notifier.Notifier, - jetstream nats.JetStreamContext, + producer PresencePublisher, ) *RequestPool { rp := &RequestPool{ - db: db, - cfg: cfg, - userAPI: userAPI, - keyAPI: keyAPI, - rsAPI: rsAPI, - lastseen: &sync.Map{}, - presence: &sync.Map{}, - streams: streams, - Notifier: notifier, - jetstream: jetstream, + db: db, + cfg: cfg, + userAPI: userAPI, + keyAPI: keyAPI, + rsAPI: rsAPI, + lastseen: &sync.Map{}, + presence: &sync.Map{}, + streams: streams, + Notifier: notifier, + producer: producer, } go rp.cleanLastSeen() go rp.cleanPresence(time.Minute * 5) @@ -108,11 +105,7 @@ func (rp *RequestPool) cleanPresence(cleanupTime time.Duration) { } } -/* -Controls whether the client is automatically marked as online by polling this API. -If this parameter is omitted then the client is automatically marked as online when it uses this API. -Otherwise if the parameter is set to “offline” then the client is not marked as being online when it uses this API. When set to “unavailable”, the client is marked as being idle. -*/ +// updatePresence sends presence updates to the SyncAPI and FederationAPI func (rp *RequestPool) updatePresence(presence string, device *userapi.Device) { if presence == "" { presence = "online" @@ -134,14 +127,9 @@ func (rp *RequestPool) updatePresence(presence string, device *userapi.Device) { } } - msg := nats.NewMsg(rp.cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent)) - msg.Header.Set(jetstream.UserID, device.UserID) - msg.Header.Set("presence", strings.ToLower(presence)) - msg.Header.Set("from_sync", "true") // only update last_active_ts and presence - msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now())))) - - if _, err := rp.jetstream.PublishMsg(msg); err != nil { + if err := rp.producer.SendPresence(device.UserID, strings.ToLower(presence)); err != nil { logrus.WithError(err).Error("Unable to publish presence message from sync") + return } rp.presence.Store(device.UserID, newPresence) diff --git a/syncapi/sync/requestpool_test.go b/syncapi/sync/requestpool_test.go index 838e4133b..1d11b9df3 100644 --- a/syncapi/sync/requestpool_test.go +++ b/syncapi/sync/requestpool_test.go @@ -7,16 +7,15 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/api" - "github.com/nats-io/nats.go" ) type dummyPublisher struct { count int } -func (d *dummyPublisher) PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) { +func (d *dummyPublisher) SendPresence(userID, presence string) error { d.count++ - return nil, nil + return nil } func TestRequestPool_updatePresence(t *testing.T) { @@ -80,8 +79,8 @@ func TestRequestPool_updatePresence(t *testing.T) { }, } rp := &RequestPool{ - presence: &syncMap, - jetstream: publisher, + presence: &syncMap, + producer: publisher, cfg: &config.SyncAPI{ Matrix: &config.Global{ JetStream: config.JetStream{ diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index de5eb1329..1581149b2 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -63,7 +63,12 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to load notifier ") } - requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, js) + federationPresenceProducer := producers.FederationAPIPresenceProducer{ + Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + JetStream: js, + } + + requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, @@ -75,8 +80,6 @@ func AddPublicRoutes( Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate), } - _ = userAPIReadUpdateProducer - keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), js, keyAPI, rsAPI, syncDB, notifier,