mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 03:03:10 -06:00
Make presence types const and use stringer for it
This commit is contained in:
parent
02c58e8ce1
commit
9b6ee2d475
|
|
@ -177,11 +177,11 @@ func (p *SyncAPIProducer) SendTyping(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SyncAPIProducer) SendPresence(
|
func (p *SyncAPIProducer) SendPresence(
|
||||||
ctx context.Context, userID, presence string, statusMsg *string,
|
ctx context.Context, userID string, presence types.Presence, statusMsg *string,
|
||||||
) error {
|
) error {
|
||||||
m := nats.NewMsg(p.TopicPresenceEvent)
|
m := nats.NewMsg(p.TopicPresenceEvent)
|
||||||
m.Header.Set(jetstream.UserID, userID)
|
m.Header.Set(jetstream.UserID, userID)
|
||||||
m.Header.Set("presence", presence)
|
m.Header.Set("presence", presence.String())
|
||||||
if statusMsg != nil {
|
if statusMsg != nil {
|
||||||
m.Header.Set("status_msg", *statusMsg)
|
m.Header.Set("status_msg", *statusMsg)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
|
|
@ -63,15 +62,16 @@ func SetPresence(
|
||||||
if parseErr != nil {
|
if parseErr != nil {
|
||||||
return *parseErr
|
return *parseErr
|
||||||
}
|
}
|
||||||
p := strings.ToLower(presence.Presence)
|
|
||||||
if _, ok := types.PresenceToInt[p]; !ok {
|
presenceStatus, ok := types.PresenceFromString(presence.Presence)
|
||||||
|
if !ok {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", p)),
|
JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", presence.Presence)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := producer.SendPresence(req.Context(), userID, presence.Presence, presence.StatusMsg)
|
err := producer.SendPresence(req.Context(), userID, presenceStatus, presence.StatusMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("failed to update presence")
|
log.WithError(err).Errorf("failed to update presence")
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
|
@ -112,7 +112,7 @@ func GetPresence(
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: types.PresenceClientResponse{
|
JSON: types.PresenceClientResponse{
|
||||||
Presence: "unavailable",
|
Presence: types.PresenceUnavailable.String(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -124,7 +124,7 @@ func GetPresence(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p := types.Presence{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)}
|
p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)}
|
||||||
currentlyActive := p.CurrentlyActive()
|
currentlyActive := p.CurrentlyActive()
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
|
|
|
||||||
|
|
@ -105,7 +105,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) b
|
||||||
statusMsg = &status
|
statusMsg = &status
|
||||||
}
|
}
|
||||||
|
|
||||||
p := types.Presence{LastActiveTS: gomatrixserverlib.Timestamp(ts)}
|
p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(ts)}
|
||||||
|
|
||||||
content := fedTypes.Presence{
|
content := fedTypes.Presence{
|
||||||
Push: []fedTypes.PresenceContent{
|
Push: []fedTypes.PresenceContent{
|
||||||
|
|
|
||||||
|
|
@ -146,11 +146,11 @@ func (p *SyncAPIProducer) SendTyping(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SyncAPIProducer) SendPresence(
|
func (p *SyncAPIProducer) SendPresence(
|
||||||
ctx context.Context, userID, presence string, statusMsg *string, lastActiveAgo int64,
|
ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveAgo int64,
|
||||||
) error {
|
) error {
|
||||||
m := nats.NewMsg(p.TopicPresenceEvent)
|
m := nats.NewMsg(p.TopicPresenceEvent)
|
||||||
m.Header.Set(jetstream.UserID, userID)
|
m.Header.Set(jetstream.UserID, userID)
|
||||||
m.Header.Set("presence", presence)
|
m.Header.Set("presence", presence.String())
|
||||||
if statusMsg != nil {
|
if statusMsg != nil {
|
||||||
m.Header.Set("status_msg", *statusMsg)
|
m.Header.Set("status_msg", *statusMsg)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ import (
|
||||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
@ -406,7 +407,12 @@ func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, content := range payload.Push {
|
for _, content := range payload.Push {
|
||||||
if err := t.producer.SendPresence(ctx, content.UserID, content.Presence, content.StatusMsg, content.LastActiveAgo); err != nil {
|
presence, ok := syncTypes.PresenceFromString(content.Presence)
|
||||||
|
if !ok {
|
||||||
|
logrus.Warnf("invalid presence '%s', skipping.", content.Presence)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -139,8 +139,9 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
||||||
newMsg := msg.Header.Get("status_msg")
|
newMsg := msg.Header.Get("status_msg")
|
||||||
statusMsg = &newMsg
|
statusMsg = &newMsg
|
||||||
}
|
}
|
||||||
|
// OK is already checked, so no need to do it again
|
||||||
pos, err := s.db.UpdatePresence(ctx, userID, presence, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
|
p, _ := types.PresenceFromString(presence)
|
||||||
|
pos, err := s.db.UpdatePresence(ctx, userID, p, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,10 +16,10 @@ package producers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
|
@ -31,11 +31,11 @@ type FederationAPIPresenceProducer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FederationAPIPresenceProducer) SendPresence(
|
func (f *FederationAPIPresenceProducer) SendPresence(
|
||||||
userID, presence string, statusMsg *string,
|
userID string, presence types.Presence, statusMsg *string,
|
||||||
) error {
|
) error {
|
||||||
msg := nats.NewMsg(f.Topic)
|
msg := nats.NewMsg(f.Topic)
|
||||||
msg.Header.Set(jetstream.UserID, userID)
|
msg.Header.Set(jetstream.UserID, userID)
|
||||||
msg.Header.Set("presence", strings.ToLower(presence))
|
msg.Header.Set("presence", presence.String())
|
||||||
msg.Header.Set("from_sync", "true") // only update last_active_ts and presence
|
msg.Header.Set("from_sync", "true") // only update last_active_ts and presence
|
||||||
msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
|
msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -153,8 +153,8 @@ type Database interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Presence interface {
|
type Presence interface {
|
||||||
UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
|
UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
|
||||||
GetPresence(ctx context.Context, userID string) (*types.Presence, error)
|
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
|
||||||
PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error)
|
PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error)
|
||||||
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
|
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -103,17 +103,16 @@ func (p *presenceStatements) UpsertPresence(
|
||||||
txn *sql.Tx,
|
txn *sql.Tx,
|
||||||
userID string,
|
userID string,
|
||||||
statusMsg *string,
|
statusMsg *string,
|
||||||
presence string,
|
presence types.Presence,
|
||||||
lastActiveTS gomatrixserverlib.Timestamp,
|
lastActiveTS gomatrixserverlib.Timestamp,
|
||||||
fromSync bool,
|
fromSync bool,
|
||||||
) (pos types.StreamPosition, err error) {
|
) (pos types.StreamPosition, err error) {
|
||||||
presenceStatusID := types.PresenceToInt[presence]
|
|
||||||
if fromSync {
|
if fromSync {
|
||||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
|
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
|
||||||
err = stmt.QueryRowContext(ctx, userID, presenceStatusID, lastActiveTS).Scan(&pos)
|
err = stmt.QueryRowContext(ctx, userID, presence, lastActiveTS).Scan(&pos)
|
||||||
} else {
|
} else {
|
||||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
||||||
err = stmt.QueryRowContext(ctx, userID, presenceStatusID, statusMsg, lastActiveTS).Scan(&pos)
|
err = stmt.QueryRowContext(ctx, userID, presence, statusMsg, lastActiveTS).Scan(&pos)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -122,14 +121,13 @@ func (p *presenceStatements) UpsertPresence(
|
||||||
func (p *presenceStatements) GetPresenceForUser(
|
func (p *presenceStatements) GetPresenceForUser(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
userID string,
|
userID string,
|
||||||
) (*types.Presence, error) {
|
) (*types.PresenceInternal, error) {
|
||||||
result := &types.Presence{
|
result := &types.PresenceInternal{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
}
|
}
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||||
var presenceStatusID int
|
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||||
err := stmt.QueryRowContext(ctx, userID).Scan(&presenceStatusID, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
result.ClientFields.Presence = result.Presence.String()
|
||||||
result.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -143,8 +141,8 @@ func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx)
|
||||||
func (p *presenceStatements) GetPresenceAfter(
|
func (p *presenceStatements) GetPresenceAfter(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
after types.StreamPosition,
|
after types.StreamPosition,
|
||||||
) (presences map[string]*types.Presence, err error) {
|
) (presences map[string]*types.PresenceInternal, err error) {
|
||||||
presences = make(map[string]*types.Presence)
|
presences = make(map[string]*types.PresenceInternal)
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(ctx, after)
|
rows, err := stmt.QueryContext(ctx, after)
|
||||||
|
|
@ -152,14 +150,13 @@ func (p *presenceStatements) GetPresenceAfter(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
||||||
var presenceStatusID int
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
presence := &types.Presence{}
|
qryRes := &types.PresenceInternal{}
|
||||||
if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presenceStatusID, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
presence.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
qryRes.ClientFields.Presence = qryRes.Presence.String()
|
||||||
presences[presence.UserID] = presence
|
presences[qryRes.UserID] = qryRes
|
||||||
}
|
}
|
||||||
return presences, rows.Err()
|
return presences, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1004,15 +1004,15 @@ func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID s
|
||||||
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
|
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Database) UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||||
return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
|
return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Database) GetPresence(ctx context.Context, userID string) (*types.Presence, error) {
|
func (s *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
||||||
return s.Presence.GetPresenceForUser(ctx, nil, userID)
|
return s.Presence.GetPresenceForUser(ctx, nil, userID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error) {
|
func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) {
|
||||||
return s.Presence.GetPresenceAfter(ctx, nil, after)
|
return s.Presence.GetPresenceAfter(ctx, nil, after)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -107,7 +107,7 @@ func (p *presenceStatements) UpsertPresence(
|
||||||
txn *sql.Tx,
|
txn *sql.Tx,
|
||||||
userID string,
|
userID string,
|
||||||
statusMsg *string,
|
statusMsg *string,
|
||||||
presence string,
|
presence types.Presence,
|
||||||
lastActiveTS gomatrixserverlib.Timestamp,
|
lastActiveTS gomatrixserverlib.Timestamp,
|
||||||
fromSync bool,
|
fromSync bool,
|
||||||
) (pos types.StreamPosition, err error) {
|
) (pos types.StreamPosition, err error) {
|
||||||
|
|
@ -116,19 +116,18 @@ func (p *presenceStatements) UpsertPresence(
|
||||||
return pos, err
|
return pos, err
|
||||||
}
|
}
|
||||||
|
|
||||||
presenceStatusID := types.PresenceToInt[presence]
|
|
||||||
if fromSync {
|
if fromSync {
|
||||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
|
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
|
||||||
err = stmt.QueryRowContext(ctx,
|
err = stmt.QueryRowContext(ctx,
|
||||||
pos, userID, presenceStatusID,
|
pos, userID, presence,
|
||||||
lastActiveTS, pos,
|
lastActiveTS, pos,
|
||||||
presenceStatusID, lastActiveTS).Scan(&pos)
|
presence, lastActiveTS).Scan(&pos)
|
||||||
} else {
|
} else {
|
||||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
||||||
err = stmt.QueryRowContext(ctx,
|
err = stmt.QueryRowContext(ctx,
|
||||||
pos, userID, presenceStatusID,
|
pos, userID, presence,
|
||||||
statusMsg, lastActiveTS, pos,
|
statusMsg, lastActiveTS, pos,
|
||||||
presenceStatusID, statusMsg, lastActiveTS).Scan(&pos)
|
presence, statusMsg, lastActiveTS).Scan(&pos)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -137,14 +136,13 @@ func (p *presenceStatements) UpsertPresence(
|
||||||
func (p *presenceStatements) GetPresenceForUser(
|
func (p *presenceStatements) GetPresenceForUser(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
userID string,
|
userID string,
|
||||||
) (*types.Presence, error) {
|
) (*types.PresenceInternal, error) {
|
||||||
result := &types.Presence{
|
result := &types.PresenceInternal{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
}
|
}
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||||
var presenceStatusID int
|
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||||
err := stmt.QueryRowContext(ctx, userID).Scan(&presenceStatusID, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
result.ClientFields.Presence = result.Presence.String()
|
||||||
result.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -158,8 +156,8 @@ func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx)
|
||||||
func (p *presenceStatements) GetPresenceAfter(
|
func (p *presenceStatements) GetPresenceAfter(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
after types.StreamPosition,
|
after types.StreamPosition,
|
||||||
) (presences map[string]*types.Presence, err error) {
|
) (presences map[string]*types.PresenceInternal, err error) {
|
||||||
presences = make(map[string]*types.Presence)
|
presences = make(map[string]*types.PresenceInternal)
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(ctx, after)
|
rows, err := stmt.QueryContext(ctx, after)
|
||||||
|
|
@ -167,14 +165,13 @@ func (p *presenceStatements) GetPresenceAfter(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
||||||
var presenceStatusID int
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
presence := &types.Presence{}
|
qryRes := &types.PresenceInternal{}
|
||||||
if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presenceStatusID, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
presence.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
qryRes.ClientFields.Presence = qryRes.Presence.String()
|
||||||
presences[presence.UserID] = presence
|
presences[qryRes.UserID] = qryRes
|
||||||
}
|
}
|
||||||
return presences, rows.Err()
|
return presences, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -184,8 +184,8 @@ type NotificationData interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Presence interface {
|
type Presence interface {
|
||||||
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
||||||
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.Presence, err error)
|
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
|
||||||
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
|
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
|
||||||
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.Presence, err error)
|
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -116,7 +116,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
pres, ok := p.cache.Load(cacheKey)
|
pres, ok := p.cache.Load(cacheKey)
|
||||||
if ok {
|
if ok {
|
||||||
// skip already sent presence
|
// skip already sent presence
|
||||||
prevPresence := pres.(*types.Presence)
|
prevPresence := pres.(*types.PresenceInternal)
|
||||||
currentlyActive := prevPresence.CurrentlyActive()
|
currentlyActive := prevPresence.CurrentlyActive()
|
||||||
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
|
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
|
||||||
if skip {
|
if skip {
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,7 @@ type RequestPool struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type PresencePublisher interface {
|
type PresencePublisher interface {
|
||||||
SendPresence(userID, presence string, statusMsg *string) error
|
SendPresence(userID string, presence types.Presence, statusMsg *string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRequestPool makes a new RequestPool
|
// NewRequestPool makes a new RequestPool
|
||||||
|
|
@ -97,9 +97,9 @@ func (rp *RequestPool) cleanLastSeen() {
|
||||||
func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Duration) {
|
func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Duration) {
|
||||||
for {
|
for {
|
||||||
rp.presence.Range(func(key interface{}, v interface{}) bool {
|
rp.presence.Range(func(key interface{}, v interface{}) bool {
|
||||||
p := v.(types.Presence)
|
p := v.(types.PresenceInternal)
|
||||||
if time.Since(p.LastActiveTS.Time()) > cleanupTime {
|
if time.Since(p.LastActiveTS.Time()) > cleanupTime {
|
||||||
rp.updatePresence(db, "unavailable", p.UserID)
|
rp.updatePresence(db, types.PresenceUnavailable.String(), p.UserID)
|
||||||
rp.presence.Delete(key)
|
rp.presence.Delete(key)
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
|
@ -114,13 +114,20 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if presence == "" {
|
if presence == "" {
|
||||||
presence = "online"
|
presence = types.PresenceOnline.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
newPresence := types.Presence{
|
presenceID, ok := types.PresenceFromString(presence)
|
||||||
|
if !ok { // this should almost never happen
|
||||||
|
logrus.Errorf("unknown presence '%s'", presence)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
newPresence := types.PresenceInternal{
|
||||||
ClientFields: types.PresenceClientResponse{
|
ClientFields: types.PresenceClientResponse{
|
||||||
Presence: presence,
|
Presence: presenceID.String(),
|
||||||
},
|
},
|
||||||
|
Presence: presenceID,
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
|
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
|
||||||
}
|
}
|
||||||
|
|
@ -128,7 +135,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
// avoid spamming presence updates when syncing
|
// avoid spamming presence updates when syncing
|
||||||
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
|
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
|
||||||
if ok {
|
if ok {
|
||||||
p := existingPresence.(types.Presence)
|
p := existingPresence.(types.PresenceInternal)
|
||||||
if p.ClientFields.Presence == newPresence.ClientFields.Presence {
|
if p.ClientFields.Presence == newPresence.ClientFields.Presence {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -140,7 +147,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := rp.producer.SendPresence(userID, strings.ToLower(presence), dbPresence.ClientFields.StatusMsg); err != nil {
|
if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil {
|
||||||
logrus.WithError(err).Error("Unable to publish presence message from sync")
|
logrus.WithError(err).Error("Unable to publish presence message from sync")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,23 +15,23 @@ type dummyPublisher struct {
|
||||||
count int
|
count int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dummyPublisher) SendPresence(userID, presence string, statusMsg *string) error {
|
func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error {
|
||||||
d.count++
|
d.count++
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type dummyDB struct{}
|
type dummyDB struct{}
|
||||||
|
|
||||||
func (d dummyDB) UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
func (d dummyDB) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d dummyDB) GetPresence(ctx context.Context, userID string) (*types.Presence, error) {
|
func (d dummyDB) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
||||||
return &types.Presence{}, nil
|
return &types.PresenceInternal{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error) {
|
func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) {
|
||||||
return map[string]*types.Presence{}, nil
|
return map[string]*types.PresenceInternal{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d dummyDB) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
|
func (d dummyDB) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
|
||||||
|
|
|
||||||
75
syncapi/types/presence.go
Normal file
75
syncapi/types/presence.go
Normal file
|
|
@ -0,0 +1,75 @@
|
||||||
|
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:generate stringer -type=Presence -linecomment
|
||||||
|
type Presence uint8
|
||||||
|
|
||||||
|
const (
|
||||||
|
PresenceUnavailable Presence = iota + 1 // unavailable
|
||||||
|
PresenceOnline // online
|
||||||
|
PresenceOffline // offline
|
||||||
|
)
|
||||||
|
|
||||||
|
// PresenceFromString returns the integer representation of the given input presence.
|
||||||
|
// Returns false for ok, if input is not a valid presence value.
|
||||||
|
func PresenceFromString(input string) (p Presence, ok bool) {
|
||||||
|
for i := 0; i < len(_Presence_index)-1; i++ {
|
||||||
|
l, r := _Presence_index[i], _Presence_index[i+1]
|
||||||
|
if strings.EqualFold(input, _Presence_name[l:r]) {
|
||||||
|
return Presence(i + 1), true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
type PresenceInternal struct {
|
||||||
|
ClientFields PresenceClientResponse
|
||||||
|
StreamPos StreamPosition `json:"-"`
|
||||||
|
UserID string `json:"-"`
|
||||||
|
LastActiveTS gomatrixserverlib.Timestamp `json:"-"`
|
||||||
|
Presence Presence `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Equals compares p1 with p2.
|
||||||
|
func (p1 *PresenceInternal) Equals(p2 *PresenceInternal) bool {
|
||||||
|
return p1.ClientFields.Presence == p2.ClientFields.Presence &&
|
||||||
|
p1.ClientFields.StatusMsg == p2.ClientFields.StatusMsg &&
|
||||||
|
p1.UserID == p2.UserID
|
||||||
|
}
|
||||||
|
|
||||||
|
// CurrentlyActive returns the current active state.
|
||||||
|
func (p *PresenceInternal) CurrentlyActive() bool {
|
||||||
|
return time.Since(p.LastActiveTS.Time()).Minutes() < 5
|
||||||
|
}
|
||||||
|
|
||||||
|
// LastActiveAgo returns the time since the LastActiveTS in milliseconds.
|
||||||
|
func (p *PresenceInternal) LastActiveAgo() int64 {
|
||||||
|
return time.Since(p.LastActiveTS.Time()).Milliseconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
type PresenceClientResponse struct {
|
||||||
|
CurrentlyActive *bool `json:"currently_active,omitempty"`
|
||||||
|
LastActiveAgo int64 `json:"last_active_ago,omitempty"`
|
||||||
|
Presence string `json:"presence"`
|
||||||
|
StatusMsg *string `json:"status_msg,omitempty"`
|
||||||
|
}
|
||||||
26
syncapi/types/presence_string.go
Normal file
26
syncapi/types/presence_string.go
Normal file
|
|
@ -0,0 +1,26 @@
|
||||||
|
// Code generated by "stringer -type=Presence -linecomment"; DO NOT EDIT.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
import "strconv"
|
||||||
|
|
||||||
|
func _() {
|
||||||
|
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||||
|
// Re-run the stringer command to generate them again.
|
||||||
|
var x [1]struct{}
|
||||||
|
_ = x[PresenceUnavailable-1]
|
||||||
|
_ = x[PresenceOnline-2]
|
||||||
|
_ = x[PresenceOffline-3]
|
||||||
|
}
|
||||||
|
|
||||||
|
const _Presence_name = "unavailableonlineoffline"
|
||||||
|
|
||||||
|
var _Presence_index = [...]uint8{0, 11, 17, 24}
|
||||||
|
|
||||||
|
func (i Presence) String() string {
|
||||||
|
i -= 1
|
||||||
|
if i >= Presence(len(_Presence_index)-1) {
|
||||||
|
return "Presence(" + strconv.FormatInt(int64(i+1), 10) + ")"
|
||||||
|
}
|
||||||
|
return _Presence_name[_Presence_index[i]:_Presence_index[i+1]]
|
||||||
|
}
|
||||||
42
syncapi/types/presence_test.go
Normal file
42
syncapi/types/presence_test.go
Normal file
|
|
@ -0,0 +1,42 @@
|
||||||
|
package types
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestPresenceFromString(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
input string
|
||||||
|
wantStatus Presence
|
||||||
|
wantOk bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "presence unavailable",
|
||||||
|
input: "unavailable",
|
||||||
|
wantStatus: PresenceUnavailable,
|
||||||
|
wantOk: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "presence online",
|
||||||
|
input: "OnLINE",
|
||||||
|
wantStatus: PresenceOnline,
|
||||||
|
wantOk: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unknown presence",
|
||||||
|
input: "unknown",
|
||||||
|
wantStatus: 0,
|
||||||
|
wantOk: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got, got1 := PresenceFromString(tt.input)
|
||||||
|
if got != tt.wantStatus {
|
||||||
|
t.Errorf("PresenceFromString() got = %v, want %v", got, tt.wantStatus)
|
||||||
|
}
|
||||||
|
if got1 != tt.wantOk {
|
||||||
|
t.Errorf("PresenceFromString() got1 = %v, want %v", got1, tt.wantOk)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -20,7 +20,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -519,46 +518,3 @@ type OutputSendToDeviceEvent struct {
|
||||||
DeviceID string `json:"device_id"`
|
DeviceID string `json:"device_id"`
|
||||||
gomatrixserverlib.SendToDeviceEvent
|
gomatrixserverlib.SendToDeviceEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
type Presence struct {
|
|
||||||
ClientFields PresenceClientResponse
|
|
||||||
StreamPos StreamPosition `json:"-"`
|
|
||||||
UserID string `json:"-"`
|
|
||||||
LastActiveTS gomatrixserverlib.Timestamp `json:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Equals compares p1 with p2.
|
|
||||||
func (p1 *Presence) Equals(p2 *Presence) bool {
|
|
||||||
return p1.ClientFields.Presence == p2.ClientFields.Presence &&
|
|
||||||
p1.ClientFields.StatusMsg == p2.ClientFields.StatusMsg &&
|
|
||||||
p1.UserID == p2.UserID
|
|
||||||
}
|
|
||||||
|
|
||||||
// CurrentlyActive returns the current active state.
|
|
||||||
func (p *Presence) CurrentlyActive() bool {
|
|
||||||
return time.Since(p.LastActiveTS.Time()).Minutes() < 5
|
|
||||||
}
|
|
||||||
|
|
||||||
// LastActiveAgo returns the time since the LastActiveTS in milliseconds.
|
|
||||||
func (p *Presence) LastActiveAgo() int64 {
|
|
||||||
return time.Since(p.LastActiveTS.Time()).Milliseconds()
|
|
||||||
}
|
|
||||||
|
|
||||||
type PresenceClientResponse struct {
|
|
||||||
CurrentlyActive *bool `json:"currently_active,omitempty"`
|
|
||||||
LastActiveAgo int64 `json:"last_active_ago,omitempty"`
|
|
||||||
Presence string `json:"presence"`
|
|
||||||
StatusMsg *string `json:"status_msg,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
var PresenceToInt = map[string]int{
|
|
||||||
"unavailable": 1,
|
|
||||||
"online": 2,
|
|
||||||
"offline": 3,
|
|
||||||
}
|
|
||||||
|
|
||||||
var PresenceToString = map[int]string{
|
|
||||||
1: "unavailable",
|
|
||||||
2: "online",
|
|
||||||
3: "offline",
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue