Make /sync requests work

This commit is contained in:
Till Faelligen 2021-07-31 16:10:46 +02:00
parent 9dee04c21a
commit 0889ceeac2
18 changed files with 244 additions and 19 deletions

View file

@ -171,7 +171,7 @@ func GetPresence(req *http.Request,
resp := presenceResponse{}
lastActive := time.Since(presence.LastActiveTS.Time())
resp.LastActiveAgo = lastActive.Milliseconds()
resp.StatusMsg = presence.StatusMsg
resp.CurrentlyActive = lastActive <= time.Minute*5
if !resp.CurrentlyActive {
presence.PresenceStatus = types.Unavailable

View file

@ -93,7 +93,7 @@ type OutputPresenceData struct {
UserID string `json:"user_id"`
Presence types.PresenceStatus `json:"presence"`
StatusMsg string `json:"status_msg,omitempty"`
LastActiveTS gomatrixserverlib.Timestamp `json:"-"`
LastActiveTS gomatrixserverlib.Timestamp `json:"last_active_ts"`
LastActiveAgo int64 `json:"last_active_ago,omitempty"`
StreamPos types2.StreamPosition
StreamPos types2.StreamPosition `json:"stream_pos"`
}

View file

@ -74,7 +74,6 @@ func (s *OutputPresenceDataConsumer) Start() error {
}
func (s *OutputPresenceDataConsumer) onMessage(msg *sarama.ConsumerMessage) error {
log.Debug("presence received by sync api!")
var output api.OutputPresenceData
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
@ -85,7 +84,7 @@ func (s *OutputPresenceDataConsumer) onMessage(msg *sarama.ConsumerMessage) erro
log.Debugf("presence received by sync api! %+v", output)
s.stream.Advance(output.StreamPos)
s.notifier.OnNewPresence(types.StreamingToken{PresenceDataPostion: output.StreamPos}, output.UserID)
s.notifier.OnNewPresence(types.StreamingToken{PresenceDataPosition: output.StreamPos}, output.UserID)
return nil
}

View file

@ -18,6 +18,7 @@ type Streams struct {
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
PresenceDataStreamProdiver types.StreamProvider
DeviceListStreamProvider types.PartitionedStreamProvider
}
@ -47,6 +48,10 @@ func NewSyncStreamProviders(
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
},
PresenceDataStreamProdiver: &PresenceStreamProvider{
StreamProvider: StreamProvider{DB: d},
UserAPI: userAPI,
},
DeviceListStreamProvider: &DeviceListStreamProvider{
PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
rsAPI: rsAPI,

View file

@ -0,0 +1,66 @@
package streams
import (
"context"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
type PresenceStreamProvider struct {
StreamProvider
UserAPI userapi.UserInternalAPI
}
func (p *PresenceStreamProvider) CompleteSync(ctx context.Context, req *types.SyncRequest) types.StreamPosition {
req.Log.Debug(" CompleteSyncrequested for presence!")
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
type outputPresence struct {
AvatarUrl string `json:"avatar_url,omitempty"`
CurrentlyActive bool `json:"currently_active,omitempty"`
LastActiveAgo int64 `json:"last_active_ago,omitempty"`
Presence string `json:"presence,omitempty"`
StatusMsg string `json:"status_msg,omitempty"`
}
func (p *PresenceStreamProvider) IncrementalSync(ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition {
res := userapi.QueryPresenceAfterResponse{}
if err := p.UserAPI.QueryPresenceAfter(ctx, &userapi.QueryPresenceAfterRequest{StreamPos: int64(from)}, &res); err != nil {
req.Log.WithError(err).Error("unable to fetch presence after")
return from
}
evs := []gomatrixserverlib.ClientEvent{}
var maxPos int64
for _, presence := range res.Presences {
ev := gomatrixserverlib.ClientEvent{}
lastActive := time.Since(presence.LastActiveTS.Time())
pres := outputPresence{
CurrentlyActive: lastActive <= time.Minute*5,
LastActiveAgo: lastActive.Milliseconds(),
Presence: presence.PresenceStatus.String(),
StatusMsg: presence.StatusMsg,
}
j, err := json.Marshal(pres)
if err != nil {
req.Log.WithError(err).Error("json.Marshal failed")
return from
}
ev.Type = "m.presence"
ev.Sender = presence.UserID
ev.Content = j
evs = append(evs, ev)
if presence.StreamPos > maxPos {
maxPos = presence.StreamPos
}
}
req.Response.Presence.Events = append(req.Response.Presence.Events, evs...)
return types.StreamPosition(maxPos)
}

View file

@ -210,6 +210,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
PresenceDataPosition: rp.streams.PresenceDataStreamProdiver.CompleteSync(
syncReq.Context, syncReq,
),
}
} else {
// Incremental sync
@ -242,6 +245,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
PresenceDataPosition: rp.streams.PresenceDataStreamProdiver.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PresenceDataPosition, currentPos.PresenceDataPosition,
),
}
}

View file

@ -108,5 +108,12 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start receipts consumer")
}
presenceConsumer := consumers.NewOutputPresenceDataConsumer(
process, cfg, consumer, syncDB, notifier, streams.PresenceDataStreamProdiver,
)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start presence consumer")
}
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}

