Add presence producer

This commit is contained in:
S7evinK 2022-03-31 09:57:15 +02:00
parent d799c35de8
commit 9e42b3acdc
4 changed files with 80 additions and 46 deletions

View file

@ -0,0 +1,44 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"strconv"
"strings"
"time"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
)
// FederationAPIPresenceProducer produces events for the federation API server to consume
type FederationAPIPresenceProducer struct {
Topic string
JetStream nats.JetStreamContext
}
func (f *FederationAPIPresenceProducer) SendPresence(
userID, presence string,
) error {
msg := nats.NewMsg(f.Topic)
msg.Header.Set(jetstream.UserID, userID)
msg.Header.Set("presence", strings.ToLower(presence))
msg.Header.Set("from_sync", "true") // only update last_active_ts and presence
msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
_, err := f.JetStream.PublishMsg(msg)
return err
}

View file

@ -19,7 +19,6 @@ package sync
import (
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
@ -28,7 +27,6 @@ import (
keyapi "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
@ -37,27 +35,26 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
// RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct {
db storage.Database
cfg *config.SyncAPI
userAPI userapi.UserInternalAPI
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen *sync.Map
presence *sync.Map
streams *streams.Streams
Notifier *notifier.Notifier
jetstream JetstreamPublisher
db storage.Database
cfg *config.SyncAPI
userAPI userapi.UserInternalAPI
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen *sync.Map
presence *sync.Map
streams *streams.Streams
Notifier *notifier.Notifier
producer PresencePublisher
}
type JetstreamPublisher interface {
PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error)
type PresencePublisher interface {
SendPresence(userID, presence string) error
}
// NewRequestPool makes a new RequestPool
@ -66,19 +63,19 @@ func NewRequestPool(
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
streams *streams.Streams, notifier *notifier.Notifier,
jetstream nats.JetStreamContext,
producer PresencePublisher,
) *RequestPool {
rp := &RequestPool{
db: db,
cfg: cfg,
userAPI: userAPI,
keyAPI: keyAPI,
rsAPI: rsAPI,
lastseen: &sync.Map{},
presence: &sync.Map{},
streams: streams,
Notifier: notifier,
jetstream: jetstream,
db: db,
cfg: cfg,
userAPI: userAPI,
keyAPI: keyAPI,
rsAPI: rsAPI,
lastseen: &sync.Map{},
presence: &sync.Map{},
streams: streams,
Notifier: notifier,
producer: producer,
}
go rp.cleanLastSeen()
go rp.cleanPresence(time.Minute * 5)
@ -108,11 +105,7 @@ func (rp *RequestPool) cleanPresence(cleanupTime time.Duration) {
}
}
/*
Controls whether the client is automatically marked as online by polling this API.
If this parameter is omitted then the client is automatically marked as online when it uses this API.
Otherwise if the parameter is set to offline then the client is not marked as being online when it uses this API. When set to unavailable, the client is marked as being idle.
*/
// updatePresence sends presence updates to the SyncAPI and FederationAPI
func (rp *RequestPool) updatePresence(presence string, device *userapi.Device) {
if presence == "" {
presence = "online"
@ -134,14 +127,9 @@ func (rp *RequestPool) updatePresence(presence string, device *userapi.Device) {
}
}
msg := nats.NewMsg(rp.cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent))
msg.Header.Set(jetstream.UserID, device.UserID)
msg.Header.Set("presence", strings.ToLower(presence))
msg.Header.Set("from_sync", "true") // only update last_active_ts and presence
msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
if _, err := rp.jetstream.PublishMsg(msg); err != nil {
if err := rp.producer.SendPresence(device.UserID, strings.ToLower(presence)); err != nil {
logrus.WithError(err).Error("Unable to publish presence message from sync")
return
}
rp.presence.Store(device.UserID, newPresence)

View file

@ -7,16 +7,15 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/nats-io/nats.go"
)
type dummyPublisher struct {
count int
}
func (d *dummyPublisher) PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) {
func (d *dummyPublisher) SendPresence(userID, presence string) error {
d.count++
return nil, nil
return nil
}
func TestRequestPool_updatePresence(t *testing.T) {
@ -80,8 +79,8 @@ func TestRequestPool_updatePresence(t *testing.T) {
},
}
rp := &RequestPool{
presence: &syncMap,
jetstream: publisher,
presence: &syncMap,
producer: publisher,
cfg: &config.SyncAPI{
Matrix: &config.Global{
JetStream: config.JetStream{

View file

@ -63,7 +63,12 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to load notifier ")
}
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, js)
federationPresenceProducer := producers.FederationAPIPresenceProducer{
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
JetStream: js,
}
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js,
@ -75,8 +80,6 @@ func AddPublicRoutes(
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
}
_ = userAPIReadUpdateProducer
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier,