diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 845b9e465..a864b1185 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -16,6 +16,7 @@ package main import ( "flag" + "log" "os" "github.com/matrix-org/dendrite/appservice" @@ -47,6 +48,16 @@ var ( func main() { cfg := setup.ParseFlags(true) httpAddr := config.HTTPAddress("http://" + *httpBindAddr) + for _, logging := range cfg.Logging { + if logging.Type == "std" { + level, err := logrus.ParseLevel(logging.Level) + if err != nil { + log.Fatal(err) + } + logrus.SetLevel(level) + logrus.SetFormatter(&logrus.JSONFormatter{}) + } + } httpsAddr := config.HTTPAddress("https://" + *httpsBindAddr) httpAPIAddr := httpAddr options := []basepkg.BaseDendriteOptions{} diff --git a/setup/base/base.go b/setup/base/base.go index 5cbd7da9c..ac912728a 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -131,9 +131,9 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base logrus.Fatalf("Failed to start due to configuration errors") } - internal.SetupStdLogging() - internal.SetupHookLogging(cfg.Logging, componentName) - internal.SetupPprof() + // internal.SetupStdLogging() + // internal.SetupHookLogging(cfg.Logging, componentName) + // internal.SetupPprof() logrus.Infof("Dendrite version %s", internal.VersionString()) diff --git a/setup/config/config_appservice.go b/setup/config/config_appservice.go index ff3287714..30b275bc1 100644 --- a/setup/config/config_appservice.go +++ b/setup/config/config_appservice.go @@ -21,7 +21,6 @@ import ( "regexp" "strings" - log "github.com/sirupsen/logrus" yaml "gopkg.in/yaml.v2" ) @@ -339,11 +338,11 @@ func checkErrors(config *AppServiceAPI, derived *Derived) (err error) { // TODO: Remove once rate_limited is implemented if appservice.RateLimited { - log.Warn("WARNING: Application service option rate_limited is currently unimplemented") + // log.Warn("WARNING: Application service option rate_limited is currently unimplemented") } // TODO: Remove once protocols is implemented if len(appservice.Protocols) > 0 { - log.Warn("WARNING: Application service option protocols is currently unimplemented") + // log.Warn("WARNING: Application service option protocols is currently unimplemented") } } @@ -369,7 +368,7 @@ func validateNamespace( // Check if GroupID for the users namespace is in the correct format if key == "users" && namespace.GroupID != "" { // TODO: Remove once group_id is implemented - log.Warn("WARNING: Application service option group_id is currently unimplemented") + // log.Warn("WARNING: Application service option group_id is currently unimplemented") correctFormat := groupIDRegexp.MatchString(namespace.GroupID) if !correctFormat { diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 5a036d889..5253eb8b7 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -164,4 +164,6 @@ type Presence interface { GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) + ExpirePresence(ctx context.Context) ([]types.PresenceNotify, error) + UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error } diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go index 1ba44ad0d..081de5142 100644 --- a/syncapi/storage/postgres/presence_table.go +++ b/syncapi/storage/postgres/presence_table.go @@ -62,6 +62,10 @@ const upsertPresenceFromSyncSQL = "" + " presence = $2, last_active_ts = $3" + " RETURNING id" +const updateLastActiveSQL = `UPDATE syncapi_presence +SET last_active_ts = $1 +WHERE user_id = $2` + const selectPresenceForUserSQL = "" + "SELECT presence, status_msg, last_active_ts" + " FROM syncapi_presence" + @@ -80,9 +84,10 @@ const expirePresenceSQL = `UPDATE syncapi_presence SET id = nextval('syncapi_presence_id'), presence = 3 WHERE - to_timestamp(last_active_ts / 1000) < NOW() - INTERVAL '5 minutes' + to_timestamp(last_active_ts / 1000) < NOW() - INTERVAL` + types.PresenceExpire + ` AND presence != 3 +RETURNING id, user_id ` type presenceStatements struct { @@ -92,6 +97,7 @@ type presenceStatements struct { selectMaxPresenceStmt *sql.Stmt selectPresenceAfterStmt *sql.Stmt expirePresenceStmt *sql.Stmt + updateLastActiveStmt *sql.Stmt } func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) { @@ -107,6 +113,7 @@ func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) { {&s.selectMaxPresenceStmt, selectMaxPresenceSQL}, {&s.selectPresenceAfterStmt, selectPresenceAfter}, {&s.expirePresenceStmt, expirePresenceSQL}, + {&s.updateLastActiveStmt, updateLastActiveSQL}, }.Prepare(db) } @@ -180,7 +187,22 @@ func (p *presenceStatements) GetPresenceAfter( func (p *presenceStatements) ExpirePresence( ctx context.Context, -) error { - _, err := p.expirePresenceStmt.Exec() +) ([]types.PresenceNotify, error) { + rows, err := p.expirePresenceStmt.QueryContext(ctx) + presences := make([]types.PresenceNotify, 0) + i := 0 + for rows.Next() { + presences = append(presences, types.PresenceNotify{}) + err = rows.Scan(&presences[i].StreamPos, &presences[i].UserID) + if err != nil { + return nil, err + } + i++ + } + return presences, err +} + +func (p *presenceStatements) UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error { + _, err := p.updateLastActiveStmt.Exec(&lastActiveTs, &userId) return err } diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index bf47610a4..9cfe7c070 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -16,9 +16,7 @@ package postgres import ( - "context" "database/sql" - "time" // Import the postgres database driver. _ "github.com/lib/pq" @@ -27,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" - "github.com/sirupsen/logrus" ) // SyncServerDatasource represents a sync server datasource which manages @@ -125,17 +122,5 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) Ignores: ignores, Presence: presence, } - go func() { - ctx := context.Background() - for { - err := d.Database.Presence.ExpirePresence(ctx) - if err != nil { - logrus.WithError(err).Error("failed to expire presence") - } else { - logrus.Info("expired presence") - } - time.Sleep(time.Minute) - } - }() return &d, nil } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 401b69f53..f396759b7 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1068,6 +1068,10 @@ func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.Stre return s.Presence.GetMaxPresenceID(ctx, nil) } -func (s *Database) ExpirePresence(ctx context.Context) error { +func (s *Database) ExpirePresence(ctx context.Context) ([]types.PresenceNotify, error) { return s.Presence.ExpirePresence(ctx) } + +func (s *Database) UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error { + return s.Presence.UpdateLastActive(ctx, userId, lastActiveTs) +} diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go index 55ab28784..fe6b3ce84 100644 --- a/syncapi/storage/sqlite3/presence_table.go +++ b/syncapi/storage/sqlite3/presence_table.go @@ -183,7 +183,12 @@ func (p *presenceStatements) GetPresenceAfter( func (p *presenceStatements) ExpirePresence( ctx context.Context, -) error { +) ([]types.PresenceNotify, error) { + // TODO implement + return nil, nil +} + +func (p *presenceStatements) UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error { // TODO implement return nil } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 37884f510..1c5215e6c 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -191,5 +191,6 @@ type Presence interface { GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (presences map[string]*types.PresenceInternal, err error) - ExpirePresence(ctx context.Context) error + ExpirePresence(ctx context.Context) ([]types.PresenceNotify, error) + UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error } diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 877bcf141..65cb3ba5d 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -17,7 +17,6 @@ package streams import ( "context" "encoding/json" - "sync" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/types" @@ -26,8 +25,6 @@ import ( type PresenceStreamProvider struct { StreamProvider - // cache contains previously sent presence updates to avoid unneeded updates - cache sync.Map notifier *notifier.Notifier } @@ -103,18 +100,6 @@ func (p *PresenceStreamProvider) IncrementalSync( if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) { continue } - cacheKey := req.Device.UserID + req.Device.ID + presence.UserID - pres, ok := p.cache.Load(cacheKey) - if ok { - // skip already sent presence - prevPresence := pres.(*types.PresenceInternal) - currentlyActive := prevPresence.CurrentlyActive() - skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID - if skip { - req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID) - continue - } - } if _, known := types.PresenceFromString(presence.ClientFields.Presence); known { presence.ClientFields.LastActiveAgo = presence.LastActiveAgo() @@ -142,7 +127,6 @@ func (p *PresenceStreamProvider) IncrementalSync( if len(req.Response.Presence.Events) == req.Filter.Presence.Limit { break } - p.cache.Store(cacheKey, presence) } if len(req.Response.Presence.Events) == 0 { diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 7b9526b53..878dd9fa3 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -49,7 +49,7 @@ type RequestPool struct { keyAPI keyapi.SyncKeyAPI rsAPI roomserverAPI.SyncRoomserverAPI lastseen *sync.Map - presence *sync.Map + Presence *sync.Map streams *streams.Streams Notifier *notifier.Notifier producer PresencePublisher @@ -84,14 +84,14 @@ func NewRequestPool( keyAPI: keyAPI, rsAPI: rsAPI, lastseen: &sync.Map{}, - presence: &sync.Map{}, + Presence: &sync.Map{}, streams: streams, Notifier: notifier, producer: producer, consumer: consumer, } go rp.cleanLastSeen() - go rp.cleanPresence(db, time.Minute*5) + // go rp.cleanPresence(db, time.Minute*5) return rp } @@ -110,11 +110,11 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat return } for { - rp.presence.Range(func(key interface{}, v interface{}) bool { + rp.Presence.Range(func(key interface{}, v interface{}) bool { p := v.(types.PresenceInternal) if time.Since(p.LastActiveTS.Time()) > cleanupTime { rp.updatePresence(db, types.PresenceUnavailable.String(), p.UserID) - rp.presence.Delete(key) + rp.Presence.Delete(key) } return true }) @@ -152,12 +152,19 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user } newPresence.ClientFields.Presence = presenceID.String() - defer rp.presence.Store(userID, newPresence) + defer rp.Presence.Store(userID, newPresence) // avoid spamming presence updates when syncing - existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence) + existingPresence, ok := rp.Presence.LoadOrStore(userID, newPresence) if ok { p := existingPresence.(types.PresenceInternal) - if p.ClientFields.Presence == newPresence.ClientFields.Presence { + if p.ClientFields.Presence == newPresence.ClientFields.Presence && newPresence.LastActiveTS-dbPresence.LastActiveTS < types.PresenceNoOpMs { + return + } + if dbPresence.Presence == types.PresenceOnline && presenceID == types.PresenceOnline && newPresence.LastActiveTS-dbPresence.LastActiveTS >= types.PresenceNoOpMs { + err := db.UpdateLastActive(context.Background(), userID, uint64(newPresence.LastActiveTS)) + if err != nil { + logrus.WithError(err).Error("failed to update last active") + } return } } diff --git a/syncapi/sync/requestpool_test.go b/syncapi/sync/requestpool_test.go index 0c7209521..4c8b73585 100644 --- a/syncapi/sync/requestpool_test.go +++ b/syncapi/sync/requestpool_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -20,7 +21,9 @@ func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, st return nil } -type dummyDB struct{} +type dummyDB struct { + storage.Database +} func (d dummyDB) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { return 0, nil @@ -106,7 +109,7 @@ func TestRequestPool_updatePresence(t *testing.T) { }, } rp := &RequestPool{ - presence: &syncMap, + Presence: &syncMap, producer: publisher, consumer: consumer, cfg: &config.SyncAPI{ diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 92db18d56..2fff62d0a 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -16,6 +16,7 @@ package syncapi import ( "context" + "time" "github.com/matrix-org/dendrite/internal/caching" "github.com/sirupsen/logrus" @@ -33,6 +34,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/sync" + "github.com/matrix-org/dendrite/syncapi/types" ) // AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI @@ -144,4 +146,24 @@ func AddPublicRoutes( base.PublicClientAPIMux, requestPool, syncDB, userAPI, rsAPI, cfg, base.Caches, ) + + go func() { + ctx := context.Background() + for { + notify, err := syncDB.ExpirePresence(ctx) + if err != nil { + logrus.WithError(err).Error("failed to expire presence") + } + for i := range notify { + requestPool.Presence.Store(notify[i].UserID, types.PresenceInternal{ + Presence: types.PresenceOffline, + }) + notifier.OnNewPresence(types.StreamingToken{ + PresencePosition: notify[i].StreamPos, + }, notify[i].UserID) + + } + time.Sleep(types.PresenceExpireInterval) + } + }() } diff --git a/syncapi/types/presence.go b/syncapi/types/presence.go index 30e025b9f..760225de8 100644 --- a/syncapi/types/presence.go +++ b/syncapi/types/presence.go @@ -21,6 +21,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +const ( + PresenceNoOpMs = 60_000 + PresenceExpire = "'4 minutes'" + PresenceExpireInterval = time.Second * 30 +) + type Presence uint8 const ( @@ -66,6 +72,11 @@ type PresenceInternal struct { Presence Presence `json:"-"` } +type PresenceNotify struct { + StreamPos StreamPosition + UserID string +} + // Equals compares p1 with p2. func (p1 *PresenceInternal) Equals(p2 *PresenceInternal) bool { return p1.ClientFields.Presence == p2.ClientFields.Presence &&