View file

@ -124,7 +124,7 @@ type StreamingToken struct {
SendToDevicePosition StreamPosition
InvitePosition StreamPosition
AccountDataPosition StreamPosition
PresenceDataPostion StreamPosition
PresenceDataPosition StreamPosition
DeviceListPosition LogPosition
}
@ -141,10 +141,11 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string {
posStr := fmt.Sprintf(
"s%d_%d_%d_%d_%d_%d",
"s%d_%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
t.InvitePosition, t.AccountDataPosition,
t.PresenceDataPosition,
)
if dl := t.DeviceListPosition; !dl.IsEmpty() {
posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
@ -169,12 +170,15 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
return true
case t.PresenceDataPosition > other.PresenceDataPosition:
return true
}
return false
}
func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty()
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.PresenceDataPosition == 0 && t.DeviceListPosition.IsEmpty()
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -212,6 +216,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) {
t.DeviceListPosition = other.DeviceListPosition
}
if other.PresenceDataPosition > t.PresenceDataPosition {
t.PresenceDataPosition = other.PresenceDataPosition
}
}
type TopologyToken struct {
@ -302,7 +309,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
}
categories := strings.Split(tok[1:], ".")
parts := strings.Split(categories[0], "_")
var positions [6]StreamPosition
var positions [7]StreamPosition
for i, p := range parts {
if i > len(positions) {
break
@ -321,6 +328,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
SendToDevicePosition: positions[3],
InvitePosition: positions[4],
AccountDataPosition: positions[5],
PresenceDataPosition: positions[6],
}
// dl-0-1234
// $log_name-$partition-$offset

View file

@ -552,3 +552,4 @@ Can delete backup
Deleted & recreated backups are empty
GET /presence/:user_id/status fetches initial status
PUT /presence/:user_id/status updates my presence
Presence change reports an event to myself

View file

@ -45,6 +45,7 @@ type UserInternalAPI interface {
QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
QueryPresenceForUser(ctx context.Context, req *QueryPresenceForUserRequest, res *QueryPresenceForUserResponse) error
QueryPresenceAfter(ctx context.Context, req *QueryPresenceAfterRequest, res *QueryPresenceAfterResponse) error
}
type PerformKeyBackupRequest struct {
@ -354,12 +355,24 @@ type QueryPresenceForUserRequest struct {
// QueryPresenceForUserResponse is the response for QueryPresenceForUserRequest
type QueryPresenceForUserResponse struct {
PresenceStatus types.PresenceStatus
Presence string
StatusMsg string
LastActiveTS gomatrixserverlib.Timestamp
LastActiveAgo int64
CurrentlyActive bool
UserID string `json:"user_id"`
PresenceStatus types.PresenceStatus `json:"presence_status"`
Presence string `json:"presence"`
StatusMsg string `json:"status_msg"`
LastActiveTS gomatrixserverlib.Timestamp `json:"last_active_ts"`
LastActiveAgo int64 `json:"last_active_ago"`
CurrentlyActive bool `json:"currently_active"`
StreamPos int64 `json:"stream_pos"`
}
// QueryPresenceAfterRequest is the request for QueryPresenceAfterRequest
type QueryPresenceAfterRequest struct {
StreamPos int64
}
// QueryPresenceAfterResponse is the response for QueryPresenceAfterRequest
type QueryPresenceAfterResponse struct {
Presences []QueryPresenceForUserResponse
}
// Device represents a client's device (mobile, web, etc)

View file

@ -60,8 +60,12 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
}
func (a *UserInternalAPI) InputPresenceData(ctx context.Context, req *api.InputPresenceRequest, res *api.InputPresenceResponse) error {
_, err := a.PresenceDB.UpsertPresence(ctx, req.UserID, req.StatusMsg, req.Presence, req.LastActiveTS)
pos, err := a.PresenceDB.UpsertPresence(ctx, req.UserID, req.StatusMsg, req.Presence, req.LastActiveTS)
if err != nil {
return err
}
res.StreamPos = pos
return nil
}
func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error {
@ -485,6 +489,27 @@ func (a *UserInternalAPI) QueryPresenceForUser(ctx context.Context, req *api.Que
return nil
}
func (a *UserInternalAPI) QueryPresenceAfter(ctx context.Context, req *api.QueryPresenceAfterRequest, res *api.QueryPresenceAfterResponse) error {
p, err := a.PresenceDB.GetPresenceAfter(ctx, req.StreamPos)
if err != nil {
return err
}
presences := []api.QueryPresenceForUserResponse{}
for _, x := range p {
var y api.QueryPresenceForUserResponse
y.UserID = x.UserID
y.Presence = x.Presence.String()
y.StreamPos = int64(x.StreamPos)
y.LastActiveTS = x.LastActiveTS
y.LastActiveAgo = x.LastActiveAgo
y.PresenceStatus = x.Presence
y.StatusMsg = x.StatusMsg
presences = append(presences, y)
}
res.Presences = append(res.Presences, presences...)
return nil
}
func (a *UserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.PerformKeyBackupRequest, res *api.PerformKeyBackupResponse) {
// Delete metadata
if req.DeleteBackup {

View file

@ -47,6 +47,7 @@ const (
QuerySearchProfilesPath = "/userapi/querySearchProfiles"
QueryOpenIDTokenPath = "/userapi/queryOpenIDToken"
QueryPresenceForUserPath = "/userapi/queryPresenceForUser"
QueryPresenceAfterPath = "/userapi/queryPresenceAfter"
QueryKeyBackupPath = "/userapi/queryKeyBackup"
)
@ -267,3 +268,11 @@ func (h *httpUserInternalAPI) QueryKeyBackup(ctx context.Context, req *api.Query
res.Error = err.Error()
}
}
func (h *httpUserInternalAPI) QueryPresenceAfter(ctx context.Context, req *api.QueryPresenceAfterRequest, res *api.QueryPresenceAfterResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPresenceAfter")
defer span.Finish()
apiURL := h.apiURL + QueryPresenceAfterPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}

View file

@ -260,4 +260,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(QueryPresenceAfterPath,
httputil.MakeInternalAPI("queryPresenceAfter", func(req *http.Request) util.JSONResponse {
request := api.QueryPresenceAfterRequest{}
response := api.QueryPresenceAfterResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := s.QueryPresenceAfter(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -34,4 +34,8 @@ type Database interface {
ctx context.Context,
userID string,
) (presence api.OutputPresenceData, err error)
GetPresenceAfter(
ctx context.Context,
pos int64,
) (presence []api.OutputPresenceData, err error)
}

View file

@ -45,7 +45,7 @@ CREATE INDEX IF NOT EXISTS presence_presences_user_id ON presence_presences(user
`
const upsertPresenceSQL = "" +
"INSERT INTO presence_presences" +
"INSERT INTO presence_presences AS p" +
" (user_id, presence, status_msg, last_active_ts)" +
" VALUES ($1, $2, $3, $4)" +
" ON CONFLICT (user_id)" +
@ -61,10 +61,16 @@ const selectPresenceForUserSQL = "" +
const selectMaxPresenceSQL = "" +
"SELECT MAX(id) FROM presence_presences"
const selectPresenceAfter = "" +
" SELECT id, user_id, presence, status_msg, last_active_ts" +
" FROM presence_presences" +
" WHERE id > $1"
type presenceStatements struct {
upsertPresenceStmt *sql.Stmt
selectPresenceForUsersStmt *sql.Stmt
selectMaxPresenceStmt *sql.Stmt
selectPresenceAfterStmt *sql.Stmt
}
func (p *presenceStatements) execSchema(db *sql.DB) error {
@ -82,6 +88,9 @@ func (p *presenceStatements) prepare(db *sql.DB) (err error) {
if p.selectMaxPresenceStmt, err = db.Prepare(selectMaxPresenceSQL); err != nil {
return
}
if p.selectPresenceAfterStmt, err = db.Prepare(selectPresenceAfter); err != nil {
return
}
return
}
@ -121,3 +130,24 @@ func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx)
err = stmt.QueryRowContext(ctx).Scan(&pos)
return
}
// GetPresenceAfter returns the changes presences after a given stream id
func (p *presenceStatements) GetPresenceAfter(
ctx context.Context, txn *sql.Tx,
after int64,
) (presences []api.OutputPresenceData, err error) {
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
rows, err := stmt.QueryContext(ctx, after)
if err != nil {
return nil, err
}
for rows.Next() {
presence := api.OutputPresenceData{}
if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presence.Presence, &presence.StatusMsg, &presence.LastActiveTS); err != nil {
return nil, err
}
presences = append(presences, presence)
}
return presences, rows.Err()
}

View file

@ -56,3 +56,7 @@ func (d *Database) UpsertPresence(
func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresenceData, error) {
return d.presence.GetPresenceForUser(ctx, nil, userID)
}
func (d *Database) GetPresenceAfter(ctx context.Context, pos int64) (presence []api.OutputPresenceData, err error) {
return d.presence.GetPresenceAfter(ctx, nil, pos)
}

View file

@ -56,9 +56,15 @@ const selectPresenceForUserSQL = "" +
" FROM presence_presences" +
" WHERE user_id = $1 LIMIT 1"
const selectPresenceAfter = "" +
" SELECT id, user_id, presence, status_msg, last_active_ts" +
" FROM presence_presences" +
" WHERE id > $1"
type presenceStatements struct {
upsertPresenceStmt *sql.Stmt
selectPresenceForUsersStmt *sql.Stmt
selectPresenceAfterStmt *sql.Stmt
}
func (p *presenceStatements) execSchema(db *sql.DB) error {
@ -73,6 +79,9 @@ func (p *presenceStatements) prepare(db *sql.DB) (err error) {
if p.selectPresenceForUsersStmt, err = db.Prepare(selectPresenceForUserSQL); err != nil {
return
}
if p.selectPresenceAfterStmt, err = db.Prepare(selectPresenceAfter); err != nil {
return
}
return
}
@ -100,3 +109,24 @@ func (p *presenceStatements) GetPresenceForUser(
err = stmt.QueryRowContext(ctx, userID).Scan(&presence.Presence, &presence.StatusMsg, &presence.LastActiveTS)
return
}
// GetPresenceAfter returns the changes presences after a given stream id
func (p *presenceStatements) GetPresenceAfter(
ctx context.Context, txn *sql.Tx,
after int64,
) (presences []api.OutputPresenceData, err error) {
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
rows, err := stmt.QueryContext(ctx, after)
if err != nil {
return nil, err
}
for rows.Next() {
presence := api.OutputPresenceData{}
if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presence.Presence, &presence.StatusMsg, &presence.LastActiveTS); err != nil {
return nil, err
}
presences = append(presences, presence)
}
return presences, rows.Err()
}

View file

@ -70,3 +70,7 @@ func (d *Database) UpsertPresence(
func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresenceData, error) {
return d.presence.GetPresenceForUser(ctx, nil, userID)
}
func (d *Database) GetPresenceAfter(ctx context.Context, pos int64) (presence []api.OutputPresenceData, err error) {
return d.presence.GetPresenceAfter(ctx, nil, pos)
}