Make syncapi use userapi (#1136)
* Make syncapi use userapi * Unbreak things * Fix tests * Lint
This commit is contained in:
parent
1942928ee5
commit
83391da0e0
|
@ -25,12 +25,11 @@ func main() {
|
||||||
defer base.Close() // nolint: errcheck
|
defer base.Close() // nolint: errcheck
|
||||||
|
|
||||||
userAPI := base.UserAPIClient()
|
userAPI := base.UserAPIClient()
|
||||||
accountDB := base.CreateAccountsDB()
|
|
||||||
federation := base.CreateFederationClient()
|
federation := base.CreateFederationClient()
|
||||||
|
|
||||||
rsAPI := base.RoomserverHTTPClient()
|
rsAPI := base.RoomserverHTTPClient()
|
||||||
|
|
||||||
syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, accountDB, rsAPI, federation, cfg)
|
syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, federation, cfg)
|
||||||
|
|
||||||
base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI))
|
base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI))
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,8 @@ Internal only | `-------------------
|
||||||
- 12 (FedSender -> ServerKeyAPI): Verifying event signatures of responses (e.g from send_join)
|
- 12 (FedSender -> ServerKeyAPI): Verifying event signatures of responses (e.g from send_join)
|
||||||
- 13 (Roomserver -> ServerKeyAPI): Verifying event signatures of backfilled events
|
- 13 (Roomserver -> ServerKeyAPI): Verifying event signatures of backfilled events
|
||||||
|
|
||||||
|
In addition to this, all public facing components (Tier 1) talk to the `UserAPI` to verify access tokens and extract profile information where needed.
|
||||||
|
|
||||||
## Kafka logs
|
## Kafka logs
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
|
@ -87,6 +87,6 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
|
||||||
m.ExtPublicRoomsProvider,
|
m.ExtPublicRoomsProvider,
|
||||||
)
|
)
|
||||||
syncapi.AddPublicRoutes(
|
syncapi.AddPublicRoutes(
|
||||||
publicMux, m.KafkaConsumer, m.UserAPI, m.AccountDB, m.RoomserverAPI, m.FedClient, m.Config,
|
publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
@ -33,14 +32,14 @@ import (
|
||||||
|
|
||||||
// RequestPool manages HTTP long-poll connections for /sync
|
// RequestPool manages HTTP long-poll connections for /sync
|
||||||
type RequestPool struct {
|
type RequestPool struct {
|
||||||
db storage.Database
|
db storage.Database
|
||||||
accountDB accounts.Database
|
userAPI userapi.UserInternalAPI
|
||||||
notifier *Notifier
|
notifier *Notifier
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRequestPool makes a new RequestPool
|
// NewRequestPool makes a new RequestPool
|
||||||
func NewRequestPool(db storage.Database, n *Notifier, adb accounts.Database) *RequestPool {
|
func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool {
|
||||||
return &RequestPool{db, adb, n}
|
return &RequestPool{db, userAPI, n}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||||
|
@ -193,6 +192,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nolint:gocyclo
|
||||||
func (rp *RequestPool) appendAccountData(
|
func (rp *RequestPool) appendAccountData(
|
||||||
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
|
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
|
||||||
accountDataFilter *gomatrixserverlib.EventFilter,
|
accountDataFilter *gomatrixserverlib.EventFilter,
|
||||||
|
@ -202,25 +202,21 @@ func (rp *RequestPool) appendAccountData(
|
||||||
// data keys were set between two message. This isn't a huge issue since the
|
// data keys were set between two message. This isn't a huge issue since the
|
||||||
// duplicate data doesn't represent a huge quantity of data, but an optimisation
|
// duplicate data doesn't represent a huge quantity of data, but an optimisation
|
||||||
// here would be making sure each data is sent only once to the client.
|
// here would be making sure each data is sent only once to the client.
|
||||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if req.since == nil {
|
if req.since == nil {
|
||||||
// If this is the initial sync, we don't need to check if a data has
|
// If this is the initial sync, we don't need to check if a data has
|
||||||
// already been sent. Instead, we send the whole batch.
|
// already been sent. Instead, we send the whole batch.
|
||||||
var global []gomatrixserverlib.ClientEvent
|
var res userapi.QueryAccountDataResponse
|
||||||
var rooms map[string][]gomatrixserverlib.ClientEvent
|
err := rp.userAPI.QueryAccountData(req.ctx, &userapi.QueryAccountDataRequest{
|
||||||
global, rooms, err = rp.accountDB.GetAccountData(req.ctx, localpart)
|
UserID: userID,
|
||||||
|
}, &res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
data.AccountData.Events = global
|
data.AccountData.Events = res.GlobalAccountData
|
||||||
|
|
||||||
for r, j := range data.Rooms.Join {
|
for r, j := range data.Rooms.Join {
|
||||||
if len(rooms[r]) > 0 {
|
if len(res.RoomAccountData[r]) > 0 {
|
||||||
j.AccountData.Events = rooms[r]
|
j.AccountData.Events = res.RoomAccountData[r]
|
||||||
data.Rooms.Join[r] = j
|
data.Rooms.Join[r] = j
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -256,13 +252,20 @@ func (rp *RequestPool) appendAccountData(
|
||||||
events := []gomatrixserverlib.ClientEvent{}
|
events := []gomatrixserverlib.ClientEvent{}
|
||||||
// Request the missing data from the database
|
// Request the missing data from the database
|
||||||
for _, dataType := range dataTypes {
|
for _, dataType := range dataTypes {
|
||||||
event, err := rp.accountDB.GetAccountDataByType(
|
var res userapi.QueryAccountDataResponse
|
||||||
req.ctx, localpart, roomID, dataType,
|
err = rp.userAPI.QueryAccountData(req.ctx, &userapi.QueryAccountDataRequest{
|
||||||
)
|
UserID: userID,
|
||||||
|
RoomID: roomID,
|
||||||
|
DataType: dataType,
|
||||||
|
}, &res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
events = append(events, *event)
|
if len(res.RoomAccountData[roomID]) > 0 {
|
||||||
|
events = append(events, res.RoomAccountData[roomID]...)
|
||||||
|
} else if len(res.GlobalAccountData) > 0 {
|
||||||
|
events = append(events, res.GlobalAccountData...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append the data to the response
|
// Append the data to the response
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
@ -39,7 +38,6 @@ func AddPublicRoutes(
|
||||||
router *mux.Router,
|
router *mux.Router,
|
||||||
consumer sarama.Consumer,
|
consumer sarama.Consumer,
|
||||||
userAPI userapi.UserInternalAPI,
|
userAPI userapi.UserInternalAPI,
|
||||||
accountsDB accounts.Database,
|
|
||||||
rsAPI api.RoomserverInternalAPI,
|
rsAPI api.RoomserverInternalAPI,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
|
@ -60,7 +58,7 @@ func AddPublicRoutes(
|
||||||
logrus.WithError(err).Panicf("failed to start notifier")
|
logrus.WithError(err).Panicf("failed to start notifier")
|
||||||
}
|
}
|
||||||
|
|
||||||
requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB)
|
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI)
|
||||||
|
|
||||||
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
||||||
cfg, consumer, notifier, syncDB, rsAPI,
|
cfg, consumer, notifier, syncDB, rsAPI,
|
||||||
|
|
|
@ -14,13 +14,18 @@
|
||||||
|
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import "context"
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
// UserInternalAPI is the internal API for information about users and devices.
|
// UserInternalAPI is the internal API for information about users and devices.
|
||||||
type UserInternalAPI interface {
|
type UserInternalAPI interface {
|
||||||
QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error
|
QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error
|
||||||
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
|
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
|
||||||
QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
|
QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
|
||||||
|
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryAccessTokenRequest is the request for QueryAccessToken
|
// QueryAccessTokenRequest is the request for QueryAccessToken
|
||||||
|
@ -37,6 +42,22 @@ type QueryAccessTokenResponse struct {
|
||||||
Err error // e.g ErrorForbidden
|
Err error // e.g ErrorForbidden
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryAccountDataRequest is the request for QueryAccountData
|
||||||
|
type QueryAccountDataRequest struct {
|
||||||
|
UserID string // required: the user to get account data for.
|
||||||
|
// TODO: This is a terribly confusing API shape :/
|
||||||
|
DataType string // optional: if specified returns only a single event matching this data type.
|
||||||
|
// optional: Only used if DataType is set. If blank returns global account data matching the data type.
|
||||||
|
// If set, returns only room account data matching this data type.
|
||||||
|
RoomID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryAccountDataResponse is the response for QueryAccountData
|
||||||
|
type QueryAccountDataResponse struct {
|
||||||
|
GlobalAccountData []gomatrixserverlib.ClientEvent
|
||||||
|
RoomAccountData map[string][]gomatrixserverlib.ClientEvent
|
||||||
|
}
|
||||||
|
|
||||||
// QueryDevicesRequest is the request for QueryDevices
|
// QueryDevicesRequest is the request for QueryDevices
|
||||||
type QueryDevicesRequest struct {
|
type QueryDevicesRequest struct {
|
||||||
UserID string
|
UserID string
|
||||||
|
|
|
@ -73,6 +73,39 @@ func (a *UserInternalAPI) QueryDevices(ctx context.Context, req *api.QueryDevice
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *UserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAccountDataRequest, res *api.QueryAccountDataResponse) error {
|
||||||
|
local, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if domain != a.ServerName {
|
||||||
|
return fmt.Errorf("cannot query account data of remote users: got %s want %s", domain, a.ServerName)
|
||||||
|
}
|
||||||
|
if req.DataType != "" {
|
||||||
|
var event *gomatrixserverlib.ClientEvent
|
||||||
|
event, err = a.AccountDB.GetAccountDataByType(ctx, local, req.RoomID, req.DataType)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if event != nil {
|
||||||
|
if req.RoomID != "" {
|
||||||
|
res.RoomAccountData = make(map[string][]gomatrixserverlib.ClientEvent)
|
||||||
|
res.RoomAccountData[req.RoomID] = []gomatrixserverlib.ClientEvent{*event}
|
||||||
|
} else {
|
||||||
|
res.GlobalAccountData = append(res.GlobalAccountData, *event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
global, rooms, err := a.AccountDB.GetAccountData(ctx, local)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
res.RoomAccountData = rooms
|
||||||
|
res.GlobalAccountData = global
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *UserInternalAPI) QueryAccessToken(ctx context.Context, req *api.QueryAccessTokenRequest, res *api.QueryAccessTokenResponse) error {
|
func (a *UserInternalAPI) QueryAccessToken(ctx context.Context, req *api.QueryAccessTokenRequest, res *api.QueryAccessTokenResponse) error {
|
||||||
if req.AppServiceUserID != "" {
|
if req.AppServiceUserID != "" {
|
||||||
appServiceDevice, err := a.queryAppServiceToken(ctx, req.AccessToken, req.AppServiceUserID)
|
appServiceDevice, err := a.queryAppServiceToken(ctx, req.AccessToken, req.AppServiceUserID)
|
||||||
|
|
|
@ -29,6 +29,7 @@ const (
|
||||||
QueryProfilePath = "/userapi/queryProfile"
|
QueryProfilePath = "/userapi/queryProfile"
|
||||||
QueryAccessTokenPath = "/userapi/queryAccessToken"
|
QueryAccessTokenPath = "/userapi/queryAccessToken"
|
||||||
QueryDevicesPath = "/userapi/queryDevices"
|
QueryDevicesPath = "/userapi/queryDevices"
|
||||||
|
QueryAccountDataPath = "/userapi/queryAccountData"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewUserAPIClient creates a UserInternalAPI implemented by talking to a HTTP POST API.
|
// NewUserAPIClient creates a UserInternalAPI implemented by talking to a HTTP POST API.
|
||||||
|
@ -82,3 +83,11 @@ func (h *httpUserInternalAPI) QueryDevices(ctx context.Context, req *api.QueryDe
|
||||||
apiURL := h.apiURL + QueryDevicesPath
|
apiURL := h.apiURL + QueryDevicesPath
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *httpUserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAccountDataRequest, res *api.QueryAccountDataResponse) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryAccountData")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.apiURL + QueryAccountDataPath
|
||||||
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||||
|
}
|
||||||
|
|
|
@ -64,4 +64,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
internalAPIMux.Handle(QueryAccountDataPath,
|
||||||
|
httputil.MakeInternalAPI("queryAccountData", func(req *http.Request) util.JSONResponse {
|
||||||
|
request := api.QueryAccountDataRequest{}
|
||||||
|
response := api.QueryAccountDataResponse{}
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||||
|
}
|
||||||
|
if err := s.QueryAccountData(req.Context(), &request, &response); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue