Update last seen on sync requests (#1593)

* Update last seen on sync requests

* Fix MSC2836 unit tests

* Only update once per minute

* Remove debug logging

* Configurable option

* Simplify updateLastSeen/cleanLastSeen
This commit is contained in:
Neil Alexander 2020-11-20 11:29:02 +00:00 committed by GitHub
parent 13cbd50dc2
commit c636be5070
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 127 additions and 14 deletions

View file

@ -300,6 +300,11 @@ sync_api:
max_idle_conns: 2 max_idle_conns: 2
conn_max_lifetime: -1 conn_max_lifetime: -1
# This option controls which HTTP header to inspect to find the real remote IP
# address of the client. This is likely required if Dendrite is running behind
# a reverse proxy server.
# real_ip_header: X-Real-IP
# Configuration for the User API. # Configuration for the User API.
user_api: user_api:
internal_api: internal_api:

View file

@ -7,6 +7,8 @@ type SyncAPI struct {
ExternalAPI ExternalAPIOptions `yaml:"external_api"` ExternalAPI ExternalAPIOptions `yaml:"external_api"`
Database DatabaseOptions `yaml:"database"` Database DatabaseOptions `yaml:"database"`
RealIPHeader string `yaml:"real_ip_header"`
} }
func (c *SyncAPI) Defaults() { func (c *SyncAPI) Defaults() {

View file

@ -457,6 +457,9 @@ func (u *testUserAPI) PerformDeviceDeletion(ctx context.Context, req *userapi.Pe
func (u *testUserAPI) PerformDeviceUpdate(ctx context.Context, req *userapi.PerformDeviceUpdateRequest, res *userapi.PerformDeviceUpdateResponse) error { func (u *testUserAPI) PerformDeviceUpdate(ctx context.Context, req *userapi.PerformDeviceUpdateRequest, res *userapi.PerformDeviceUpdateResponse) error {
return nil return nil
} }
func (u *testUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error {
return nil
}
func (u *testUserAPI) PerformAccountDeactivation(ctx context.Context, req *userapi.PerformAccountDeactivationRequest, res *userapi.PerformAccountDeactivationResponse) error { func (u *testUserAPI) PerformAccountDeactivation(ctx context.Context, req *userapi.PerformAccountDeactivationRequest, res *userapi.PerformAccountDeactivationResponse) error {
return nil return nil
} }

View file

@ -19,10 +19,14 @@ package sync
import ( import (
"context" "context"
"fmt" "fmt"
"net"
"net/http" "net/http"
"strings"
"sync"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/config"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/internal"
@ -37,18 +41,62 @@ import (
// RequestPool manages HTTP long-poll connections for /sync // RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct { type RequestPool struct {
db storage.Database db storage.Database
cfg *config.SyncAPI
userAPI userapi.UserInternalAPI userAPI userapi.UserInternalAPI
notifier *Notifier notifier *Notifier
keyAPI keyapi.KeyInternalAPI keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map
} }
// NewRequestPool makes a new RequestPool // NewRequestPool makes a new RequestPool
func NewRequestPool( func NewRequestPool(
db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, db storage.Database, cfg *config.SyncAPI, n *Notifier,
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
) *RequestPool { ) *RequestPool {
return &RequestPool{db, userAPI, n, keyAPI, rsAPI} rp := &RequestPool{db, cfg, userAPI, n, keyAPI, rsAPI, sync.Map{}}
go rp.cleanLastSeen()
return rp
}
func (rp *RequestPool) cleanLastSeen() {
for {
rp.lastseen.Range(func(key interface{}, _ interface{}) bool {
rp.lastseen.Delete(key)
return true
})
time.Sleep(time.Minute)
}
}
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
if _, ok := rp.lastseen.LoadOrStore(device.UserID+device.ID, struct{}{}); ok {
return
}
remoteAddr := req.RemoteAddr
if rp.cfg.RealIPHeader != "" {
if header := req.Header.Get(rp.cfg.RealIPHeader); header != "" {
// TODO: Maybe this isn't great but it will satisfy both X-Real-IP
// and X-Forwarded-For (which can be a list where the real client
// address is the first listed address). Make more intelligent?
addresses := strings.Split(header, ",")
if ip := net.ParseIP(addresses[0]); ip != nil {
remoteAddr = addresses[0]
}
}
}
lsreq := &userapi.PerformLastSeenUpdateRequest{
UserID: device.UserID,
DeviceID: device.ID,
RemoteAddr: remoteAddr,
}
lsres := &userapi.PerformLastSeenUpdateResponse{}
go rp.userAPI.PerformLastSeenUpdate(req.Context(), lsreq, lsres) // nolint:errcheck
rp.lastseen.Store(device.UserID+device.ID, time.Now())
} }
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@ -74,6 +122,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
"limit": syncReq.limit, "limit": syncReq.limit,
}) })
rp.updateLastSeen(req, device)
currPos := rp.notifier.CurrentPosition() currPos := rp.notifier.CurrentPosition()
if rp.shouldReturnImmediately(syncReq) { if rp.shouldReturnImmediately(syncReq) {

View file

@ -61,7 +61,7 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start notifier") logrus.WithError(err).Panicf("failed to start notifier")
} }
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, rsAPI) requestPool := sync.NewRequestPool(syncDB, cfg, notifier, userAPI, keyAPI, rsAPI)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),

View file

@ -29,6 +29,7 @@ type UserInternalAPI interface {
PerformPasswordUpdate(ctx context.Context, req *PerformPasswordUpdateRequest, res *PerformPasswordUpdateResponse) error PerformPasswordUpdate(ctx context.Context, req *PerformPasswordUpdateRequest, res *PerformPasswordUpdateResponse) error
PerformDeviceCreation(ctx context.Context, req *PerformDeviceCreationRequest, res *PerformDeviceCreationResponse) error PerformDeviceCreation(ctx context.Context, req *PerformDeviceCreationRequest, res *PerformDeviceCreationResponse) error
PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error
PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error
PerformAccountDeactivation(ctx context.Context, req *PerformAccountDeactivationRequest, res *PerformAccountDeactivationResponse) error PerformAccountDeactivation(ctx context.Context, req *PerformAccountDeactivationRequest, res *PerformAccountDeactivationResponse) error
QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error
@ -183,6 +184,17 @@ type PerformPasswordUpdateResponse struct {
Account *Account Account *Account
} }
// PerformLastSeenUpdateRequest is the request for PerformLastSeenUpdate.
type PerformLastSeenUpdateRequest struct {
UserID string
DeviceID string
RemoteAddr string
}
// PerformLastSeenUpdateResponse is the response for PerformLastSeenUpdate.
type PerformLastSeenUpdateResponse struct {
}
// PerformDeviceCreationRequest is the request for PerformDeviceCreation // PerformDeviceCreationRequest is the request for PerformDeviceCreation
type PerformDeviceCreationRequest struct { type PerformDeviceCreationRequest struct {
Localpart string Localpart string

View file

@ -172,6 +172,21 @@ func (a *UserInternalAPI) deviceListUpdate(userID string, deviceIDs []string) er
return nil return nil
} }
func (a *UserInternalAPI) PerformLastSeenUpdate(
ctx context.Context,
req *api.PerformLastSeenUpdateRequest,
res *api.PerformLastSeenUpdateResponse,
) error {
localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil {
return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
if err := a.DeviceDB.UpdateDeviceLastSeen(ctx, localpart, req.DeviceID, req.RemoteAddr); err != nil {
return fmt.Errorf("a.DeviceDB.UpdateDeviceLastSeen: %w", err)
}
return nil
}
func (a *UserInternalAPI) PerformDeviceUpdate(ctx context.Context, req *api.PerformDeviceUpdateRequest, res *api.PerformDeviceUpdateResponse) error { func (a *UserInternalAPI) PerformDeviceUpdate(ctx context.Context, req *api.PerformDeviceUpdateRequest, res *api.PerformDeviceUpdateResponse) error {
localpart, _, err := gomatrixserverlib.SplitID('@', req.RequestingUserID) localpart, _, err := gomatrixserverlib.SplitID('@', req.RequestingUserID)
if err != nil { if err != nil {

View file

@ -32,6 +32,7 @@ const (
PerformAccountCreationPath = "/userapi/performAccountCreation" PerformAccountCreationPath = "/userapi/performAccountCreation"
PerformPasswordUpdatePath = "/userapi/performPasswordUpdate" PerformPasswordUpdatePath = "/userapi/performPasswordUpdate"
PerformDeviceDeletionPath = "/userapi/performDeviceDeletion" PerformDeviceDeletionPath = "/userapi/performDeviceDeletion"
PerformLastSeenUpdatePath = "/userapi/performLastSeenUpdate"
PerformDeviceUpdatePath = "/userapi/performDeviceUpdate" PerformDeviceUpdatePath = "/userapi/performDeviceUpdate"
PerformAccountDeactivationPath = "/userapi/performAccountDeactivation" PerformAccountDeactivationPath = "/userapi/performAccountDeactivation"
@ -119,6 +120,18 @@ func (h *httpUserInternalAPI) PerformDeviceDeletion(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
func (h *httpUserInternalAPI) PerformLastSeenUpdate(
ctx context.Context,
req *api.PerformLastSeenUpdateRequest,
res *api.PerformLastSeenUpdateResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformLastSeen")
defer span.Finish()
apiURL := h.apiURL + PerformLastSeenUpdatePath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}
func (h *httpUserInternalAPI) PerformDeviceUpdate(ctx context.Context, req *api.PerformDeviceUpdateRequest, res *api.PerformDeviceUpdateResponse) error { func (h *httpUserInternalAPI) PerformDeviceUpdate(ctx context.Context, req *api.PerformDeviceUpdateRequest, res *api.PerformDeviceUpdateResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformDeviceUpdate") span, ctx := opentracing.StartSpanFromContext(ctx, "PerformDeviceUpdate")
defer span.Finish() defer span.Finish()

View file

@ -65,6 +65,19 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(PerformLastSeenUpdatePath,
httputil.MakeInternalAPI("performLastSeenUpdate", func(req *http.Request) util.JSONResponse {
request := api.PerformLastSeenUpdateRequest{}
response := api.PerformLastSeenUpdateResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := s.PerformLastSeenUpdate(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(PerformDeviceUpdatePath, internalAPIMux.Handle(PerformDeviceUpdatePath,
httputil.MakeInternalAPI("performDeviceUpdate", func(req *http.Request) util.JSONResponse { httputil.MakeInternalAPI("performDeviceUpdate", func(req *http.Request) util.JSONResponse {
request := api.PerformDeviceUpdateRequest{} request := api.PerformDeviceUpdateRequest{}

View file

@ -33,9 +33,9 @@ type Database interface {
// Returns the device on success. // Returns the device on success.
CreateDevice(ctx context.Context, localpart string, deviceID *string, accessToken string, displayName *string, ipAddr, userAgent string) (dev *api.Device, returnErr error) CreateDevice(ctx context.Context, localpart string, deviceID *string, accessToken string, displayName *string, ipAddr, userAgent string) (dev *api.Device, returnErr error)
UpdateDevice(ctx context.Context, localpart, deviceID string, displayName *string) error UpdateDevice(ctx context.Context, localpart, deviceID string, displayName *string) error
UpdateDeviceLastSeen(ctx context.Context, localpart, deviceID, ipAddr string) error
RemoveDevice(ctx context.Context, deviceID, localpart string) error RemoveDevice(ctx context.Context, deviceID, localpart string) error
RemoveDevices(ctx context.Context, localpart string, devices []string) error RemoveDevices(ctx context.Context, localpart string, devices []string) error
// RemoveAllDevices deleted all devices for this user. Returns the devices deleted. // RemoveAllDevices deleted all devices for this user. Returns the devices deleted.
RemoveAllDevices(ctx context.Context, localpart, exceptDeviceID string) (devices []api.Device, err error) RemoveAllDevices(ctx context.Context, localpart, exceptDeviceID string) (devices []api.Device, err error)
UpdateDeviceLastSeen(ctx context.Context, deviceID, ipAddr string) error
} }

View file

@ -95,7 +95,7 @@ const selectDevicesByIDSQL = "" +
"SELECT device_id, localpart, display_name FROM device_devices WHERE device_id = ANY($1)" "SELECT device_id, localpart, display_name FROM device_devices WHERE device_id = ANY($1)"
const updateDeviceLastSeen = "" + const updateDeviceLastSeen = "" +
"UPDATE device_devices SET last_seen_ts = $1, ip = $2 WHERE device_id = $3" "UPDATE device_devices SET last_seen_ts = $1, ip = $2 WHERE localpart = $3 AND device_id = $4"
type devicesStatements struct { type devicesStatements struct {
insertDeviceStmt *sql.Stmt insertDeviceStmt *sql.Stmt
@ -310,9 +310,9 @@ func (s *devicesStatements) selectDevicesByLocalpart(
return devices, rows.Err() return devices, rows.Err()
} }
func (s *devicesStatements) updateDeviceLastSeen(ctx context.Context, txn *sql.Tx, deviceID, ipAddr string) error { func (s *devicesStatements) updateDeviceLastSeen(ctx context.Context, txn *sql.Tx, localpart, deviceID, ipAddr string) error {
lastSeenTs := time.Now().UnixNano() / 1000000 lastSeenTs := time.Now().UnixNano() / 1000000
stmt := sqlutil.TxStmt(txn, s.updateDeviceLastSeenStmt) stmt := sqlutil.TxStmt(txn, s.updateDeviceLastSeenStmt)
_, err := stmt.ExecContext(ctx, lastSeenTs, ipAddr, deviceID) _, err := stmt.ExecContext(ctx, lastSeenTs, ipAddr, localpart, deviceID)
return err return err
} }

View file

@ -205,8 +205,8 @@ func (d *Database) RemoveAllDevices(
} }
// UpdateDeviceLastSeen updates a the last seen timestamp and the ip address // UpdateDeviceLastSeen updates a the last seen timestamp and the ip address
func (d *Database) UpdateDeviceLastSeen(ctx context.Context, deviceID, ipAddr string) error { func (d *Database) UpdateDeviceLastSeen(ctx context.Context, localpart, deviceID, ipAddr string) error {
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
return d.devices.updateDeviceLastSeen(ctx, txn, deviceID, ipAddr) return d.devices.updateDeviceLastSeen(ctx, txn, localpart, deviceID, ipAddr)
}) })
} }

View file

@ -80,7 +80,7 @@ const selectDevicesByIDSQL = "" +
"SELECT device_id, localpart, display_name FROM device_devices WHERE device_id IN ($1)" "SELECT device_id, localpart, display_name FROM device_devices WHERE device_id IN ($1)"
const updateDeviceLastSeen = "" + const updateDeviceLastSeen = "" +
"UPDATE device_devices SET last_seen_ts = $1, ip = $2 WHERE device_id = $3" "UPDATE device_devices SET last_seen_ts = $1, ip = $2 WHERE localpart = $3 AND device_id = $4"
type devicesStatements struct { type devicesStatements struct {
db *sql.DB db *sql.DB
@ -314,9 +314,9 @@ func (s *devicesStatements) selectDevicesByID(ctx context.Context, deviceIDs []s
return devices, rows.Err() return devices, rows.Err()
} }
func (s *devicesStatements) updateDeviceLastSeen(ctx context.Context, txn *sql.Tx, deviceID, ipAddr string) error { func (s *devicesStatements) updateDeviceLastSeen(ctx context.Context, txn *sql.Tx, localpart, deviceID, ipAddr string) error {
lastSeenTs := time.Now().UnixNano() / 1000000 lastSeenTs := time.Now().UnixNano() / 1000000
stmt := sqlutil.TxStmt(txn, s.updateDeviceLastSeenStmt) stmt := sqlutil.TxStmt(txn, s.updateDeviceLastSeenStmt)
_, err := stmt.ExecContext(ctx, lastSeenTs, ipAddr, deviceID) _, err := stmt.ExecContext(ctx, lastSeenTs, ipAddr, localpart, deviceID)
return err return err
} }

View file

@ -207,8 +207,8 @@ func (d *Database) RemoveAllDevices(
} }
// UpdateDeviceLastSeen updates a the last seen timestamp and the ip address // UpdateDeviceLastSeen updates a the last seen timestamp and the ip address
func (d *Database) UpdateDeviceLastSeen(ctx context.Context, deviceID, ipAddr string) error { func (d *Database) UpdateDeviceLastSeen(ctx context.Context, localpart, deviceID, ipAddr string) error {
return d.writer.Do(d.db, nil, func(txn *sql.Tx) error { return d.writer.Do(d.db, nil, func(txn *sql.Tx) error {
return d.devices.updateDeviceLastSeen(ctx, txn, deviceID, ipAddr) return d.devices.updateDeviceLastSeen(ctx, txn, localpart, deviceID, ipAddr)
}) })
} }