Fix presence (#16)

* Send expired presence to clients correctly

* Adjust logging for datadog integration

* Fix tests
This commit is contained in:
PiotrKozimor 2022-08-02 11:54:02 +02:00 committed by GitHub
parent b25fa5d683
commit ac556d93d4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 110 additions and 54 deletions

View file

@ -16,6 +16,7 @@ package main
import (
"flag"
"log"
"os"
"github.com/matrix-org/dendrite/appservice"
@ -47,6 +48,16 @@ var (
func main() {
cfg := setup.ParseFlags(true)
httpAddr := config.HTTPAddress("http://" + *httpBindAddr)
for _, logging := range cfg.Logging {
if logging.Type == "std" {
level, err := logrus.ParseLevel(logging.Level)
if err != nil {
log.Fatal(err)
}
logrus.SetLevel(level)
logrus.SetFormatter(&logrus.JSONFormatter{})
}
}
httpsAddr := config.HTTPAddress("https://" + *httpsBindAddr)
httpAPIAddr := httpAddr
options := []basepkg.BaseDendriteOptions{}

View file

@ -131,9 +131,9 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
logrus.Fatalf("Failed to start due to configuration errors")
}
internal.SetupStdLogging()
internal.SetupHookLogging(cfg.Logging, componentName)
internal.SetupPprof()
// internal.SetupStdLogging()
// internal.SetupHookLogging(cfg.Logging, componentName)
// internal.SetupPprof()
logrus.Infof("Dendrite version %s", internal.VersionString())

View file

@ -21,7 +21,6 @@ import (
"regexp"
"strings"
log "github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v2"
)
@ -339,11 +338,11 @@ func checkErrors(config *AppServiceAPI, derived *Derived) (err error) {
// TODO: Remove once rate_limited is implemented
if appservice.RateLimited {
log.Warn("WARNING: Application service option rate_limited is currently unimplemented")
// log.Warn("WARNING: Application service option rate_limited is currently unimplemented")
}
// TODO: Remove once protocols is implemented
if len(appservice.Protocols) > 0 {
log.Warn("WARNING: Application service option protocols is currently unimplemented")
// log.Warn("WARNING: Application service option protocols is currently unimplemented")
}
}
@ -369,7 +368,7 @@ func validateNamespace(
// Check if GroupID for the users namespace is in the correct format
if key == "users" && namespace.GroupID != "" {
// TODO: Remove once group_id is implemented
log.Warn("WARNING: Application service option group_id is currently unimplemented")
// log.Warn("WARNING: Application service option group_id is currently unimplemented")
correctFormat := groupIDRegexp.MatchString(namespace.GroupID)
if !correctFormat {

View file

@ -164,4 +164,6 @@ type Presence interface {
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error)
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
ExpirePresence(ctx context.Context) ([]types.PresenceNotify, error)
UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error
}

View file

@ -62,6 +62,10 @@ const upsertPresenceFromSyncSQL = "" +
" presence = $2, last_active_ts = $3" +
" RETURNING id"
const updateLastActiveSQL = `UPDATE syncapi_presence
SET last_active_ts = $1
WHERE user_id = $2`
const selectPresenceForUserSQL = "" +
"SELECT presence, status_msg, last_active_ts" +
" FROM syncapi_presence" +
@ -80,9 +84,10 @@ const expirePresenceSQL = `UPDATE syncapi_presence SET
id = nextval('syncapi_presence_id'),
presence = 3
WHERE
to_timestamp(last_active_ts / 1000) < NOW() - INTERVAL '5 minutes'
to_timestamp(last_active_ts / 1000) < NOW() - INTERVAL` + types.PresenceExpire + `
AND
presence != 3
RETURNING id, user_id
`
type presenceStatements struct {
@ -92,6 +97,7 @@ type presenceStatements struct {
selectMaxPresenceStmt *sql.Stmt
selectPresenceAfterStmt *sql.Stmt
expirePresenceStmt *sql.Stmt
updateLastActiveStmt *sql.Stmt
}
func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) {
@ -107,6 +113,7 @@ func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) {
{&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
{&s.selectPresenceAfterStmt, selectPresenceAfter},
{&s.expirePresenceStmt, expirePresenceSQL},
{&s.updateLastActiveStmt, updateLastActiveSQL},
}.Prepare(db)
}
@ -180,7 +187,22 @@ func (p *presenceStatements) GetPresenceAfter(
func (p *presenceStatements) ExpirePresence(
ctx context.Context,
) error {
_, err := p.expirePresenceStmt.Exec()
) ([]types.PresenceNotify, error) {
rows, err := p.expirePresenceStmt.QueryContext(ctx)
presences := make([]types.PresenceNotify, 0)
i := 0
for rows.Next() {
presences = append(presences, types.PresenceNotify{})
err = rows.Scan(&presences[i].StreamPos, &presences[i].UserID)
if err != nil {
return nil, err
}
i++
}
return presences, err
}
func (p *presenceStatements) UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error {
_, err := p.updateLastActiveStmt.Exec(&lastActiveTs, &userId)
return err
}

View file

@ -16,9 +16,7 @@
package postgres
import (
"context"
"database/sql"
"time"
// Import the postgres database driver.
_ "github.com/lib/pq"
@ -27,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/sirupsen/logrus"
)
// SyncServerDatasource represents a sync server datasource which manages
@ -125,17 +122,5 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
Ignores: ignores,
Presence: presence,
}
go func() {
ctx := context.Background()
for {
err := d.Database.Presence.ExpirePresence(ctx)
if err != nil {
logrus.WithError(err).Error("failed to expire presence")
} else {
logrus.Info("expired presence")
}
time.Sleep(time.Minute)
}
}()
return &d, nil
}

View file

@ -1068,6 +1068,10 @@ func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.Stre
return s.Presence.GetMaxPresenceID(ctx, nil)
}
func (s *Database) ExpirePresence(ctx context.Context) error {
func (s *Database) ExpirePresence(ctx context.Context) ([]types.PresenceNotify, error) {
return s.Presence.ExpirePresence(ctx)
}
func (s *Database) UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error {
return s.Presence.UpdateLastActive(ctx, userId, lastActiveTs)
}

View file

@ -183,7 +183,12 @@ func (p *presenceStatements) GetPresenceAfter(
func (p *presenceStatements) ExpirePresence(
ctx context.Context,
) error {
) ([]types.PresenceNotify, error) {
// TODO implement
return nil, nil
}
func (p *presenceStatements) UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error {
// TODO implement
return nil
}

View file

@ -191,5 +191,6 @@ type Presence interface {
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (presences map[string]*types.PresenceInternal, err error)
ExpirePresence(ctx context.Context) error
ExpirePresence(ctx context.Context) ([]types.PresenceNotify, error)
UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error
}

View file

@ -17,7 +17,6 @@ package streams
import (
"context"
"encoding/json"
"sync"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/types"
@ -26,8 +25,6 @@ import (
type PresenceStreamProvider struct {
StreamProvider
// cache contains previously sent presence updates to avoid unneeded updates
cache sync.Map
notifier *notifier.Notifier
}
@ -103,18 +100,6 @@ func (p *PresenceStreamProvider) IncrementalSync(
if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
continue
}
cacheKey := req.Device.UserID + req.Device.ID + presence.UserID
pres, ok := p.cache.Load(cacheKey)
if ok {
// skip already sent presence
prevPresence := pres.(*types.PresenceInternal)
currentlyActive := prevPresence.CurrentlyActive()
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
if skip {
req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID)
continue
}
}
if _, known := types.PresenceFromString(presence.ClientFields.Presence); known {
presence.ClientFields.LastActiveAgo = presence.LastActiveAgo()
@ -142,7 +127,6 @@ func (p *PresenceStreamProvider) IncrementalSync(
if len(req.Response.Presence.Events) == req.Filter.Presence.Limit {
break
}
p.cache.Store(cacheKey, presence)
}
if len(req.Response.Presence.Events) == 0 {

View file

@ -49,7 +49,7 @@ type RequestPool struct {
keyAPI keyapi.SyncKeyAPI
rsAPI roomserverAPI.SyncRoomserverAPI
lastseen *sync.Map
presence *sync.Map
Presence *sync.Map
streams *streams.Streams
Notifier *notifier.Notifier
producer PresencePublisher
@ -84,14 +84,14 @@ func NewRequestPool(
keyAPI: keyAPI,
rsAPI: rsAPI,
lastseen: &sync.Map{},
presence: &sync.Map{},
Presence: &sync.Map{},
streams: streams,
Notifier: notifier,
producer: producer,
consumer: consumer,
}
go rp.cleanLastSeen()
go rp.cleanPresence(db, time.Minute*5)
// go rp.cleanPresence(db, time.Minute*5)
return rp
}
@ -110,11 +110,11 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat
return
}
for {
rp.presence.Range(func(key interface{}, v interface{}) bool {
rp.Presence.Range(func(key interface{}, v interface{}) bool {
p := v.(types.PresenceInternal)
if time.Since(p.LastActiveTS.Time()) > cleanupTime {
rp.updatePresence(db, types.PresenceUnavailable.String(), p.UserID)
rp.presence.Delete(key)
rp.Presence.Delete(key)
}
return true
})
@ -152,12 +152,19 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
}
newPresence.ClientFields.Presence = presenceID.String()
defer rp.presence.Store(userID, newPresence)
defer rp.Presence.Store(userID, newPresence)
// avoid spamming presence updates when syncing
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
existingPresence, ok := rp.Presence.LoadOrStore(userID, newPresence)
if ok {
p := existingPresence.(types.PresenceInternal)
if p.ClientFields.Presence == newPresence.ClientFields.Presence {
if p.ClientFields.Presence == newPresence.ClientFields.Presence && newPresence.LastActiveTS-dbPresence.LastActiveTS < types.PresenceNoOpMs {
return
}
if dbPresence.Presence == types.PresenceOnline && presenceID == types.PresenceOnline && newPresence.LastActiveTS-dbPresence.LastActiveTS >= types.PresenceNoOpMs {
err := db.UpdateLastActive(context.Background(), userID, uint64(newPresence.LastActiveTS))
if err != nil {
logrus.WithError(err).Error("failed to update last active")
}
return
}
}

View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
@ -20,7 +21,9 @@ func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, st
return nil
}
type dummyDB struct{}
type dummyDB struct {
storage.Database
}
func (d dummyDB) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
return 0, nil
@ -106,7 +109,7 @@ func TestRequestPool_updatePresence(t *testing.T) {
},
}
rp := &RequestPool{
presence: &syncMap,
Presence: &syncMap,
producer: publisher,
consumer: consumer,
cfg: &config.SyncAPI{

View file

@ -16,6 +16,7 @@ package syncapi
import (
"context"
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/sirupsen/logrus"
@ -33,6 +34,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
)
// AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI
@ -144,4 +146,24 @@ func AddPublicRoutes(
base.PublicClientAPIMux, requestPool, syncDB, userAPI,
rsAPI, cfg, base.Caches,
)
go func() {
ctx := context.Background()
for {
notify, err := syncDB.ExpirePresence(ctx)
if err != nil {
logrus.WithError(err).Error("failed to expire presence")
}
for i := range notify {
requestPool.Presence.Store(notify[i].UserID, types.PresenceInternal{
Presence: types.PresenceOffline,
})
notifier.OnNewPresence(types.StreamingToken{
PresencePosition: notify[i].StreamPos,
}, notify[i].UserID)
}
time.Sleep(types.PresenceExpireInterval)
}
}()
}

View file

@ -21,6 +21,12 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
const (
PresenceNoOpMs = 60_000
PresenceExpire = "'4 minutes'"
PresenceExpireInterval = time.Second * 30
)
type Presence uint8
const (
@ -66,6 +72,11 @@ type PresenceInternal struct {
Presence Presence `json:"-"`
}
type PresenceNotify struct {
StreamPos StreamPosition
UserID string
}
// Equals compares p1 with p2.
func (p1 *PresenceInternal) Equals(p2 *PresenceInternal) bool {
return p1.ClientFields.Presence == p2.ClientFields.Presence &&