Migration & internal API fixes

This commit is contained in:
Till Faelligen 2022-08-12 07:56:43 +02:00
parent 16fb3469af
commit 51372cbb8f
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
12 changed files with 71 additions and 64 deletions

View file

@ -22,6 +22,11 @@ import (
"sync"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers"
@ -32,10 +37,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
const (
@ -281,7 +282,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
// Clear our local user profile cache, if this is a membership event
if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
if err = t.userAPI.DeleteProfile(ctx, &userapi.PerformDeleteProfileRequest{UserID: event.Sender()}, &struct{}{}); err != nil {
if err = t.userAPI.PerformDeleteProfile(ctx, &userapi.PerformDeleteProfileRequest{UserID: event.Sender()}, &struct{}{}); err != nil {
// non-fatal error, log and continue
util.GetLogger(ctx).WithError(err).Warnf("Transaction: couldn't delete user profile for %s", event.Sender())
}

View file

@ -61,7 +61,7 @@ type MediaUserAPI interface {
type FederationUserAPI interface {
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error
DeleteProfile(ctx context.Context, req *PerformDeleteProfileRequest, res *struct{}) error
PerformDeleteProfile(ctx context.Context, req *PerformDeleteProfileRequest, res *struct{}) error
}
// api functions required by the sync api

View file

@ -204,9 +204,9 @@ func (t *UserInternalAPITrace) PerformSaveThreePIDAssociation(ctx context.Contex
return err
}
func (t *UserInternalAPITrace) DeleteProfile(ctx context.Context, req *PerformDeleteProfileRequest, res *struct{}) error {
err := t.Impl.DeleteProfile(ctx, req, res)
util.GetLogger(ctx).Infof("DeleteProfile req=%+v res=%+v", js(req), js(res))
func (t *UserInternalAPITrace) PerformDeleteProfile(ctx context.Context, req *PerformDeleteProfileRequest, res *struct{}) error {
err := t.Impl.PerformDeleteProfile(ctx, req, res)
util.GetLogger(ctx).Infof("PerformDeleteProfile req=%+v res=%+v", js(req), js(res))
return err
}

View file

@ -873,7 +873,7 @@ func (a *UserInternalAPI) PerformSaveThreePIDAssociation(ctx context.Context, re
return a.DB.SaveThreePIDAssociation(ctx, req.ThreePID, req.Localpart, req.Medium)
}
func (a *UserInternalAPI) DeleteProfile(ctx context.Context, req *api.PerformDeleteProfileRequest, res *struct{}) error {
func (a *UserInternalAPI) PerformDeleteProfile(ctx context.Context, req *api.PerformDeleteProfileRequest, res *struct{}) error {
localpart, serverName, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil {
return err

View file

@ -441,10 +441,9 @@ func (h *httpUserInternalAPI) PerformSaveThreePIDAssociation(
)
}
func (h *httpUserInternalAPI) DeleteProfile(ctx context.Context, req *api.PerformDeleteProfileRequest, res *struct{}) error {
span, ctx := opentracing.StartSpanFromContext(ctx, PerformDeleteUserProfilePath)
defer span.Finish()
apiURL := h.apiURL + PerformDeleteUserProfilePath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
func (h *httpUserInternalAPI) PerformDeleteProfile(ctx context.Context, request *api.PerformDeleteProfileRequest, response *struct{}) error {
return httputil.CallInternalRPCAPI(
"PerformDeleteProfile", h.apiURL+PerformDeleteUserProfilePath,
h.httpClient, ctx, request, response,
)
}

View file

@ -18,9 +18,10 @@ import (
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
// nolint: gocyclo
@ -197,16 +198,8 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
PerformSaveThreePIDAssociationPath,
httputil.MakeInternalRPCAPI("UserAPIPerformSaveThreePIDAssociation", s.PerformSaveThreePIDAssociation),
)
internalAPIMux.Handle(PerformDeleteUserProfilePath,
httputil.MakeInternalAPI("performDeleteUserProfilePath", func(req *http.Request) util.JSONResponse {
request := api.PerformDeleteProfileRequest{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := s.DeleteProfile(req.Context(), &request, &struct{}{}); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &struct{}{}}
}),
internalAPIMux.Handle(
PerformDeleteUserProfilePath,
httputil.MakeInternalRPCAPI("UserAPIPerformDeleteUserProfilePath", s.PerformDeleteProfile),
)
}

View file

@ -1,33 +1,28 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
var serverName gomatrixserverlib.ServerName
func LoadProfilePrimaryKey(m *sqlutil.Migrations, s gomatrixserverlib.ServerName) {
serverName = s
m.AddMigration(UpProfilePrimaryKey, DownProfilePrimaryKey)
}
func UpProfilePrimaryKey(tx *sql.Tx) error {
_, err := tx.Exec(fmt.Sprintf(`ALTER TABLE account_profiles ADD COLUMN IF NOT EXISTS server_name TEXT NOT NULL DEFAULT '%s';
func UpProfilePrimaryKey(serverName gomatrixserverlib.ServerName) func(context.Context, *sql.Tx) error {
return func(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE account_profiles ADD COLUMN IF NOT EXISTS server_name TEXT NOT NULL DEFAULT '%s';
ALTER TABLE account_profiles DROP CONSTRAINT account_profiles_pkey;
ALTER TABLE account_profiles ADD PRIMARY KEY (localpart, server_name);
ALTER TABLE account_profiles ALTER COLUMN server_name DROP DEFAULT;`, serverName))
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
return nil
}
func DownProfilePrimaryKey(tx *sql.Tx) error {
_, err := tx.Exec(`ALTER TABLE account_profiles DROP COLUMN IF EXISTS server_name;
func DownProfilePrimaryKey(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `ALTER TABLE account_profiles DROP COLUMN IF EXISTS server_name;
ALTER TABLE account_profiles ADD PRIMARY KEY(localpart);`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)

View file

@ -19,11 +19,13 @@ import (
"database/sql"
"fmt"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/gomatrixserverlib"
)
const profilesSchema = `
@ -69,7 +71,7 @@ type profilesStatements struct {
deleteProfileStmt *sql.Stmt
}
func NewPostgresProfilesTable(db *sql.DB, serverNoticesLocalpart string) (tables.ProfileTable, error) {
func NewPostgresProfilesTable(db *sql.DB, serverNoticesLocalpart string, serverName gomatrixserverlib.ServerName) (tables.ProfileTable, error) {
s := &profilesStatements{
serverNoticesLocalpart: serverNoticesLocalpart,
}
@ -77,6 +79,16 @@ func NewPostgresProfilesTable(db *sql.DB, serverNoticesLocalpart string) (tables
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "userapi: add server_name column (account_profiles)",
Up: deltas.UpProfilePrimaryKey(serverName),
})
if err := m.Up(context.Background()); err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.insertProfileStmt, insertProfileSQL},
{&s.selectProfileByLocalpartStmt, selectProfileByLocalpartSQL},

View file

@ -64,7 +64,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil {
return nil, fmt.Errorf("NewPostgresOpenIDTable: %w", err)
}
profilesTable, err := NewPostgresProfilesTable(db, serverNoticesLocalpart)
profilesTable, err := NewPostgresProfilesTable(db, serverNoticesLocalpart, serverName)
if err != nil {
return nil, fmt.Errorf("NewPostgresProfilesTable: %w", err)
}

View file

@ -1,22 +1,16 @@
package deltas
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
var serverName gomatrixserverlib.ServerName
func LoadProfilePrimaryKey(m *sqlutil.Migrations, s gomatrixserverlib.ServerName) {
serverName = s
m.AddMigration(UpProfilePrimaryKey, DownProfilePrimaryKey)
}
func UpProfilePrimaryKey(tx *sql.Tx) error {
_, err := tx.Exec(fmt.Sprintf(`
func UpProfilePrimaryKey(serverName gomatrixserverlib.ServerName) func(context.Context, *sql.Tx) error {
return func(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, fmt.Sprintf(`
ALTER TABLE account_profiles RENAME TO account_profiles_tmp;
CREATE TABLE IF NOT EXISTS account_profiles (
localpart TEXT NOT NULL,
@ -32,14 +26,15 @@ func UpProfilePrimaryKey(tx *sql.Tx) error {
localpart, '%s', display_name, avatar_url
FROM account_profiles_tmp;
DROP TABLE account_profiles_tmp;`, serverName))
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
return nil
}
func DownProfilePrimaryKey(tx *sql.Tx) error {
_, err := tx.Exec(`
func DownProfilePrimaryKey(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `
ALTER TABLE account_profiles RENAME TO account_profiles_tmp;
CREATE TABLE IF NOT EXISTS account_profiles (
localpart TEXT NOT NULL PRIMARY KEY,

View file

@ -19,11 +19,13 @@ import (
"database/sql"
"fmt"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/gomatrixserverlib"
)
const profilesSchema = `
@ -70,7 +72,7 @@ type profilesStatements struct {
deleteProfileStmt *sql.Stmt
}
func NewSQLiteProfilesTable(db *sql.DB, serverNoticesLocalpart string) (tables.ProfileTable, error) {
func NewSQLiteProfilesTable(db *sql.DB, serverNoticesLocalpart string, serverName gomatrixserverlib.ServerName) (tables.ProfileTable, error) {
s := &profilesStatements{
db: db,
serverNoticesLocalpart: serverNoticesLocalpart,
@ -79,6 +81,16 @@ func NewSQLiteProfilesTable(db *sql.DB, serverNoticesLocalpart string) (tables.P
if err != nil {
return nil, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "userapi: add server_name column (account_profiles)",
Up: deltas.UpProfilePrimaryKey(serverName),
})
if err := m.Up(context.Background()); err != nil {
return nil, err
}
return s, sqlutil.StatementList{
{&s.insertProfileStmt, insertProfileSQL},
{&s.selectProfileByLocalpartStmt, selectProfileByLocalpartSQL},

View file

@ -62,7 +62,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil {
return nil, fmt.Errorf("NewSQLiteOpenIDTable: %w", err)
}
profilesTable, err := NewSQLiteProfilesTable(db, serverNoticesLocalpart)
profilesTable, err := NewSQLiteProfilesTable(db, serverNoticesLocalpart, serverName)
if err != nil {
return nil, fmt.Errorf("NewSQLiteProfilesTable: %w", err)
}