mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-28 17:23:09 -06:00
Add presence storage
Add internal API to set/retrieve the presence status
This commit is contained in:
parent
139817a0b3
commit
e0f9c663e2
|
|
@ -19,12 +19,14 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
// UserInternalAPI is the internal API for information about users and devices.
|
// UserInternalAPI is the internal API for information about users and devices.
|
||||||
type UserInternalAPI interface {
|
type UserInternalAPI interface {
|
||||||
InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error
|
InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error
|
||||||
|
InputPresenceData(ctx context.Context, req *InputPresenceRequest, res *InputPresenceResponse) error
|
||||||
PerformAccountCreation(ctx context.Context, req *PerformAccountCreationRequest, res *PerformAccountCreationResponse) error
|
PerformAccountCreation(ctx context.Context, req *PerformAccountCreationRequest, res *PerformAccountCreationResponse) error
|
||||||
PerformPasswordUpdate(ctx context.Context, req *PerformPasswordUpdateRequest, res *PerformPasswordUpdateResponse) error
|
PerformPasswordUpdate(ctx context.Context, req *PerformPasswordUpdateRequest, res *PerformPasswordUpdateResponse) error
|
||||||
PerformDeviceCreation(ctx context.Context, req *PerformDeviceCreationRequest, res *PerformDeviceCreationResponse) error
|
PerformDeviceCreation(ctx context.Context, req *PerformDeviceCreationRequest, res *PerformDeviceCreationResponse) error
|
||||||
|
|
@ -40,6 +42,7 @@ type UserInternalAPI interface {
|
||||||
QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error
|
QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error
|
||||||
QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error
|
QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error
|
||||||
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
|
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
|
||||||
|
QueryPresenceForUser(ctx context.Context, req *QueryPresenceForUserRequest, res *QueryPresenceForUserResponse) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// InputAccountDataRequest is the request for InputAccountData
|
// InputAccountDataRequest is the request for InputAccountData
|
||||||
|
|
@ -249,6 +252,34 @@ type QueryOpenIDTokenResponse struct {
|
||||||
ExpiresAtMS int64
|
ExpiresAtMS int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InputPresenceRequest is the request for presence data
|
||||||
|
type InputPresenceRequest struct {
|
||||||
|
UserID string
|
||||||
|
DisplayName string
|
||||||
|
AvatarURL string
|
||||||
|
Presence types.PresenceStatus
|
||||||
|
StatusMsg string
|
||||||
|
LastActiveTS int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// InputPresenceResponse is the response for InputPresenceRequest
|
||||||
|
type InputPresenceResponse struct{}
|
||||||
|
|
||||||
|
// QueryPresenceForUserRequest is the request for QueryPresenceForUserRequest
|
||||||
|
type QueryPresenceForUserRequest struct {
|
||||||
|
UserID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryPresenceForUserResponse is the response for QueryPresenceForUserRequest
|
||||||
|
type QueryPresenceForUserResponse struct {
|
||||||
|
PresenceStatus types.PresenceStatus `json:"-"`
|
||||||
|
Presence string `json:"presence"`
|
||||||
|
StatusMsg string `json:"status_msg,omitempty"`
|
||||||
|
LastActiveTS gomatrixserverlib.Timestamp `json:"-"`
|
||||||
|
LastActiveAgo int64 `json:"last_active_ago,omitempty"`
|
||||||
|
CurrentlyActive bool `json:"currently_active,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// Device represents a client's device (mobile, web, etc)
|
// Device represents a client's device (mobile, web, etc)
|
||||||
type Device struct {
|
type Device struct {
|
||||||
ID string
|
ID string
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/api"
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/devices"
|
"github.com/matrix-org/dendrite/userapi/storage/devices"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/storage/presence"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
@ -37,6 +38,7 @@ import (
|
||||||
type UserInternalAPI struct {
|
type UserInternalAPI struct {
|
||||||
AccountDB accounts.Database
|
AccountDB accounts.Database
|
||||||
DeviceDB devices.Database
|
DeviceDB devices.Database
|
||||||
|
PresenceDB presence.Database
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
// AppServices is the list of all registered AS
|
// AppServices is the list of all registered AS
|
||||||
AppServices []config.ApplicationService
|
AppServices []config.ApplicationService
|
||||||
|
|
@ -57,6 +59,11 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
|
||||||
return a.AccountDB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData)
|
return a.AccountDB.SaveAccountData(ctx, local, req.RoomID, req.DataType, req.AccountData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *UserInternalAPI) InputPresenceData(ctx context.Context, req *api.InputPresenceRequest, res *api.InputPresenceResponse) error {
|
||||||
|
_, err := a.PresenceDB.UpsertPresence(ctx, req.UserID, req.StatusMsg, req.Presence, req.LastActiveTS)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error {
|
func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error {
|
||||||
if req.AccountType == api.AccountTypeGuest {
|
if req.AccountType == api.AccountTypeGuest {
|
||||||
acc, err := a.AccountDB.CreateGuestAccount(ctx)
|
acc, err := a.AccountDB.CreateGuestAccount(ctx)
|
||||||
|
|
@ -442,3 +449,37 @@ func (a *UserInternalAPI) QueryOpenIDToken(ctx context.Context, req *api.QueryOp
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryPresenceForUser gets the current presence status, if set.
|
||||||
|
func (a *UserInternalAPI) QueryPresenceForUser(ctx context.Context, req *api.QueryPresenceForUserRequest, res *api.QueryPresenceForUserResponse) error {
|
||||||
|
local, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var maxLastSeen int64
|
||||||
|
// If it's a local user, we can check the devices for possible updated timestamps
|
||||||
|
if domain == a.ServerName {
|
||||||
|
devs, err := a.DeviceDB.GetDevicesByLocalpart(ctx, local)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, dev := range devs {
|
||||||
|
if dev.LastSeenTS > maxLastSeen {
|
||||||
|
maxLastSeen = dev.LastSeenTS
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p, err := a.PresenceDB.GetPresenceForUser(ctx, req.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
res.PresenceStatus = p.Presence
|
||||||
|
res.StatusMsg = p.StatusMsg
|
||||||
|
res.LastActiveTS = p.LastActiveTS
|
||||||
|
if maxLastSeen > p.LastActiveTS.Time().Unix() {
|
||||||
|
res.LastActiveAgo = maxLastSeen
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,8 @@ import (
|
||||||
|
|
||||||
// HTTP paths for the internal HTTP APIs
|
// HTTP paths for the internal HTTP APIs
|
||||||
const (
|
const (
|
||||||
InputAccountDataPath = "/userapi/inputAccountData"
|
InputAccountDataPath = "/userapi/inputAccountData"
|
||||||
|
InputPresenceDataPath = "/userapi/inputPresenceData"
|
||||||
|
|
||||||
PerformDeviceCreationPath = "/userapi/performDeviceCreation"
|
PerformDeviceCreationPath = "/userapi/performDeviceCreation"
|
||||||
PerformAccountCreationPath = "/userapi/performAccountCreation"
|
PerformAccountCreationPath = "/userapi/performAccountCreation"
|
||||||
|
|
@ -37,13 +38,14 @@ const (
|
||||||
PerformAccountDeactivationPath = "/userapi/performAccountDeactivation"
|
PerformAccountDeactivationPath = "/userapi/performAccountDeactivation"
|
||||||
PerformOpenIDTokenCreationPath = "/userapi/performOpenIDTokenCreation"
|
PerformOpenIDTokenCreationPath = "/userapi/performOpenIDTokenCreation"
|
||||||
|
|
||||||
QueryProfilePath = "/userapi/queryProfile"
|
QueryProfilePath = "/userapi/queryProfile"
|
||||||
QueryAccessTokenPath = "/userapi/queryAccessToken"
|
QueryAccessTokenPath = "/userapi/queryAccessToken"
|
||||||
QueryDevicesPath = "/userapi/queryDevices"
|
QueryDevicesPath = "/userapi/queryDevices"
|
||||||
QueryAccountDataPath = "/userapi/queryAccountData"
|
QueryAccountDataPath = "/userapi/queryAccountData"
|
||||||
QueryDeviceInfosPath = "/userapi/queryDeviceInfos"
|
QueryDeviceInfosPath = "/userapi/queryDeviceInfos"
|
||||||
QuerySearchProfilesPath = "/userapi/querySearchProfiles"
|
QuerySearchProfilesPath = "/userapi/querySearchProfiles"
|
||||||
QueryOpenIDTokenPath = "/userapi/queryOpenIDToken"
|
QueryOpenIDTokenPath = "/userapi/queryOpenIDToken"
|
||||||
|
QueryPresenceForUserPath = "/userapi/queryPresenceForUser"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewUserAPIClient creates a UserInternalAPI implemented by talking to a HTTP POST API.
|
// NewUserAPIClient creates a UserInternalAPI implemented by talking to a HTTP POST API.
|
||||||
|
|
@ -74,6 +76,14 @@ func (h *httpUserInternalAPI) InputAccountData(ctx context.Context, req *api.Inp
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *httpUserInternalAPI) InputPresenceData(ctx context.Context, req *api.InputPresenceRequest, res *api.InputPresenceResponse) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "InputPresenceData")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.apiURL + InputPresenceDataPath
|
||||||
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *httpUserInternalAPI) PerformAccountCreation(
|
func (h *httpUserInternalAPI) PerformAccountCreation(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.PerformAccountCreationRequest,
|
request *api.PerformAccountCreationRequest,
|
||||||
|
|
@ -225,3 +235,11 @@ func (h *httpUserInternalAPI) QueryOpenIDToken(ctx context.Context, req *api.Que
|
||||||
apiURL := h.apiURL + QueryOpenIDTokenPath
|
apiURL := h.apiURL + QueryOpenIDTokenPath
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *httpUserInternalAPI) QueryPresenceForUser(ctx context.Context, req *api.QueryPresenceForUserRequest, res *api.QueryPresenceForUserResponse) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPresenceForUser")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.apiURL + QueryPresenceForUserPath
|
||||||
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -234,4 +234,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
internalAPIMux.Handle(InputPresenceDataPath,
|
||||||
|
httputil.MakeInternalAPI("inputPresenceDataPath", func(req *http.Request) util.JSONResponse {
|
||||||
|
request := api.InputPresenceRequest{}
|
||||||
|
response := api.InputPresenceResponse{}
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||||
|
}
|
||||||
|
if err := s.InputPresenceData(req.Context(), &request, &response); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
37
userapi/storage/presence/interface.go
Normal file
37
userapi/storage/presence/interface.go
Normal file
|
|
@ -0,0 +1,37 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package presence
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/eduserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Database interface {
|
||||||
|
// UpsertPresence creates/updates the presence status of a user.
|
||||||
|
UpsertPresence(
|
||||||
|
ctx context.Context,
|
||||||
|
userID, statusMsg string,
|
||||||
|
presence types.PresenceStatus,
|
||||||
|
lastActiveTS int64,
|
||||||
|
) (pos int64, err error)
|
||||||
|
// GetPresenceForUser gets the presence status of a user.
|
||||||
|
GetPresenceForUser(
|
||||||
|
ctx context.Context,
|
||||||
|
userID string,
|
||||||
|
) (presence api.OutputPresence, err error)
|
||||||
|
}
|
||||||
104
userapi/storage/presence/postgres/presence_table.go
Normal file
104
userapi/storage/presence/postgres/presence_table.go
Normal file
|
|
@ -0,0 +1,104 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/eduserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const presenceSchema = `
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS presence_presence_id;
|
||||||
|
|
||||||
|
-- Stores data about presence
|
||||||
|
CREATE TABLE IF NOT EXISTS presence_presences (
|
||||||
|
-- The ID
|
||||||
|
id BIGINT PRIMARY KEY DEFAULT nextval('presence_presence_id'),
|
||||||
|
-- The Matrix user ID
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
-- The actual presence
|
||||||
|
presence INT NOT NULL,
|
||||||
|
-- The status message
|
||||||
|
status_msg TEXT,
|
||||||
|
-- The last time an action was received by this user
|
||||||
|
last_active_ts BIGINT NOT NULL,
|
||||||
|
CONSTRAINT presence_presences_unique UNIQUE (user_id)
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS presence_presences_user_id ON presence_presences(user_id);
|
||||||
|
`
|
||||||
|
|
||||||
|
const upsertPresenceSQL = "" +
|
||||||
|
"INSERT INTO presence_presences" +
|
||||||
|
" (user_id, presence, status_msg, last_active_ts)" +
|
||||||
|
" VALUES ($1, $2, $3, $4)" +
|
||||||
|
" ON CONFLICT (user_id)" +
|
||||||
|
" DO UPDATE SET id = nextval('presence_presences_user_id')," +
|
||||||
|
" presence = $2, status_msg = $3, last_active_ts = $4" +
|
||||||
|
" RETURNING id"
|
||||||
|
|
||||||
|
const selectPresenceForUserSQL = "" +
|
||||||
|
"SELECT presence, status_msg, last_active_ts" +
|
||||||
|
" FROM presence_presences" +
|
||||||
|
" WHERE user_id = $1 LIMIT 1"
|
||||||
|
|
||||||
|
type presenceStatements struct {
|
||||||
|
db *sql.DB
|
||||||
|
upsertPresenceStmt *sql.Stmt
|
||||||
|
selectPresenceForUsersStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *presenceStatements) execSchema(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(presenceSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *presenceStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
if p.upsertPresenceStmt, err = db.Prepare(upsertPresenceSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if p.selectPresenceForUsersStmt, err = db.Prepare(selectPresenceForUserSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpsertPresence creates/updates a presence status.
|
||||||
|
func (p *presenceStatements) UpsertPresence(
|
||||||
|
ctx context.Context,
|
||||||
|
txn *sql.Tx, userID,
|
||||||
|
statusMsg string,
|
||||||
|
presence types.PresenceStatus,
|
||||||
|
lastActiveTS int64,
|
||||||
|
) (pos int64, err error) {
|
||||||
|
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
||||||
|
err = stmt.QueryRowContext(ctx, userID, presence, statusMsg, lastActiveTS).Scan(&pos)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPresenceForUser returns the current presence of a user.
|
||||||
|
func (p *presenceStatements) GetPresenceForUser(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
userID string,
|
||||||
|
) (presence api.OutputPresence, err error) {
|
||||||
|
presence.UserID = userID
|
||||||
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||||
|
|
||||||
|
err = stmt.QueryRowContext(ctx, userID).Scan(&presence.Presence, &presence.StatusMsg, &presence.LastActiveTS)
|
||||||
|
return
|
||||||
|
}
|
||||||
58
userapi/storage/presence/postgres/storage.go
Normal file
58
userapi/storage/presence/postgres/storage.go
Normal file
|
|
@ -0,0 +1,58 @@
|
||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/eduserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Database represents an account database
|
||||||
|
type Database struct {
|
||||||
|
db *sql.DB
|
||||||
|
writer sqlutil.Writer
|
||||||
|
sqlutil.PartitionOffsetStatements
|
||||||
|
presence presenceStatements
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
|
db, err := sqlutil.Open(dbProperties)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
d := &Database{
|
||||||
|
db: db,
|
||||||
|
writer: sqlutil.NewDummyWriter(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = d.presence.execSchema(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = d.PartitionOffsetStatements.Prepare(db, d.writer, "presence"); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = d.presence.prepare(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return d, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) UpsertPresence(
|
||||||
|
ctx context.Context,
|
||||||
|
userID, statusMsg string,
|
||||||
|
presence types.PresenceStatus,
|
||||||
|
lastActiveTS int64,
|
||||||
|
) (pos int64, err error) {
|
||||||
|
err = sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
pos, err = d.presence.UpsertPresence(ctx, txn, userID, statusMsg, presence, lastActiveTS)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresence, error) {
|
||||||
|
return d.presence.GetPresenceForUser(ctx, nil, userID)
|
||||||
|
}
|
||||||
103
userapi/storage/presence/sqlite3/presence_table.go
Normal file
103
userapi/storage/presence/sqlite3/presence_table.go
Normal file
|
|
@ -0,0 +1,103 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package sqlite3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/eduserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const presenceSchema = `
|
||||||
|
-- Stores data about presence
|
||||||
|
CREATE TABLE IF NOT EXISTS presence_presences (
|
||||||
|
-- The ID
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
-- The Matrix user ID
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
-- The actual presence
|
||||||
|
presence INT NOT NULL,
|
||||||
|
-- The status message
|
||||||
|
status_msg TEXT,
|
||||||
|
-- The last time an action was received by this user
|
||||||
|
last_active_ts BIGINT NOT NULL,
|
||||||
|
CONSTRAINT presence_presences_unique UNIQUE (user_id)
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS presence_presences_id ON presence_presences(id);
|
||||||
|
CREATE INDEX IF NOT EXISTS presence_presences_user_id ON presence_presences(user_id);
|
||||||
|
`
|
||||||
|
|
||||||
|
const upsertPresenceSQL = "" +
|
||||||
|
"INSERT INTO presence_presences" +
|
||||||
|
" (user_id, presence, status_msg, last_active_ts)" +
|
||||||
|
" VALUES ($1, $2, $3, $4)" +
|
||||||
|
" ON CONFLICT (user_id)" +
|
||||||
|
" DO UPDATE SET id = rowid+1," +
|
||||||
|
" presence = $5, status_msg = $6, last_active_ts = $7" +
|
||||||
|
" RETURNING id"
|
||||||
|
|
||||||
|
const selectPresenceForUserSQL = "" +
|
||||||
|
"SELECT presence, status_msg, last_active_ts" +
|
||||||
|
" FROM presence_presences" +
|
||||||
|
" WHERE user_id = $1 LIMIT 1"
|
||||||
|
|
||||||
|
type presenceStatements struct {
|
||||||
|
db *sql.DB
|
||||||
|
upsertPresenceStmt *sql.Stmt
|
||||||
|
selectPresenceForUsersStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *presenceStatements) execSchema(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(presenceSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *presenceStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
if p.upsertPresenceStmt, err = db.Prepare(upsertPresenceSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if p.selectPresenceForUsersStmt, err = db.Prepare(selectPresenceForUserSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpsertPresence creates/updates a presence status.
|
||||||
|
func (p *presenceStatements) UpsertPresence(
|
||||||
|
ctx context.Context,
|
||||||
|
txn *sql.Tx, userID,
|
||||||
|
statusMsg string,
|
||||||
|
presence types.PresenceStatus,
|
||||||
|
lastActiveTS int64,
|
||||||
|
) (pos int64, err error) {
|
||||||
|
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
||||||
|
err = stmt.QueryRowContext(ctx, userID, presence, statusMsg, lastActiveTS, presence, statusMsg, lastActiveTS).Scan(&pos)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPresenceForUser returns the current presence of a user.
|
||||||
|
func (p *presenceStatements) GetPresenceForUser(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
userID string,
|
||||||
|
) (presence api.OutputPresence, err error) {
|
||||||
|
presence.UserID = userID
|
||||||
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||||
|
|
||||||
|
err = stmt.QueryRowContext(ctx, userID).Scan(&presence.Presence, &presence.StatusMsg, &presence.LastActiveTS)
|
||||||
|
return
|
||||||
|
}
|
||||||
72
userapi/storage/presence/sqlite3/storage.go
Normal file
72
userapi/storage/presence/sqlite3/storage.go
Normal file
|
|
@ -0,0 +1,72 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package sqlite3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/eduserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Database represents an account database
|
||||||
|
type Database struct {
|
||||||
|
db *sql.DB
|
||||||
|
writer sqlutil.Writer
|
||||||
|
sqlutil.PartitionOffsetStatements
|
||||||
|
presence presenceStatements
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
|
db, err := sqlutil.Open(dbProperties)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
d := &Database{
|
||||||
|
db: db,
|
||||||
|
writer: sqlutil.NewExclusiveWriter(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = d.presence.execSchema(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = d.PartitionOffsetStatements.Prepare(db, d.writer, "presence"); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = d.presence.prepare(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return d, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) UpsertPresence(
|
||||||
|
ctx context.Context,
|
||||||
|
userID, statusMsg string,
|
||||||
|
presence types.PresenceStatus,
|
||||||
|
lastActiveTS int64,
|
||||||
|
) (pos int64, err error) {
|
||||||
|
err = sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
pos, err = d.presence.UpsertPresence(ctx, txn, userID, statusMsg, presence, lastActiveTS)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresence, error) {
|
||||||
|
return d.presence.GetPresenceForUser(ctx, nil, userID)
|
||||||
|
}
|
||||||
36
userapi/storage/presence/storage.go
Normal file
36
userapi/storage/presence/storage.go
Normal file
|
|
@ -0,0 +1,36 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package presence
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/storage/presence/postgres"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/storage/presence/sqlite3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme)
|
||||||
|
// and sets postgres connection parameters
|
||||||
|
func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
|
||||||
|
switch {
|
||||||
|
case dbProperties.ConnectionString.IsSQLite():
|
||||||
|
return sqlite3.NewDatabase(dbProperties)
|
||||||
|
case dbProperties.ConnectionString.IsPostgres():
|
||||||
|
return postgres.NewDatabase(dbProperties)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unexpected database type")
|
||||||
|
}
|
||||||
|
}
|
||||||
37
userapi/storage/presence/storage_wasm.go
Normal file
37
userapi/storage/presence/storage_wasm.go
Normal file
|
|
@ -0,0 +1,37 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package devices
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/storage/presence/sqlite3"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewDatabase(
|
||||||
|
dbProperties *config.DatabaseOptions,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
) (Database, error) {
|
||||||
|
switch {
|
||||||
|
case dbProperties.ConnectionString.IsSQLite():
|
||||||
|
return sqlite3.NewDatabase(dbProperties)
|
||||||
|
case dbProperties.ConnectionString.IsPostgres():
|
||||||
|
return nil, fmt.Errorf("can't use Postgres implementation")
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unexpected database type")
|
||||||
|
}
|
||||||
|
}
|
||||||
40
userapi/types/presence.go
Normal file
40
userapi/types/presence.go
Normal file
|
|
@ -0,0 +1,40 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
import "strings"
|
||||||
|
|
||||||
|
type PresenceStatus int
|
||||||
|
|
||||||
|
//go:generate stringer -type=PresenceStatus -output=presence_string.go -linecomment
|
||||||
|
const (
|
||||||
|
Unknown PresenceStatus = iota - 1 // unknown
|
||||||
|
Online // online
|
||||||
|
Offline // offline
|
||||||
|
Unavailable // unavailable
|
||||||
|
)
|
||||||
|
|
||||||
|
// ToPresenceStatus tries to convert the given string to a PresenceStatus
|
||||||
|
func ToPresenceStatus(v string) PresenceStatus {
|
||||||
|
switch strings.ToLower(v) {
|
||||||
|
case "online":
|
||||||
|
return Online
|
||||||
|
case "offline":
|
||||||
|
return Offline
|
||||||
|
case "unavailable":
|
||||||
|
return Unavailable
|
||||||
|
}
|
||||||
|
return Unknown
|
||||||
|
}
|
||||||
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/inthttp"
|
"github.com/matrix-org/dendrite/userapi/inthttp"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/devices"
|
"github.com/matrix-org/dendrite/userapi/storage/devices"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/storage/presence"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -41,10 +42,15 @@ func NewInternalAPI(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to connect to device db")
|
logrus.WithError(err).Panicf("failed to connect to device db")
|
||||||
}
|
}
|
||||||
|
presenceDB, err := presence.NewDatabase(&cfg.PresenceDatabase)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).WithField("connString", cfg.PresenceDatabase.ConnectionString).Panicf("failed to connect to presence db")
|
||||||
|
}
|
||||||
|
|
||||||
return &internal.UserInternalAPI{
|
return &internal.UserInternalAPI{
|
||||||
AccountDB: accountDB,
|
AccountDB: accountDB,
|
||||||
DeviceDB: deviceDB,
|
DeviceDB: deviceDB,
|
||||||
|
PresenceDB: presenceDB,
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
AppServices: appServices,
|
AppServices: appServices,
|
||||||
KeyAPI: keyAPI,
|
KeyAPI: keyAPI,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue