mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 05:43:09 -06:00
Merge branch 'master' into neilalexander/mediar0
This commit is contained in:
commit
435fd0d28e
|
|
@ -16,7 +16,6 @@ package appservice
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -29,12 +28,10 @@ import (
|
|||
"github.com/matrix-org/dendrite/appservice/storage"
|
||||
"github.com/matrix-org/dendrite/appservice/types"
|
||||
"github.com/matrix-org/dendrite/appservice/workers"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/internal/setup"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
|
@ -47,8 +44,7 @@ func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQuer
|
|||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||
func NewInternalAPI(
|
||||
base *setup.BaseDendrite,
|
||||
accountsDB accounts.Database,
|
||||
deviceDB devices.Database,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
) appserviceAPI.AppServiceQueryAPI {
|
||||
// Create a connection to the appservice postgres DB
|
||||
|
|
@ -70,7 +66,7 @@ func NewInternalAPI(
|
|||
workerStates[i] = ws
|
||||
|
||||
// Create bot account for this AS if it doesn't already exist
|
||||
if err = generateAppServiceAccount(accountsDB, deviceDB, appservice); err != nil {
|
||||
if err = generateAppServiceAccount(userAPI, appservice); err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"appservice": appservice.ID,
|
||||
}).WithError(err).Panicf("failed to generate bot account for appservice")
|
||||
|
|
@ -90,7 +86,7 @@ func NewInternalAPI(
|
|||
// We can't add ASes at runtime so this is safe to do.
|
||||
if len(workerStates) > 0 {
|
||||
consumer := consumers.NewOutputRoomEventConsumer(
|
||||
base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
|
||||
base.Cfg, base.KafkaConsumer, appserviceDB,
|
||||
rsAPI, workerStates,
|
||||
)
|
||||
if err := consumer.Start(); err != nil {
|
||||
|
|
@ -109,22 +105,24 @@ func NewInternalAPI(
|
|||
// `sender_localpart` field of each application service if it doesn't
|
||||
// exist already
|
||||
func generateAppServiceAccount(
|
||||
accountsDB accounts.Database,
|
||||
deviceDB devices.Database,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
as config.ApplicationService,
|
||||
) error {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create an account for the application service
|
||||
_, err := accountsDB.CreateAccount(ctx, as.SenderLocalpart, "", as.ID)
|
||||
var accRes userapi.PerformAccountCreationResponse
|
||||
err := userAPI.PerformAccountCreation(context.Background(), &userapi.PerformAccountCreationRequest{
|
||||
Localpart: as.SenderLocalpart,
|
||||
AppServiceID: as.ID,
|
||||
OnConflict: userapi.ConflictUpdate,
|
||||
}, &accRes)
|
||||
if err != nil {
|
||||
if errors.Is(err, sqlutil.ErrUserExists) { // This account already exists
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a dummy device with a dummy token for the application service
|
||||
_, err = deviceDB.CreateDevice(ctx, as.SenderLocalpart, nil, as.ASToken, &as.SenderLocalpart)
|
||||
var devRes userapi.PerformDeviceCreationResponse
|
||||
err = userAPI.PerformDeviceCreation(context.Background(), &userapi.PerformDeviceCreationRequest{
|
||||
Localpart: as.SenderLocalpart,
|
||||
AccessToken: as.ASToken,
|
||||
DeviceID: &as.SenderLocalpart,
|
||||
DeviceDisplayName: &as.SenderLocalpart,
|
||||
}, &devRes)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/appservice/storage"
|
||||
"github.com/matrix-org/dendrite/appservice/types"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
|
@ -33,7 +32,6 @@ import (
|
|||
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||
type OutputRoomEventConsumer struct {
|
||||
roomServerConsumer *internal.ContinualConsumer
|
||||
db accounts.Database
|
||||
asDB storage.Database
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
serverName string
|
||||
|
|
@ -45,7 +43,6 @@ type OutputRoomEventConsumer struct {
|
|||
func NewOutputRoomEventConsumer(
|
||||
cfg *config.Dendrite,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store accounts.Database,
|
||||
appserviceDB storage.Database,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
workerStates []types.ApplicationServiceWorkerState,
|
||||
|
|
@ -53,11 +50,10 @@ func NewOutputRoomEventConsumer(
|
|||
consumer := internal.ContinualConsumer{
|
||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: store,
|
||||
PartitionStore: appserviceDB,
|
||||
}
|
||||
s := &OutputRoomEventConsumer{
|
||||
roomServerConsumer: &consumer,
|
||||
db: store,
|
||||
asDB: appserviceDB,
|
||||
rsAPI: rsAPI,
|
||||
serverName: string(cfg.Matrix.ServerName),
|
||||
|
|
|
|||
|
|
@ -17,10 +17,12 @@ package storage
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Database interface {
|
||||
internal.PartitionStorer
|
||||
StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error
|
||||
GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error)
|
||||
CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import (
|
|||
|
||||
// Database stores events intended to be later sent to application services
|
||||
type Database struct {
|
||||
sqlutil.PartitionOffsetStatements
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
db *sql.DB
|
||||
|
|
@ -42,6 +43,9 @@ func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (*Dat
|
|||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import (
|
|||
|
||||
// Database stores events intended to be later sent to application services
|
||||
type Database struct {
|
||||
sqlutil.PartitionOffsetStatements
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
db *sql.DB
|
||||
|
|
@ -46,6 +47,9 @@ func NewDatabase(dataSourceName string) (*Database, error) {
|
|||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -40,6 +40,10 @@ type Database interface {
|
|||
GetMembershipsByLocalpart(ctx context.Context, localpart string) (memberships []authtypes.Membership, err error)
|
||||
SaveAccountData(ctx context.Context, localpart, roomID, dataType, content string) error
|
||||
GetAccountData(ctx context.Context, localpart string) (global []gomatrixserverlib.ClientEvent, rooms map[string][]gomatrixserverlib.ClientEvent, err error)
|
||||
// GetAccountDataByType returns account data matching a given
|
||||
// localpart, room ID and type.
|
||||
// If no account data could be found, returns nil
|
||||
// Returns an error if there was an issue with the retrieval
|
||||
GetAccountDataByType(ctx context.Context, localpart, roomID, dataType string) (data *gomatrixserverlib.ClientEvent, err error)
|
||||
GetNewNumericLocalpart(ctx context.Context) (int64, error)
|
||||
SaveThreePIDAssociation(ctx context.Context, threepid, localpart, medium string) (err error)
|
||||
|
|
|
|||
|
|
@ -24,6 +24,12 @@ type Database interface {
|
|||
GetDeviceByAccessToken(ctx context.Context, token string) (*api.Device, error)
|
||||
GetDeviceByID(ctx context.Context, localpart, deviceID string) (*api.Device, error)
|
||||
GetDevicesByLocalpart(ctx context.Context, localpart string) ([]api.Device, error)
|
||||
// CreateDevice makes a new device associated with the given user ID localpart.
|
||||
// If there is already a device with the same device ID for this user, that access token will be revoked
|
||||
// and replaced with the given accessToken. If the given accessToken is already in use for another device,
|
||||
// an error will be returned.
|
||||
// If no device ID is given one is generated.
|
||||
// Returns the device on success.
|
||||
CreateDevice(ctx context.Context, localpart string, deviceID *string, accessToken string, displayName *string) (dev *api.Device, returnErr error)
|
||||
UpdateDevice(ctx context.Context, localpart, deviceID string, displayName *string) error
|
||||
RemoveDevice(ctx context.Context, deviceID, localpart string) error
|
||||
|
|
|
|||
|
|
@ -24,11 +24,10 @@ func main() {
|
|||
base := setup.NewBaseDendrite(cfg, "AppServiceAPI", true)
|
||||
|
||||
defer base.Close() // nolint: errcheck
|
||||
accountDB := base.CreateAccountsDB()
|
||||
deviceDB := base.CreateDeviceDB()
|
||||
userAPI := base.UserAPIClient()
|
||||
rsAPI := base.RoomserverHTTPClient()
|
||||
|
||||
intAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI)
|
||||
intAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||
appservice.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
||||
|
||||
base.SetupAndServeHTTP(string(base.Cfg.Bind.AppServiceAPI), string(base.Cfg.Listen.AppServiceAPI))
|
||||
|
|
|
|||
|
|
@ -141,6 +141,7 @@ func main() {
|
|||
accountDB := base.Base.CreateAccountsDB()
|
||||
deviceDB := base.Base.CreateDeviceDB()
|
||||
federation := createFederationClient(base)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil)
|
||||
|
||||
serverKeyAPI := serverkeyapi.NewInternalAPI(
|
||||
base.Base.Cfg, federation, base.Base.Caches,
|
||||
|
|
@ -154,9 +155,9 @@ func main() {
|
|||
&base.Base, keyRing, federation,
|
||||
)
|
||||
eduInputAPI := eduserver.NewInternalAPI(
|
||||
&base.Base, cache.New(), deviceDB,
|
||||
&base.Base, cache.New(), userAPI,
|
||||
)
|
||||
asAPI := appservice.NewInternalAPI(&base.Base, accountDB, deviceDB, rsAPI)
|
||||
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
|
||||
fsAPI := federationsender.NewInternalAPI(
|
||||
&base.Base, federation, rsAPI, keyRing,
|
||||
)
|
||||
|
|
@ -165,7 +166,6 @@ func main() {
|
|||
if err != nil {
|
||||
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
||||
}
|
||||
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Base.Cfg,
|
||||
|
|
|
|||
|
|
@ -130,16 +130,18 @@ func main() {
|
|||
serverKeyAPI := &signing.YggdrasilKeys{}
|
||||
keyRing := serverKeyAPI.KeyRing()
|
||||
|
||||
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil)
|
||||
|
||||
rsComponent := roomserver.NewInternalAPI(
|
||||
base, keyRing, federation,
|
||||
)
|
||||
rsAPI := rsComponent
|
||||
|
||||
eduInputAPI := eduserver.NewInternalAPI(
|
||||
base, cache.New(), deviceDB,
|
||||
base, cache.New(), userAPI,
|
||||
)
|
||||
|
||||
asAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI)
|
||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||
|
||||
fsAPI := federationsender.NewInternalAPI(
|
||||
base, federation, rsAPI, keyRing,
|
||||
|
|
@ -153,7 +155,6 @@ func main() {
|
|||
}
|
||||
|
||||
embed.Embed(*instancePort, "Yggdrasil Demo")
|
||||
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Cfg,
|
||||
|
|
|
|||
|
|
@ -29,9 +29,8 @@ func main() {
|
|||
logrus.WithError(err).Warn("BaseDendrite close failed")
|
||||
}
|
||||
}()
|
||||
deviceDB := base.CreateDeviceDB()
|
||||
|
||||
intAPI := eduserver.NewInternalAPI(base, cache.New(), deviceDB)
|
||||
intAPI := eduserver.NewInternalAPI(base, cache.New(), base.UserAPIClient())
|
||||
eduserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
||||
|
||||
base.SetupAndServeHTTP(string(base.Cfg.Bind.EDUServer), string(base.Cfg.Listen.EDUServer))
|
||||
|
|
|
|||
|
|
@ -75,6 +75,7 @@ func main() {
|
|||
serverKeyAPI = base.ServerKeyAPIClient()
|
||||
}
|
||||
keyRing := serverKeyAPI.KeyRing()
|
||||
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices)
|
||||
|
||||
rsImpl := roomserver.NewInternalAPI(
|
||||
base, keyRing, federation,
|
||||
|
|
@ -92,14 +93,14 @@ func main() {
|
|||
}
|
||||
|
||||
eduInputAPI := eduserver.NewInternalAPI(
|
||||
base, cache.New(), deviceDB,
|
||||
base, cache.New(), userAPI,
|
||||
)
|
||||
if base.UseHTTPAPIs {
|
||||
eduserver.AddInternalRoutes(base.InternalAPIMux, eduInputAPI)
|
||||
eduInputAPI = base.EDUServerClient()
|
||||
}
|
||||
|
||||
asAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI)
|
||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||
if base.UseHTTPAPIs {
|
||||
appservice.AddInternalRoutes(base.InternalAPIMux, asAPI)
|
||||
asAPI = base.AppserviceHTTPClient()
|
||||
|
|
@ -121,8 +122,6 @@ func main() {
|
|||
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
||||
}
|
||||
|
||||
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Cfg,
|
||||
AccountDB: accountDB,
|
||||
|
|
|
|||
|
|
@ -25,12 +25,11 @@ func main() {
|
|||
defer base.Close() // nolint: errcheck
|
||||
|
||||
userAPI := base.UserAPIClient()
|
||||
accountDB := base.CreateAccountsDB()
|
||||
federation := base.CreateFederationClient()
|
||||
|
||||
rsAPI := base.RoomserverHTTPClient()
|
||||
|
||||
syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, accountDB, rsAPI, federation, cfg)
|
||||
syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, federation, cfg)
|
||||
|
||||
base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI))
|
||||
|
||||
|
|
|
|||
|
|
@ -194,6 +194,7 @@ func main() {
|
|||
accountDB := base.CreateAccountsDB()
|
||||
deviceDB := base.CreateDeviceDB()
|
||||
federation := createFederationClient(cfg, node)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil)
|
||||
|
||||
fetcher := &libp2pKeyFetcher{}
|
||||
keyRing := gomatrixserverlib.KeyRing{
|
||||
|
|
@ -204,9 +205,9 @@ func main() {
|
|||
}
|
||||
|
||||
rsAPI := roomserver.NewInternalAPI(base, keyRing, federation)
|
||||
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), deviceDB)
|
||||
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
|
||||
asQuery := appservice.NewInternalAPI(
|
||||
base, accountDB, deviceDB, rsAPI,
|
||||
base, userAPI, rsAPI,
|
||||
)
|
||||
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing)
|
||||
rsAPI.SetFederationSenderAPI(fedSenderAPI)
|
||||
|
|
@ -217,8 +218,6 @@ func main() {
|
|||
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
||||
}
|
||||
|
||||
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Cfg,
|
||||
AccountDB: accountDB,
|
||||
|
|
|
|||
|
|
@ -39,6 +39,8 @@ Internal only | `-------------------
|
|||
- 12 (FedSender -> ServerKeyAPI): Verifying event signatures of responses (e.g from send_join)
|
||||
- 13 (Roomserver -> ServerKeyAPI): Verifying event signatures of backfilled events
|
||||
|
||||
In addition to this, all public facing components (Tier 1) talk to the `UserAPI` to verify access tokens and extract profile information where needed.
|
||||
|
||||
## Kafka logs
|
||||
|
||||
```
|
||||
|
|
|
|||
|
|
@ -18,12 +18,12 @@ package eduserver
|
|||
|
||||
import (
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/eduserver/input"
|
||||
"github.com/matrix-org/dendrite/eduserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/internal/setup"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
)
|
||||
|
||||
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
|
||||
|
|
@ -37,11 +37,11 @@ func AddInternalRoutes(internalMux *mux.Router, inputAPI api.EDUServerInputAPI)
|
|||
func NewInternalAPI(
|
||||
base *setup.BaseDendrite,
|
||||
eduCache *cache.EDUCache,
|
||||
deviceDB devices.Database,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
) api.EDUServerInputAPI {
|
||||
return &input.EDUServerInputAPI{
|
||||
Cache: eduCache,
|
||||
DeviceDB: deviceDB,
|
||||
UserAPI: userAPI,
|
||||
Producer: base.KafkaProducer,
|
||||
OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent),
|
||||
OutputSendToDeviceEventTopic: string(base.Cfg.Kafka.Topics.OutputSendToDeviceEvent),
|
||||
|
|
|
|||
|
|
@ -22,9 +22,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
|
@ -39,8 +39,8 @@ type EDUServerInputAPI struct {
|
|||
OutputSendToDeviceEventTopic string
|
||||
// kafka producer
|
||||
Producer sarama.SyncProducer
|
||||
// device database
|
||||
DeviceDB devices.Database
|
||||
// Internal user query API
|
||||
UserAPI userapi.UserInternalAPI
|
||||
// our server name
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
}
|
||||
|
|
@ -115,7 +115,7 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
|
|||
|
||||
func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error {
|
||||
devices := []string{}
|
||||
localpart, domain, err := gomatrixserverlib.SplitID('@', ise.UserID)
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', ise.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -126,11 +126,14 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
|
|||
// wildcard as we don't know about the remote devices, so instead we leave it
|
||||
// as-is, so that the federation sender can send it on with the wildcard intact.
|
||||
if domain == t.ServerName && ise.DeviceID == "*" {
|
||||
devs, err := t.DeviceDB.GetDevicesByLocalpart(context.TODO(), localpart)
|
||||
var res userapi.QueryDevicesResponse
|
||||
err = t.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{
|
||||
UserID: ise.UserID,
|
||||
}, &res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, dev := range devs {
|
||||
for _, dev := range res.Devices {
|
||||
devices = append(devices, dev.ID)
|
||||
}
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -87,6 +87,6 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
|
|||
m.ExtPublicRoomsProvider,
|
||||
)
|
||||
syncapi.AddPublicRoutes(
|
||||
publicMux, m.KafkaConsumer, m.UserAPI, m.AccountDB, m.RoomserverAPI, m.FedClient, m.Config,
|
||||
publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config,
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ import (
|
|||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
|
@ -33,14 +32,14 @@ import (
|
|||
|
||||
// RequestPool manages HTTP long-poll connections for /sync
|
||||
type RequestPool struct {
|
||||
db storage.Database
|
||||
accountDB accounts.Database
|
||||
notifier *Notifier
|
||||
db storage.Database
|
||||
userAPI userapi.UserInternalAPI
|
||||
notifier *Notifier
|
||||
}
|
||||
|
||||
// NewRequestPool makes a new RequestPool
|
||||
func NewRequestPool(db storage.Database, n *Notifier, adb accounts.Database) *RequestPool {
|
||||
return &RequestPool{db, adb, n}
|
||||
func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool {
|
||||
return &RequestPool{db, userAPI, n}
|
||||
}
|
||||
|
||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||
|
|
@ -193,6 +192,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
|||
return
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (rp *RequestPool) appendAccountData(
|
||||
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
|
||||
accountDataFilter *gomatrixserverlib.EventFilter,
|
||||
|
|
@ -202,25 +202,21 @@ func (rp *RequestPool) appendAccountData(
|
|||
// data keys were set between two message. This isn't a huge issue since the
|
||||
// duplicate data doesn't represent a huge quantity of data, but an optimisation
|
||||
// here would be making sure each data is sent only once to the client.
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if req.since == nil {
|
||||
// If this is the initial sync, we don't need to check if a data has
|
||||
// already been sent. Instead, we send the whole batch.
|
||||
var global []gomatrixserverlib.ClientEvent
|
||||
var rooms map[string][]gomatrixserverlib.ClientEvent
|
||||
global, rooms, err = rp.accountDB.GetAccountData(req.ctx, localpart)
|
||||
var res userapi.QueryAccountDataResponse
|
||||
err := rp.userAPI.QueryAccountData(req.ctx, &userapi.QueryAccountDataRequest{
|
||||
UserID: userID,
|
||||
}, &res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data.AccountData.Events = global
|
||||
data.AccountData.Events = res.GlobalAccountData
|
||||
|
||||
for r, j := range data.Rooms.Join {
|
||||
if len(rooms[r]) > 0 {
|
||||
j.AccountData.Events = rooms[r]
|
||||
if len(res.RoomAccountData[r]) > 0 {
|
||||
j.AccountData.Events = res.RoomAccountData[r]
|
||||
data.Rooms.Join[r] = j
|
||||
}
|
||||
}
|
||||
|
|
@ -256,13 +252,20 @@ func (rp *RequestPool) appendAccountData(
|
|||
events := []gomatrixserverlib.ClientEvent{}
|
||||
// Request the missing data from the database
|
||||
for _, dataType := range dataTypes {
|
||||
event, err := rp.accountDB.GetAccountDataByType(
|
||||
req.ctx, localpart, roomID, dataType,
|
||||
)
|
||||
var res userapi.QueryAccountDataResponse
|
||||
err = rp.userAPI.QueryAccountData(req.ctx, &userapi.QueryAccountDataRequest{
|
||||
UserID: userID,
|
||||
RoomID: roomID,
|
||||
DataType: dataType,
|
||||
}, &res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events = append(events, *event)
|
||||
if len(res.RoomAccountData[roomID]) > 0 {
|
||||
events = append(events, res.RoomAccountData[roomID]...)
|
||||
} else if len(res.GlobalAccountData) > 0 {
|
||||
events = append(events, res.GlobalAccountData...)
|
||||
}
|
||||
}
|
||||
|
||||
// Append the data to the response
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ import (
|
|||
"github.com/gorilla/mux"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
|
|
@ -39,7 +38,6 @@ func AddPublicRoutes(
|
|||
router *mux.Router,
|
||||
consumer sarama.Consumer,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
accountsDB accounts.Database,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
cfg *config.Dendrite,
|
||||
|
|
@ -60,7 +58,7 @@ func AddPublicRoutes(
|
|||
logrus.WithError(err).Panicf("failed to start notifier")
|
||||
}
|
||||
|
||||
requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB)
|
||||
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI)
|
||||
|
||||
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
||||
cfg, consumer, notifier, syncDB, rsAPI,
|
||||
|
|
|
|||
|
|
@ -14,13 +14,20 @@
|
|||
|
||||
package api
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// UserInternalAPI is the internal API for information about users and devices.
|
||||
type UserInternalAPI interface {
|
||||
PerformAccountCreation(ctx context.Context, req *PerformAccountCreationRequest, res *PerformAccountCreationResponse) error
|
||||
PerformDeviceCreation(ctx context.Context, req *PerformDeviceCreationRequest, res *PerformDeviceCreationResponse) error
|
||||
QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error
|
||||
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
|
||||
QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
|
||||
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
|
||||
}
|
||||
|
||||
// QueryAccessTokenRequest is the request for QueryAccessToken
|
||||
|
|
@ -37,6 +44,22 @@ type QueryAccessTokenResponse struct {
|
|||
Err error // e.g ErrorForbidden
|
||||
}
|
||||
|
||||
// QueryAccountDataRequest is the request for QueryAccountData
|
||||
type QueryAccountDataRequest struct {
|
||||
UserID string // required: the user to get account data for.
|
||||
// TODO: This is a terribly confusing API shape :/
|
||||
DataType string // optional: if specified returns only a single event matching this data type.
|
||||
// optional: Only used if DataType is set. If blank returns global account data matching the data type.
|
||||
// If set, returns only room account data matching this data type.
|
||||
RoomID string
|
||||
}
|
||||
|
||||
// QueryAccountDataResponse is the response for QueryAccountData
|
||||
type QueryAccountDataResponse struct {
|
||||
GlobalAccountData []gomatrixserverlib.ClientEvent
|
||||
RoomAccountData map[string][]gomatrixserverlib.ClientEvent
|
||||
}
|
||||
|
||||
// QueryDevicesRequest is the request for QueryDevices
|
||||
type QueryDevicesRequest struct {
|
||||
UserID string
|
||||
|
|
@ -64,6 +87,38 @@ type QueryProfileResponse struct {
|
|||
AvatarURL string
|
||||
}
|
||||
|
||||
// PerformAccountCreationRequest is the request for PerformAccountCreation
|
||||
type PerformAccountCreationRequest struct {
|
||||
Localpart string
|
||||
AppServiceID string
|
||||
Password string
|
||||
OnConflict Conflict
|
||||
}
|
||||
|
||||
// PerformAccountCreationResponse is the response for PerformAccountCreation
|
||||
type PerformAccountCreationResponse struct {
|
||||
AccountCreated bool
|
||||
UserID string
|
||||
}
|
||||
|
||||
// PerformDeviceCreationRequest is the request for PerformDeviceCreation
|
||||
type PerformDeviceCreationRequest struct {
|
||||
Localpart string
|
||||
AccessToken string // optional: if blank one will be made on your behalf
|
||||
// optional: if nil an ID is generated for you. If set, replaces any existing device session,
|
||||
// which will generate a new access token and invalidate the old one.
|
||||
DeviceID *string
|
||||
// optional: if nil no display name will be associated with this device.
|
||||
DeviceDisplayName *string
|
||||
}
|
||||
|
||||
// PerformDeviceCreationResponse is the response for PerformDeviceCreation
|
||||
type PerformDeviceCreationResponse struct {
|
||||
DeviceCreated bool
|
||||
AccessToken string
|
||||
DeviceID string
|
||||
}
|
||||
|
||||
// Device represents a client's device (mobile, web, etc)
|
||||
type Device struct {
|
||||
ID string
|
||||
|
|
@ -87,3 +142,22 @@ type ErrorForbidden struct {
|
|||
func (e *ErrorForbidden) Error() string {
|
||||
return "Forbidden: " + e.Message
|
||||
}
|
||||
|
||||
// ErrorConflict is an error indicating that there was a conflict which resulted in the request being aborted.
|
||||
type ErrorConflict struct {
|
||||
Message string
|
||||
}
|
||||
|
||||
func (e *ErrorConflict) Error() string {
|
||||
return "Conflict: " + e.Message
|
||||
}
|
||||
|
||||
// Conflict is an enum representing what to do when encountering conflicting when creating profiles/devices
|
||||
type Conflict int
|
||||
|
||||
const (
|
||||
// ConflictUpdate will update matching records returning no error
|
||||
ConflictUpdate Conflict = 1
|
||||
// ConflictAbort will reject the request with ErrorConflict
|
||||
ConflictAbort Conflict = 2
|
||||
)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package internal
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/appservice/types"
|
||||
|
|
@ -24,6 +25,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/clientapi/userutil"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
|
@ -36,6 +38,38 @@ type UserInternalAPI struct {
|
|||
AppServices []config.ApplicationService
|
||||
}
|
||||
|
||||
func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error {
|
||||
acc, err := a.AccountDB.CreateAccount(ctx, req.Localpart, req.Password, req.AppServiceID)
|
||||
if err != nil {
|
||||
if errors.Is(err, sqlutil.ErrUserExists) { // This account already exists
|
||||
switch req.OnConflict {
|
||||
case api.ConflictUpdate:
|
||||
break
|
||||
case api.ConflictAbort:
|
||||
return &api.ErrorConflict{
|
||||
Message: err.Error(),
|
||||
}
|
||||
}
|
||||
}
|
||||
res.AccountCreated = false
|
||||
res.UserID = fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName)
|
||||
return nil
|
||||
}
|
||||
res.AccountCreated = true
|
||||
res.UserID = acc.UserID
|
||||
return nil
|
||||
}
|
||||
func (a *UserInternalAPI) PerformDeviceCreation(ctx context.Context, req *api.PerformDeviceCreationRequest, res *api.PerformDeviceCreationResponse) error {
|
||||
dev, err := a.DeviceDB.CreateDevice(ctx, req.Localpart, req.DeviceID, req.AccessToken, req.DeviceDisplayName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res.DeviceCreated = true
|
||||
res.AccessToken = dev.AccessToken
|
||||
res.DeviceID = dev.ID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *UserInternalAPI) QueryProfile(ctx context.Context, req *api.QueryProfileRequest, res *api.QueryProfileResponse) error {
|
||||
local, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||
if err != nil {
|
||||
|
|
@ -73,6 +107,39 @@ func (a *UserInternalAPI) QueryDevices(ctx context.Context, req *api.QueryDevice
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *UserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAccountDataRequest, res *api.QueryAccountDataResponse) error {
|
||||
local, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if domain != a.ServerName {
|
||||
return fmt.Errorf("cannot query account data of remote users: got %s want %s", domain, a.ServerName)
|
||||
}
|
||||
if req.DataType != "" {
|
||||
var event *gomatrixserverlib.ClientEvent
|
||||
event, err = a.AccountDB.GetAccountDataByType(ctx, local, req.RoomID, req.DataType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if event != nil {
|
||||
if req.RoomID != "" {
|
||||
res.RoomAccountData = make(map[string][]gomatrixserverlib.ClientEvent)
|
||||
res.RoomAccountData[req.RoomID] = []gomatrixserverlib.ClientEvent{*event}
|
||||
} else {
|
||||
res.GlobalAccountData = append(res.GlobalAccountData, *event)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
global, rooms, err := a.AccountDB.GetAccountData(ctx, local)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res.RoomAccountData = rooms
|
||||
res.GlobalAccountData = global
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *UserInternalAPI) QueryAccessToken(ctx context.Context, req *api.QueryAccessTokenRequest, res *api.QueryAccessTokenResponse) error {
|
||||
if req.AppServiceUserID != "" {
|
||||
appServiceDevice, err := a.queryAppServiceToken(ctx, req.AccessToken, req.AppServiceUserID)
|
||||
|
|
|
|||
|
|
@ -26,9 +26,13 @@ import (
|
|||
|
||||
// HTTP paths for the internal HTTP APIs
|
||||
const (
|
||||
PerformDeviceCreationPath = "/userapi/performDeviceCreation"
|
||||
PerformAccountCreationPath = "/userapi/performAccountCreation"
|
||||
|
||||
QueryProfilePath = "/userapi/queryProfile"
|
||||
QueryAccessTokenPath = "/userapi/queryAccessToken"
|
||||
QueryDevicesPath = "/userapi/queryDevices"
|
||||
QueryAccountDataPath = "/userapi/queryAccountData"
|
||||
)
|
||||
|
||||
// NewUserAPIClient creates a UserInternalAPI implemented by talking to a HTTP POST API.
|
||||
|
|
@ -51,6 +55,30 @@ type httpUserInternalAPI struct {
|
|||
httpClient *http.Client
|
||||
}
|
||||
|
||||
func (h *httpUserInternalAPI) PerformAccountCreation(
|
||||
ctx context.Context,
|
||||
request *api.PerformAccountCreationRequest,
|
||||
response *api.PerformAccountCreationResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformAccountCreation")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.apiURL + PerformAccountCreationPath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
func (h *httpUserInternalAPI) PerformDeviceCreation(
|
||||
ctx context.Context,
|
||||
request *api.PerformDeviceCreationRequest,
|
||||
response *api.PerformDeviceCreationResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformDeviceCreation")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.apiURL + PerformDeviceCreationPath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
func (h *httpUserInternalAPI) QueryProfile(
|
||||
ctx context.Context,
|
||||
request *api.QueryProfileRequest,
|
||||
|
|
@ -82,3 +110,11 @@ func (h *httpUserInternalAPI) QueryDevices(ctx context.Context, req *api.QueryDe
|
|||
apiURL := h.apiURL + QueryDevicesPath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||
}
|
||||
|
||||
func (h *httpUserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAccountDataRequest, res *api.QueryAccountDataResponse) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryAccountData")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.apiURL + QueryAccountDataPath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,32 @@ import (
|
|||
)
|
||||
|
||||
func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
|
||||
internalAPIMux.Handle(PerformAccountCreationPath,
|
||||
httputil.MakeInternalAPI("performAccountCreation", func(req *http.Request) util.JSONResponse {
|
||||
request := api.PerformAccountCreationRequest{}
|
||||
response := api.PerformAccountCreationResponse{}
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := s.PerformAccountCreation(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(PerformDeviceCreationPath,
|
||||
httputil.MakeInternalAPI("performDeviceCreation", func(req *http.Request) util.JSONResponse {
|
||||
request := api.PerformDeviceCreationRequest{}
|
||||
response := api.PerformDeviceCreationResponse{}
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := s.PerformDeviceCreation(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(QueryProfilePath,
|
||||
httputil.MakeInternalAPI("queryProfile", func(req *http.Request) util.JSONResponse {
|
||||
request := api.QueryProfileRequest{}
|
||||
|
|
@ -64,4 +90,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
|
|||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(QueryAccountDataPath,
|
||||
httputil.MakeInternalAPI("queryAccountData", func(req *http.Request) util.JSONResponse {
|
||||
request := api.QueryAccountDataRequest{}
|
||||
response := api.QueryAccountDataResponse{}
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := s.QueryAccountData(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue