diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 6a641e6f8..5090c3236 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -228,6 +228,28 @@ func (n *Notifier) OnNewNotificationData( n.wakeupUsers([]string{userID}, nil, n.currPos) } +func (n *Notifier) OnNewPresence( + posUpdate types.StreamingToken, userID string, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + n.currPos.ApplyUpdates(posUpdate) + sharedUsers := n.sharedUsers(userID) + sharedUsers = append(sharedUsers, userID) + + n.wakeupUsers(sharedUsers, nil, n.currPos) +} + +func (n *Notifier) sharedUsers(userID string) (sharedUsers []string) { + for roomID, users := range n.roomIDToJoinedUsers { + if _, ok := users[userID]; ok { + sharedUsers = append(sharedUsers, n.joinedUsers(roomID)...) + } + } + return sharedUsers +} + // GetListener returns a UserStreamListener that can be used to wait for // updates for a user. Must be closed. // notify for anything before sincePos diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 647fffad5..018d61f70 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -26,6 +26,7 @@ import ( ) type Database interface { + Presence MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) @@ -149,3 +150,10 @@ type Database interface { StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error) } + +type Presence interface { + UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) + GetPresence(ctx context.Context, userID string) (*types.Presence, error) + PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error) + MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 4e4b5c0bb..54445c7e3 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e if err != nil { return nil, err } + presence, err := NewPostgresPresenceTable(d.db) + if err != nil { + return nil, err + } m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) @@ -111,6 +115,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e Receipts: receipts, Memberships: memberships, NotificationData: notificationData, + Presence: presence, } return &d, nil } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 349e44526..6b78f69d7 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -48,6 +48,7 @@ type Database struct { Receipts tables.Receipts Memberships tables.Memberships NotificationData tables.NotificationData + Presence tables.Presence } func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) { @@ -998,3 +999,19 @@ func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) { return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter) } + +func (s *Database) UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { + return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync) +} + +func (s *Database) GetPresence(ctx context.Context, userID string) (*types.Presence, error) { + return s.Presence.GetPresenceForUser(ctx, nil, userID) +} + +func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error) { + return s.Presence.GetPresenceAfter(ctx, nil, after) +} + +func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { + return s.Presence.GetMaxPresenceID(ctx, nil) +} diff --git a/syncapi/storage/sqlite3/stream_id_table.go b/syncapi/storage/sqlite3/stream_id_table.go index 2be3ae93d..faa2c41fe 100644 --- a/syncapi/storage/sqlite3/stream_id_table.go +++ b/syncapi/storage/sqlite3/stream_id_table.go @@ -24,6 +24,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0) ON CONFLICT DO NOTHING; INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0) ON CONFLICT DO NOTHING; +INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0) + ON CONFLICT DO NOTHING; ` const increaseStreamIDStmt = "" + @@ -70,3 +72,9 @@ func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx) err = increaseStmt.QueryRowContext(ctx, "accountdata").Scan(&pos) return } + +func (s *streamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { + increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt) + err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos) + return +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index cb7e3b46f..cb2c3169d 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er if err != nil { return err } + presence, err := NewSqlitePresenceTable(d.db, &d.streamID) + if err != nil { + return err + } m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) @@ -121,6 +125,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er Receipts: receipts, Memberships: memberships, NotificationData: notificationData, + Presence: presence, } return nil } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ba0076e22..08e589a33 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -181,3 +181,10 @@ type NotificationData interface { SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) SelectMaxID(ctx context.Context) (int64, error) } + +type Presence interface { + UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error) + GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.Presence, err error) + GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) + GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.Presence, err error) +} diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index b2273aadb..fbbb3544c 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -20,6 +20,7 @@ type Streams struct { AccountDataStreamProvider types.StreamProvider DeviceListStreamProvider types.StreamProvider NotificationDataStreamProvider types.StreamProvider + PresenceStreamProvider types.StreamProvider } func NewSyncStreamProviders( @@ -56,6 +57,9 @@ func NewSyncStreamProviders( rsAPI: rsAPI, keyAPI: keyAPI, }, + PresenceStreamProvider: &PresenceStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, } streams.PDUStreamProvider.Setup() @@ -66,6 +70,7 @@ func NewSyncStreamProviders( streams.AccountDataStreamProvider.Setup() streams.NotificationDataStreamProvider.Setup() streams.DeviceListStreamProvider.Setup() + streams.PresenceStreamProvider.Setup() return streams } @@ -80,5 +85,6 @@ func (s *Streams) Latest(ctx context.Context) types.StreamingToken { AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx), NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx), DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx), + PresencePosition: s.PresenceStreamProvider.LatestPosition(ctx), } } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 2c9920d18..baa14cb17 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -19,6 +19,7 @@ package sync import ( "net" "net/http" + "strconv" "strings" "sync" "time" @@ -27,26 +28,36 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db storage.Database - cfg *config.SyncAPI - userAPI userapi.UserInternalAPI - keyAPI keyapi.KeyInternalAPI - rsAPI roomserverAPI.RoomserverInternalAPI - lastseen sync.Map - streams *streams.Streams - Notifier *notifier.Notifier + db storage.Database + cfg *config.SyncAPI + userAPI userapi.UserInternalAPI + keyAPI keyapi.KeyInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI + lastseen sync.Map + presence sync.Map + streams *streams.Streams + Notifier *notifier.Notifier + jetstream JetstreamPublisher +} + +type JetstreamPublisher interface { + PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) } // NewRequestPool makes a new RequestPool @@ -55,18 +66,22 @@ func NewRequestPool( userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, streams *streams.Streams, notifier *notifier.Notifier, + jetstream nats.JetStreamContext, ) *RequestPool { rp := &RequestPool{ - db: db, - cfg: cfg, - userAPI: userAPI, - keyAPI: keyAPI, - rsAPI: rsAPI, - lastseen: sync.Map{}, - streams: streams, - Notifier: notifier, + db: db, + cfg: cfg, + userAPI: userAPI, + keyAPI: keyAPI, + rsAPI: rsAPI, + lastseen: sync.Map{}, + presence: sync.Map{}, + streams: streams, + Notifier: notifier, + jetstream: jetstream, } go rp.cleanLastSeen() + go rp.cleanPresence(time.Minute * 5) return rp } @@ -80,6 +95,58 @@ func (rp *RequestPool) cleanLastSeen() { } } +func (rp *RequestPool) cleanPresence(cleanupTime time.Duration) { + for { + rp.presence.Range(func(key interface{}, v interface{}) bool { + p := v.(types.Presence) + if time.Since(p.LastActiveTS.Time()) > cleanupTime { + rp.presence.Delete(key) + } + return true + }) + time.Sleep(cleanupTime) + } +} + +/* +Controls whether the client is automatically marked as online by polling this API. +If this parameter is omitted then the client is automatically marked as online when it uses this API. +Otherwise if the parameter is set to “offline” then the client is not marked as being online when it uses this API. When set to “unavailable”, the client is marked as being idle. +*/ +func (rp *RequestPool) updatePresence(presence string, device *userapi.Device) { + if presence == "" { + presence = "online" + } + + newPresence := types.Presence{ + ClientFields: types.PresenceClientResponse{ + Presence: presence, + }, + UserID: device.UserID, + LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()), + } + // avoid spamming presence updates when syncing + existingPresence, ok := rp.presence.LoadOrStore(device.UserID, newPresence) + if ok { + p := existingPresence.(types.Presence) + if p.ClientFields.Presence == newPresence.ClientFields.Presence { + return + } + } + + msg := nats.NewMsg(rp.cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent)) + msg.Header.Set(jetstream.UserID, device.UserID) + msg.Header.Set("presence", strings.ToLower(presence)) + msg.Header.Set("from_sync", "true") // only update last_active_ts and presence + msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now())))) + + if _, err := rp.jetstream.PublishMsg(msg); err != nil { + logrus.WithError(err).Error("Unable to publish presence message from sync") + } + + rp.presence.Store(device.UserID, newPresence) +} + func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) { if _, ok := rp.lastseen.LoadOrStore(device.UserID+device.ID, struct{}{}); ok { return @@ -156,6 +223,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. defer activeSyncRequests.Dec() rp.updateLastSeen(req, device) + rp.updatePresence(req.FormValue("set_presence"), device) waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() @@ -219,6 +287,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync( syncReq.Context, syncReq, ), + PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), } } else { // Incremental sync @@ -255,6 +326,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. syncReq.Context, syncReq, syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, ), + PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.PresencePosition, currentPos.PresencePosition, + ), } } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index b579467ae..de5eb1329 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -49,7 +49,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) + js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { @@ -63,7 +63,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to load notifier ") } - requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) + requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, js) userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, @@ -131,5 +131,14 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start receipts consumer") } + presenceConsumer := consumers.NewPresenceConsumer( + process, cfg, js, natsClient, syncDB, + notifier, streams.PresenceStreamProvider, + rsAPI, + ) + if err = presenceConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start presence consumer") + } + routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index f964b80b5..93256d398 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -103,6 +103,7 @@ type StreamingToken struct { AccountDataPosition StreamPosition DeviceListPosition StreamPosition NotificationDataPosition StreamPosition + PresencePosition StreamPosition } // This will be used as a fallback by json.Marshal. @@ -118,11 +119,12 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) { func (t StreamingToken) String() string { posStr := fmt.Sprintf( - "s%d_%d_%d_%d_%d_%d_%d_%d", + "s%d_%d_%d_%d_%d_%d_%d_%d_%d", t.PDUPosition, t.TypingPosition, t.ReceiptPosition, t.SendToDevicePosition, t.InvitePosition, t.AccountDataPosition, t.DeviceListPosition, t.NotificationDataPosition, + t.PresencePosition, ) return posStr } @@ -146,12 +148,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true case t.NotificationDataPosition > other.NotificationDataPosition: return true + case t.PresencePosition > other.PresencePosition: + return true } return false } func (t *StreamingToken) IsEmpty() bool { - return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition == 0 + return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition == 0 } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. @@ -192,6 +196,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) { if other.NotificationDataPosition > t.NotificationDataPosition { t.NotificationDataPosition = other.NotificationDataPosition } + if other.PresencePosition > t.PresencePosition { + t.PresencePosition = other.PresencePosition + } } type TopologyToken struct { @@ -284,7 +291,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { // s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions tok = strings.Split(tok, ".")[0] parts := strings.Split(tok[1:], "_") - var positions [8]StreamPosition + var positions [9]StreamPosition for i, p := range parts { if i >= len(positions) { break @@ -306,6 +313,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { AccountDataPosition: positions[5], DeviceListPosition: positions[6], NotificationDataPosition: positions[7], + PresencePosition: positions[8], } return token, nil } @@ -505,3 +513,29 @@ type OutputSendToDeviceEvent struct { DeviceID string `json:"device_id"` gomatrixserverlib.SendToDeviceEvent } + +type Presence struct { + ClientFields PresenceClientResponse + StreamPos StreamPosition `json:"-"` + UserID string `json:"-"` + LastActiveTS gomatrixserverlib.Timestamp `json:"-"` +} + +type PresenceClientResponse struct { + CurrentlyActive bool `json:"currently_active"` + LastActiveAgo int64 `json:"last_active_ago,omitempty"` + Presence string `json:"presence"` + StatusMsg *string `json:"status_msg,omitempty"` +} + +var PresenceToInt = map[string]int{ + "unavailable": 1, + "online": 2, + "offline": 3, +} + +var PresenceToString = map[int]string{ + 1: "unavailable", + 2: "online", + 3: "offline", +} diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index ff78bfb9d..19fcfc150 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -9,10 +9,10 @@ import ( func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0}.String(), - "s3_1_0_0_0_0_2_0": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0}.String(), - "s3_1_2_3_5_0_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0}.String(), - "t3_1": TopologyToken{3, 1}.String(), + "s4_0_0_0_0_0_0_0_3": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3}.String(), + "s3_1_0_0_0_0_2_0_5": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0, 5}.String(), + "s3_1_2_3_5_0_0_0_6": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0, 6}.String(), + "t3_1": TopologyToken{3, 1}.String(), } for a, b := range shouldPass {