mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 03:03:10 -06:00
Missing files
This commit is contained in:
parent
5249226402
commit
b404ed5c44
128
clientapi/routing/presence.go
Normal file
128
clientapi/routing/presence.go
Normal file
|
|
@ -0,0 +1,128 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package routing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type presenceReq struct {
|
||||
Presence string `json:"presence"`
|
||||
StatusMsg *string `json:"status_msg,omitempty"`
|
||||
}
|
||||
|
||||
func SetPresence(
|
||||
req *http.Request,
|
||||
device *api.Device,
|
||||
producer *producers.SyncAPIProducer,
|
||||
userID string,
|
||||
) util.JSONResponse {
|
||||
if device.UserID != userID {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden("Unable to set presence for other user."),
|
||||
}
|
||||
}
|
||||
var presence presenceReq
|
||||
parseErr := httputil.UnmarshalJSONRequest(req, &presence)
|
||||
if parseErr != nil {
|
||||
return *parseErr
|
||||
}
|
||||
p := strings.ToLower(presence.Presence)
|
||||
if _, ok := types.PresenceToInt[p]; !ok {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", p)),
|
||||
}
|
||||
}
|
||||
|
||||
err := producer.SendPresence(req.Context(), userID, presence.Presence, presence.StatusMsg)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("failed to update presence")
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusInternalServerError,
|
||||
JSON: jsonerror.InternalServerError(),
|
||||
}
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
func GetPresence(
|
||||
req *http.Request,
|
||||
device *api.Device,
|
||||
natsClient *nats.Conn,
|
||||
presenceTopic string,
|
||||
userID string,
|
||||
) util.JSONResponse {
|
||||
msg := nats.NewMsg(presenceTopic)
|
||||
msg.Header.Set(jetstream.UserID, userID)
|
||||
|
||||
presence, err := natsClient.RequestMsg(msg, time.Second*10)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("unable to get presence")
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusInternalServerError,
|
||||
JSON: jsonerror.InternalServerError(),
|
||||
}
|
||||
}
|
||||
|
||||
statusMsg := presence.Header.Get("status_msg")
|
||||
e := presence.Header.Get("error")
|
||||
if e != "" {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: types.PresenceClientResponse{
|
||||
Presence: "unavailable",
|
||||
},
|
||||
}
|
||||
}
|
||||
lastActive, err := strconv.Atoi(presence.Header.Get("last_active_ts"))
|
||||
if err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusInternalServerError,
|
||||
JSON: jsonerror.InternalServerError(),
|
||||
}
|
||||
}
|
||||
|
||||
lastActiveTS := gomatrixserverlib.Timestamp(lastActive)
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: types.PresenceClientResponse{
|
||||
CurrentlyActive: time.Since(lastActiveTS.Time()).Minutes() < 5,
|
||||
LastActiveAgo: time.Since(lastActiveTS.Time()).Milliseconds(),
|
||||
Presence: presence.Header.Get("presence"),
|
||||
StatusMsg: &statusMsg,
|
||||
},
|
||||
}
|
||||
}
|
||||
136
syncapi/consumers/presence.go
Normal file
136
syncapi/consumers/presence.go
Normal file
|
|
@ -0,0 +1,136 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// OutputTypingEventConsumer consumes events that originated in the EDU server.
|
||||
type PresenceConsumer struct {
|
||||
ctx context.Context
|
||||
jetstream nats.JetStreamContext
|
||||
nats *nats.Conn
|
||||
durable string
|
||||
requestTopic string
|
||||
presenceTopic string
|
||||
db storage.Database
|
||||
stream types.StreamProvider
|
||||
notifier *notifier.Notifier
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
}
|
||||
|
||||
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
|
||||
// Call Start() to begin consuming from the EDU server.
|
||||
func NewPresenceConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.SyncAPI,
|
||||
js nats.JetStreamContext,
|
||||
nats *nats.Conn,
|
||||
db storage.Database,
|
||||
notifier *notifier.Notifier,
|
||||
stream types.StreamProvider,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
) *PresenceConsumer {
|
||||
return &PresenceConsumer{
|
||||
ctx: process.Context(),
|
||||
nats: nats,
|
||||
jetstream: js,
|
||||
durable: cfg.Matrix.JetStream.Durable("SyncAPIPresenceConsumer"),
|
||||
presenceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
||||
requestTopic: cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence),
|
||||
db: db,
|
||||
notifier: notifier,
|
||||
stream: stream,
|
||||
rsAPI: rsAPI,
|
||||
}
|
||||
}
|
||||
|
||||
// Start consuming typing events.
|
||||
func (s *PresenceConsumer) Start() error {
|
||||
// Normal NATS subscription, used by Request/Reply
|
||||
_, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) {
|
||||
presence, err := s.db.GetPresence(context.Background(), msg.Header.Get(jetstream.UserID))
|
||||
m := &nats.Msg{
|
||||
Header: nats.Header{},
|
||||
}
|
||||
if err != nil {
|
||||
m.Header.Set("error", err.Error())
|
||||
if err = msg.RespondMsg(m); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
m.Header.Set(jetstream.UserID, presence.UserID)
|
||||
m.Header.Set("presence", presence.ClientFields.Presence)
|
||||
m.Header.Set("status_msg", *presence.ClientFields.StatusMsg)
|
||||
m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS)))
|
||||
|
||||
if err = msg.RespondMsg(m); err != nil {
|
||||
return
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return jetstream.JetStreamConsumer(
|
||||
s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage,
|
||||
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
||||
userID := msg.Header.Get(jetstream.UserID)
|
||||
presence := msg.Header.Get("presence")
|
||||
statusMsg := msg.Header.Get("status_msg")
|
||||
timestamp := msg.Header.Get("last_active_ts")
|
||||
fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync"))
|
||||
nilStatusMsg, _ := strconv.ParseBool(msg.Header.Get("status_msg_nil"))
|
||||
|
||||
logrus.Debugf("syncAPI received presence event: %+v", msg.Header)
|
||||
|
||||
ts, err := strconv.Atoi(timestamp)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
newStatusMsg := &statusMsg
|
||||
if nilStatusMsg {
|
||||
newStatusMsg = nil
|
||||
}
|
||||
|
||||
pos, err := s.db.UpdatePresence(ctx, userID, presence, newStatusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
s.stream.Advance(pos)
|
||||
s.notifier.OnNewPresence(types.StreamingToken{PresencePosition: pos}, userID)
|
||||
|
||||
return true
|
||||
}
|
||||
165
syncapi/storage/postgres/presence_table.go
Normal file
165
syncapi/storage/postgres/presence_table.go
Normal file
|
|
@ -0,0 +1,165 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const presenceSchema = `
|
||||
CREATE SEQUENCE IF NOT EXISTS syncapi_presence_id;
|
||||
-- Stores data about presence
|
||||
CREATE TABLE IF NOT EXISTS syncapi_presence (
|
||||
-- The ID
|
||||
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_presence_id'),
|
||||
-- The Matrix user ID
|
||||
user_id TEXT NOT NULL,
|
||||
-- The actual presence
|
||||
presence INT NOT NULL,
|
||||
-- The status message
|
||||
status_msg TEXT,
|
||||
-- The last time an action was received by this user
|
||||
last_active_ts BIGINT NOT NULL,
|
||||
CONSTRAINT presence_presences_unique UNIQUE (user_id)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id);
|
||||
`
|
||||
|
||||
const upsertPresenceSQL = "" +
|
||||
"INSERT INTO syncapi_presence AS p" +
|
||||
" (user_id, presence, status_msg, last_active_ts)" +
|
||||
" VALUES ($1, $2, $3, $4)" +
|
||||
" ON CONFLICT (user_id)" +
|
||||
" DO UPDATE SET id = nextval('syncapi_presence_id')," +
|
||||
" presence = $2, status_msg = COALESCE($3, p.status_msg), last_active_ts = $4" +
|
||||
" RETURNING id"
|
||||
|
||||
const upsertPresenceFromSyncSQL = "" +
|
||||
"INSERT INTO syncapi_presence AS p" +
|
||||
" (user_id, presence, last_active_ts)" +
|
||||
" VALUES ($1, $2, $3)" +
|
||||
" ON CONFLICT (user_id)" +
|
||||
" DO UPDATE SET id = nextval('syncapi_presence_id')," +
|
||||
" presence = $2, last_active_ts = $3" +
|
||||
" RETURNING id"
|
||||
|
||||
const selectPresenceForUserSQL = "" +
|
||||
"SELECT presence, status_msg, last_active_ts" +
|
||||
" FROM syncapi_presence" +
|
||||
" WHERE user_id = $1 LIMIT 1"
|
||||
|
||||
const selectMaxPresenceSQL = "" +
|
||||
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
||||
|
||||
const selectPresenceAfter = "" +
|
||||
" SELECT id, user_id, presence, status_msg, last_active_ts" +
|
||||
" FROM syncapi_presence" +
|
||||
" WHERE id > $1"
|
||||
|
||||
type presenceStatements struct {
|
||||
upsertPresenceStmt *sql.Stmt
|
||||
upsertPresenceFromSyncStmt *sql.Stmt
|
||||
selectPresenceForUsersStmt *sql.Stmt
|
||||
selectMaxPresenceStmt *sql.Stmt
|
||||
selectPresenceAfterStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) {
|
||||
_, err := db.Exec(presenceSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := &presenceStatements{}
|
||||
return s, sqlutil.StatementList{
|
||||
{&s.upsertPresenceStmt, upsertPresenceSQL},
|
||||
{&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL},
|
||||
{&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
|
||||
{&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
|
||||
{&s.selectPresenceAfterStmt, selectPresenceAfter},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
// UpsertPresence creates/updates a presence status.
|
||||
func (p *presenceStatements) UpsertPresence(
|
||||
ctx context.Context,
|
||||
txn *sql.Tx,
|
||||
userID string,
|
||||
statusMsg *string,
|
||||
presence string,
|
||||
lastActiveTS gomatrixserverlib.Timestamp,
|
||||
fromSync bool,
|
||||
) (pos types.StreamPosition, err error) {
|
||||
presenceStatusID := types.PresenceToInt[presence]
|
||||
if fromSync {
|
||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
|
||||
err = stmt.QueryRowContext(ctx, userID, presenceStatusID, lastActiveTS).Scan(&pos)
|
||||
} else {
|
||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
||||
err = stmt.QueryRowContext(ctx, userID, presenceStatusID, statusMsg, lastActiveTS).Scan(&pos)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetPresenceForUser returns the current presence of a user.
|
||||
func (p *presenceStatements) GetPresenceForUser(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
userID string,
|
||||
) (*types.Presence, error) {
|
||||
result := &types.Presence{
|
||||
UserID: userID,
|
||||
}
|
||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||
var presenceStatusID int
|
||||
err := stmt.QueryRowContext(ctx, userID).Scan(&presenceStatusID, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||
result.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
|
||||
stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt)
|
||||
err = stmt.QueryRowContext(ctx).Scan(&pos)
|
||||
return
|
||||
}
|
||||
|
||||
// GetPresenceAfter returns the changes presences after a given stream id
|
||||
func (p *presenceStatements) GetPresenceAfter(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
after types.StreamPosition,
|
||||
) (presences map[string]*types.Presence, err error) {
|
||||
presences = make(map[string]*types.Presence)
|
||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
||||
|
||||
rows, err := stmt.QueryContext(ctx, after)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
||||
var presenceStatusID int
|
||||
for rows.Next() {
|
||||
presence := &types.Presence{}
|
||||
if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presenceStatusID, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
presence.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
||||
presences[presence.UserID] = presence
|
||||
}
|
||||
return presences, rows.Err()
|
||||
}
|
||||
180
syncapi/storage/sqlite3/presence_table.go
Normal file
180
syncapi/storage/sqlite3/presence_table.go
Normal file
|
|
@ -0,0 +1,180 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const presenceSchema = `
|
||||
-- Stores data about presence
|
||||
CREATE TABLE IF NOT EXISTS syncapi_presence (
|
||||
-- The ID
|
||||
id BIGINT NOT NULL,
|
||||
-- The Matrix user ID
|
||||
user_id TEXT NOT NULL,
|
||||
-- The actual presence
|
||||
presence INT NOT NULL,
|
||||
-- The status message
|
||||
status_msg TEXT,
|
||||
-- The last time an action was received by this user
|
||||
last_active_ts BIGINT NOT NULL,
|
||||
CONSTRAINT presence_presences_unique UNIQUE (user_id)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id);
|
||||
`
|
||||
|
||||
const upsertPresenceSQL = "" +
|
||||
"INSERT INTO syncapi_presence AS p" +
|
||||
" (id, user_id, presence, status_msg, last_active_ts)" +
|
||||
" VALUES ($1, $2, $3, $4, $5)" +
|
||||
" ON CONFLICT (user_id)" +
|
||||
" DO UPDATE SET id = $6, " +
|
||||
" presence = $7, status_msg = COALESCE($8, p.status_msg), last_active_ts = $9" +
|
||||
" RETURNING id"
|
||||
|
||||
const upsertPresenceFromSyncSQL = "" +
|
||||
"INSERT INTO syncapi_presence AS p" +
|
||||
" (id, user_id, presence, last_active_ts)" +
|
||||
" VALUES ($1, $2, $3, $4)" +
|
||||
" ON CONFLICT (user_id)" +
|
||||
" DO UPDATE SET id = $5, " +
|
||||
" presence = $6, last_active_ts = $7" +
|
||||
" RETURNING id"
|
||||
|
||||
const selectPresenceForUserSQL = "" +
|
||||
"SELECT presence, status_msg, last_active_ts" +
|
||||
" FROM syncapi_presence" +
|
||||
" WHERE user_id = $1 LIMIT 1"
|
||||
|
||||
const selectMaxPresenceSQL = "" +
|
||||
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
||||
|
||||
const selectPresenceAfter = "" +
|
||||
" SELECT id, user_id, presence, status_msg, last_active_ts" +
|
||||
" FROM syncapi_presence" +
|
||||
" WHERE id > $1"
|
||||
|
||||
type presenceStatements struct {
|
||||
db *sql.DB
|
||||
streamIDStatements *streamIDStatements
|
||||
upsertPresenceStmt *sql.Stmt
|
||||
upsertPresenceFromSyncStmt *sql.Stmt
|
||||
selectPresenceForUsersStmt *sql.Stmt
|
||||
selectMaxPresenceStmt *sql.Stmt
|
||||
selectPresenceAfterStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewSqlitePresenceTable(db *sql.DB, streamID *streamIDStatements) (*presenceStatements, error) {
|
||||
_, err := db.Exec(presenceSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := &presenceStatements{
|
||||
db: db,
|
||||
streamIDStatements: streamID,
|
||||
}
|
||||
return s, sqlutil.StatementList{
|
||||
{&s.upsertPresenceStmt, upsertPresenceSQL},
|
||||
{&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL},
|
||||
{&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
|
||||
{&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
|
||||
{&s.selectPresenceAfterStmt, selectPresenceAfter},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
// UpsertPresence creates/updates a presence status.
|
||||
func (p *presenceStatements) UpsertPresence(
|
||||
ctx context.Context,
|
||||
txn *sql.Tx,
|
||||
userID string,
|
||||
statusMsg *string,
|
||||
presence string,
|
||||
lastActiveTS gomatrixserverlib.Timestamp,
|
||||
fromSync bool,
|
||||
) (pos types.StreamPosition, err error) {
|
||||
pos, err = p.streamIDStatements.nextPresenceID(ctx, txn)
|
||||
if err != nil {
|
||||
return pos, err
|
||||
}
|
||||
|
||||
presenceStatusID := types.PresenceToInt[presence]
|
||||
if fromSync {
|
||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
|
||||
err = stmt.QueryRowContext(ctx,
|
||||
pos, userID, presenceStatusID,
|
||||
lastActiveTS, pos,
|
||||
presenceStatusID, lastActiveTS).Scan(&pos)
|
||||
} else {
|
||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
||||
err = stmt.QueryRowContext(ctx,
|
||||
pos, userID, presenceStatusID,
|
||||
statusMsg, lastActiveTS, pos,
|
||||
presenceStatusID, statusMsg, lastActiveTS).Scan(&pos)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetPresenceForUser returns the current presence of a user.
|
||||
func (p *presenceStatements) GetPresenceForUser(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
userID string,
|
||||
) (*types.Presence, error) {
|
||||
result := &types.Presence{
|
||||
UserID: userID,
|
||||
}
|
||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||
var presenceStatusID int
|
||||
err := stmt.QueryRowContext(ctx, userID).Scan(&presenceStatusID, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||
result.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
|
||||
stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt)
|
||||
err = stmt.QueryRowContext(ctx).Scan(&pos)
|
||||
return
|
||||
}
|
||||
|
||||
// GetPresenceAfter returns the changes presences after a given stream id
|
||||
func (p *presenceStatements) GetPresenceAfter(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
after types.StreamPosition,
|
||||
) (presences map[string]*types.Presence, err error) {
|
||||
presences = make(map[string]*types.Presence)
|
||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
||||
|
||||
rows, err := stmt.QueryContext(ctx, after)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
||||
var presenceStatusID int
|
||||
for rows.Next() {
|
||||
presence := &types.Presence{}
|
||||
if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presenceStatusID, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
presence.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
||||
presences[presence.UserID] = presence
|
||||
}
|
||||
return presences, rows.Err()
|
||||
}
|
||||
155
syncapi/streams/stream_presence.go
Normal file
155
syncapi/streams/stream_presence.go
Normal file
|
|
@ -0,0 +1,155 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package streams
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type PresenceStreamProvider struct {
|
||||
StreamProvider
|
||||
}
|
||||
|
||||
func (p *PresenceStreamProvider) Setup() {
|
||||
p.StreamProvider.Setup()
|
||||
|
||||
id, err := p.DB.MaxStreamPositionForPresence(context.Background())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.latest = id
|
||||
}
|
||||
|
||||
func (p *PresenceStreamProvider) CompleteSync(
|
||||
ctx context.Context,
|
||||
req *types.SyncRequest,
|
||||
) types.StreamPosition {
|
||||
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
|
||||
}
|
||||
|
||||
func (p *PresenceStreamProvider) IncrementalSync(
|
||||
ctx context.Context,
|
||||
req *types.SyncRequest,
|
||||
from, to types.StreamPosition,
|
||||
) types.StreamPosition {
|
||||
presences, err := p.DB.PresenceAfter(ctx, from)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.PresenceAfter failed")
|
||||
return from
|
||||
}
|
||||
|
||||
if len(presences) == 0 {
|
||||
return to
|
||||
}
|
||||
|
||||
// get all joined users
|
||||
rooms, err := p.DB.AllJoinedUsersInRooms(ctx)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("unable to query joined users")
|
||||
}
|
||||
|
||||
sharedUsers := map[string]bool{
|
||||
req.Device.UserID: true,
|
||||
}
|
||||
for roomID := range req.Rooms {
|
||||
roomUsers := rooms[roomID]
|
||||
for i := range roomUsers {
|
||||
sharedUsers[roomUsers[i]] = true
|
||||
}
|
||||
}
|
||||
|
||||
// add newly joined rooms user presences
|
||||
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
|
||||
for _, roomID := range newlyJoined {
|
||||
roomUsers := rooms[roomID]
|
||||
for i := range roomUsers {
|
||||
sharedUsers[roomUsers[i]] = true
|
||||
// we already got a presence from this user
|
||||
if _, ok := presences[roomUsers[i]]; ok {
|
||||
continue
|
||||
}
|
||||
presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("unable to query presence for user")
|
||||
if err == sql.ErrNoRows {
|
||||
continue
|
||||
}
|
||||
return to
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lastPos := to
|
||||
for i := range presences {
|
||||
presence := presences[i]
|
||||
// Ignore users we don't share a room with
|
||||
if !sharedUsers[presence.UserID] {
|
||||
continue
|
||||
}
|
||||
presence.ClientFields.LastActiveAgo = time.Since(presence.LastActiveTS.Time()).Milliseconds()
|
||||
presence.ClientFields.CurrentlyActive = time.Since(presence.LastActiveTS.Time()).Minutes() < 5
|
||||
|
||||
content, err := json.Marshal(presence.ClientFields)
|
||||
if err != nil {
|
||||
return from
|
||||
}
|
||||
|
||||
req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{
|
||||
Content: content,
|
||||
Sender: presence.UserID,
|
||||
Type: gomatrixserverlib.MPresence,
|
||||
})
|
||||
if presence.StreamPos > lastPos {
|
||||
lastPos = presence.StreamPos
|
||||
}
|
||||
}
|
||||
|
||||
return lastPos
|
||||
}
|
||||
|
||||
func joinedRooms(res *types.Response, userID string) []string {
|
||||
var roomIDs []string
|
||||
for roomID, join := range res.Rooms.Join {
|
||||
// we would expect to see our join event somewhere if we newly joined the room.
|
||||
// Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
|
||||
newlyJoined := membershipEventPresent(join.State.Events, userID)
|
||||
if newlyJoined {
|
||||
roomIDs = append(roomIDs, roomID)
|
||||
continue
|
||||
}
|
||||
newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
|
||||
if newlyJoined {
|
||||
roomIDs = append(roomIDs, roomID)
|
||||
}
|
||||
}
|
||||
return roomIDs
|
||||
}
|
||||
|
||||
func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
|
||||
for _, ev := range events {
|
||||
// it's enough to know that we have our member event here, don't need to check membership content
|
||||
// as it's implied by being in the respective section of the sync response.
|
||||
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
104
syncapi/sync/requestpool_test.go
Normal file
104
syncapi/sync/requestpool_test.go
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type dummyPublisher struct {
|
||||
count int
|
||||
}
|
||||
|
||||
func (d *dummyPublisher) PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) {
|
||||
d.count++
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestRequestPool_updatePresence(t *testing.T) {
|
||||
type args struct {
|
||||
presence string
|
||||
device *api.Device
|
||||
sleep time.Duration
|
||||
}
|
||||
publisher := &dummyPublisher{}
|
||||
syncMap := sync.Map{}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantIncrease bool
|
||||
}{
|
||||
{
|
||||
name: "new presence is published",
|
||||
wantIncrease: true,
|
||||
args: args{
|
||||
device: &api.Device{UserID: "dummy"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "presence not published, no change",
|
||||
args: args{
|
||||
device: &api.Device{UserID: "dummy"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "new presence is published dummy2",
|
||||
wantIncrease: true,
|
||||
args: args{
|
||||
device: &api.Device{UserID: "dummy2"},
|
||||
presence: "online",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "different presence is published dummy2",
|
||||
wantIncrease: true,
|
||||
args: args{
|
||||
device: &api.Device{UserID: "dummy2"},
|
||||
presence: "unavailable",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "same presence is not published dummy2",
|
||||
args: args{
|
||||
device: &api.Device{UserID: "dummy2"},
|
||||
presence: "unavailable",
|
||||
sleep: time.Millisecond * 150,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "same presence is published after being deleted",
|
||||
wantIncrease: true,
|
||||
args: args{
|
||||
device: &api.Device{UserID: "dummy2"},
|
||||
presence: "unavailable",
|
||||
},
|
||||
},
|
||||
}
|
||||
rp := &RequestPool{
|
||||
presence: syncMap,
|
||||
jetstream: publisher,
|
||||
cfg: &config.SyncAPI{
|
||||
Matrix: &config.Global{
|
||||
JetStream: config.JetStream{
|
||||
TopicPrefix: "Dendrite",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
go rp.cleanPresence(time.Millisecond * 50)
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
beforeCount := publisher.count
|
||||
rp.updatePresence(tt.args.presence, tt.args.device)
|
||||
if tt.wantIncrease && publisher.count <= beforeCount {
|
||||
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
|
||||
}
|
||||
time.Sleep(tt.args.sleep)
|
||||
})
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue