Syncapi presence

This commit is contained in:
S7evinK 2022-03-31 08:06:43 +02:00
parent 8213b2ba30
commit b6017c9901
12 changed files with 221 additions and 25 deletions

View file

@ -228,6 +228,28 @@ func (n *Notifier) OnNewNotificationData(
n.wakeupUsers([]string{userID}, nil, n.currPos)
}
func (n *Notifier) OnNewPresence(
posUpdate types.StreamingToken, userID string,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
sharedUsers := n.sharedUsers(userID)
sharedUsers = append(sharedUsers, userID)
n.wakeupUsers(sharedUsers, nil, n.currPos)
}
func (n *Notifier) sharedUsers(userID string) (sharedUsers []string) {
for roomID, users := range n.roomIDToJoinedUsers {
if _, ok := users[userID]; ok {
sharedUsers = append(sharedUsers, n.joinedUsers(roomID)...)
}
}
return sharedUsers
}
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos

View file

@ -26,6 +26,7 @@ import (
)
type Database interface {
Presence
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
@ -149,3 +150,10 @@ type Database interface {
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
}
type Presence interface {
UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
GetPresence(ctx context.Context, userID string) (*types.Presence, error)
PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error)
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
}

View file

@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil {
return nil, err
}
presence, err := NewPostgresPresenceTable(d.db)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -111,6 +115,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Receipts: receipts,
Memberships: memberships,
NotificationData: notificationData,
Presence: presence,
}
return &d, nil
}

View file

@ -48,6 +48,7 @@ type Database struct {
Receipts tables.Receipts
Memberships tables.Memberships
NotificationData tables.NotificationData
Presence tables.Presence
}
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@ -998,3 +999,19 @@ func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}
func (s *Database) UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
}
func (s *Database) GetPresence(ctx context.Context, userID string) (*types.Presence, error) {
return s.Presence.GetPresenceForUser(ctx, nil, userID)
}
func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error) {
return s.Presence.GetPresenceAfter(ctx, nil, after)
}
func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
return s.Presence.GetMaxPresenceID(ctx, nil)
}

View file

@ -24,6 +24,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0)
ON CONFLICT DO NOTHING;
`
const increaseStreamIDStmt = "" +
@ -70,3 +72,9 @@ func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx)
err = increaseStmt.QueryRowContext(ctx, "accountdata").Scan(&pos)
return
}
func (s *streamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos)
return
}

View file

@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil {
return err
}
presence, err := NewSqlitePresenceTable(d.db, &d.streamID)
if err != nil {
return err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -121,6 +125,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Receipts: receipts,
Memberships: memberships,
NotificationData: notificationData,
Presence: presence,
}
return nil
}

View file

@ -181,3 +181,10 @@ type NotificationData interface {
SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
SelectMaxID(ctx context.Context) (int64, error)
}
type Presence interface {
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.Presence, err error)
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.Presence, err error)
}

View file

@ -20,6 +20,7 @@ type Streams struct {
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.StreamProvider
NotificationDataStreamProvider types.StreamProvider
PresenceStreamProvider types.StreamProvider
}
func NewSyncStreamProviders(
@ -56,6 +57,9 @@ func NewSyncStreamProviders(
rsAPI: rsAPI,
keyAPI: keyAPI,
},
PresenceStreamProvider: &PresenceStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
}
streams.PDUStreamProvider.Setup()
@ -66,6 +70,7 @@ func NewSyncStreamProviders(
streams.AccountDataStreamProvider.Setup()
streams.NotificationDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup()
streams.PresenceStreamProvider.Setup()
return streams
}
@ -80,5 +85,6 @@ func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
PresencePosition: s.PresenceStreamProvider.LatestPosition(ctx),
}
}

View file

@ -19,6 +19,7 @@ package sync
import (
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
@ -27,14 +28,18 @@ import (
keyapi "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
// RequestPool manages HTTP long-poll connections for /sync
@ -45,8 +50,14 @@ type RequestPool struct {
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map
presence sync.Map
streams *streams.Streams
Notifier *notifier.Notifier
jetstream JetstreamPublisher
}
type JetstreamPublisher interface {
PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error)
}
// NewRequestPool makes a new RequestPool
@ -55,6 +66,7 @@ func NewRequestPool(
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
streams *streams.Streams, notifier *notifier.Notifier,
jetstream nats.JetStreamContext,
) *RequestPool {
rp := &RequestPool{
db: db,
@ -63,10 +75,13 @@ func NewRequestPool(
keyAPI: keyAPI,
rsAPI: rsAPI,
lastseen: sync.Map{},
presence: sync.Map{},
streams: streams,
Notifier: notifier,
jetstream: jetstream,
}
go rp.cleanLastSeen()
go rp.cleanPresence(time.Minute * 5)
return rp
}
@ -80,6 +95,58 @@ func (rp *RequestPool) cleanLastSeen() {
}
}
func (rp *RequestPool) cleanPresence(cleanupTime time.Duration) {
for {
rp.presence.Range(func(key interface{}, v interface{}) bool {
p := v.(types.Presence)
if time.Since(p.LastActiveTS.Time()) > cleanupTime {
rp.presence.Delete(key)
}
return true
})
time.Sleep(cleanupTime)
}
}
/*
Controls whether the client is automatically marked as online by polling this API.
If this parameter is omitted then the client is automatically marked as online when it uses this API.
Otherwise if the parameter is set to offline then the client is not marked as being online when it uses this API. When set to unavailable, the client is marked as being idle.
*/
func (rp *RequestPool) updatePresence(presence string, device *userapi.Device) {
if presence == "" {
presence = "online"
}
newPresence := types.Presence{
ClientFields: types.PresenceClientResponse{
Presence: presence,
},
UserID: device.UserID,
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
// avoid spamming presence updates when syncing
existingPresence, ok := rp.presence.LoadOrStore(device.UserID, newPresence)
if ok {
p := existingPresence.(types.Presence)
if p.ClientFields.Presence == newPresence.ClientFields.Presence {
return
}
}
msg := nats.NewMsg(rp.cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent))
msg.Header.Set(jetstream.UserID, device.UserID)
msg.Header.Set("presence", strings.ToLower(presence))
msg.Header.Set("from_sync", "true") // only update last_active_ts and presence
msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
if _, err := rp.jetstream.PublishMsg(msg); err != nil {
logrus.WithError(err).Error("Unable to publish presence message from sync")
}
rp.presence.Store(device.UserID, newPresence)
}
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
if _, ok := rp.lastseen.LoadOrStore(device.UserID+device.ID, struct{}{}); ok {
return
@ -156,6 +223,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
defer activeSyncRequests.Dec()
rp.updateLastSeen(req, device)
rp.updatePresence(req.FormValue("set_presence"), device)
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
@ -219,6 +287,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
}
} else {
// Incremental sync
@ -255,6 +326,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PresencePosition, currentPos.PresencePosition,
),
}
}

View file

@ -49,7 +49,7 @@ func AddPublicRoutes(
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
@ -63,7 +63,7 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to load notifier ")
}
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, js)
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js,
@ -131,5 +131,14 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start receipts consumer")
}
presenceConsumer := consumers.NewPresenceConsumer(
process, cfg, js, natsClient, syncDB,
notifier, streams.PresenceStreamProvider,
rsAPI,
)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start presence consumer")
}
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}

View file

@ -103,6 +103,7 @@ type StreamingToken struct {
AccountDataPosition StreamPosition
DeviceListPosition StreamPosition
NotificationDataPosition StreamPosition
PresencePosition StreamPosition
}
// This will be used as a fallback by json.Marshal.
@ -118,11 +119,12 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string {
posStr := fmt.Sprintf(
"s%d_%d_%d_%d_%d_%d_%d_%d",
"s%d_%d_%d_%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
t.InvitePosition, t.AccountDataPosition,
t.DeviceListPosition, t.NotificationDataPosition,
t.PresencePosition,
)
return posStr
}
@ -146,12 +148,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
case t.NotificationDataPosition > other.NotificationDataPosition:
return true
case t.PresencePosition > other.PresencePosition:
return true
}
return false
}
func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition == 0
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition == 0
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -192,6 +196,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.NotificationDataPosition > t.NotificationDataPosition {
t.NotificationDataPosition = other.NotificationDataPosition
}
if other.PresencePosition > t.PresencePosition {
t.PresencePosition = other.PresencePosition
}
}
type TopologyToken struct {
@ -284,7 +291,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
// s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions
tok = strings.Split(tok, ".")[0]
parts := strings.Split(tok[1:], "_")
var positions [8]StreamPosition
var positions [9]StreamPosition
for i, p := range parts {
if i >= len(positions) {
break
@ -306,6 +313,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
AccountDataPosition: positions[5],
DeviceListPosition: positions[6],
NotificationDataPosition: positions[7],
PresencePosition: positions[8],
}
return token, nil
}
@ -505,3 +513,29 @@ type OutputSendToDeviceEvent struct {
DeviceID string `json:"device_id"`
gomatrixserverlib.SendToDeviceEvent
}
type Presence struct {
ClientFields PresenceClientResponse
StreamPos StreamPosition `json:"-"`
UserID string `json:"-"`
LastActiveTS gomatrixserverlib.Timestamp `json:"-"`
}
type PresenceClientResponse struct {
CurrentlyActive bool `json:"currently_active"`
LastActiveAgo int64 `json:"last_active_ago,omitempty"`
Presence string `json:"presence"`
StatusMsg *string `json:"status_msg,omitempty"`
}
var PresenceToInt = map[string]int{
"unavailable": 1,
"online": 2,
"offline": 3,
}
var PresenceToString = map[int]string{
1: "unavailable",
2: "online",
3: "offline",
}

View file

@ -9,9 +9,9 @@ import (
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
"s4_0_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0}.String(),
"s3_1_0_0_0_0_2_0": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0}.String(),
"s3_1_2_3_5_0_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0}.String(),
"s4_0_0_0_0_0_0_0_3": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3}.String(),
"s3_1_0_0_0_0_2_0_5": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0, 5}.String(),
"s3_1_2_3_5_0_0_0_6": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0, 6}.String(),
"t3_1": TopologyToken{3, 1}.String(),
}