Remove QueryRoomsForUser from current state server (#1398)
This commit is contained in:
parent
5076925c18
commit
088294ee65
|
@ -130,7 +130,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||||
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
||||||
fsAPI := federationsender.NewInternalAPI(
|
fsAPI := federationsender.NewInternalAPI(
|
||||||
base, federation, rsAPI, stateAPI, keyRing,
|
base, federation, rsAPI, keyRing,
|
||||||
)
|
)
|
||||||
|
|
||||||
ygg.SetSessionFunc(func(address string) {
|
ygg.SetSessionFunc(func(address string) {
|
||||||
|
|
|
@ -19,7 +19,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
@ -94,10 +93,10 @@ func GetMemberships(
|
||||||
func GetJoinedRooms(
|
func GetJoinedRooms(
|
||||||
req *http.Request,
|
req *http.Request,
|
||||||
device *userapi.Device,
|
device *userapi.Device,
|
||||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
rsAPI api.RoomserverInternalAPI,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
var res currentstateAPI.QueryRoomsForUserResponse
|
var res api.QueryRoomsForUserResponse
|
||||||
err := stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{
|
err := rsAPI.QueryRoomsForUser(req.Context(), &api.QueryRoomsForUserRequest{
|
||||||
UserID: device.UserID,
|
UserID: device.UserID,
|
||||||
WantMembership: "join",
|
WantMembership: "join",
|
||||||
}, &res)
|
}, &res)
|
||||||
|
|
|
@ -140,8 +140,8 @@ func SetAvatarURL(
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
var res currentstateAPI.QueryRoomsForUserResponse
|
var res api.QueryRoomsForUserResponse
|
||||||
err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{
|
err = rsAPI.QueryRoomsForUser(req.Context(), &api.QueryRoomsForUserRequest{
|
||||||
UserID: device.UserID,
|
UserID: device.UserID,
|
||||||
WantMembership: "join",
|
WantMembership: "join",
|
||||||
}, &res)
|
}, &res)
|
||||||
|
@ -258,8 +258,8 @@ func SetDisplayName(
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
var res currentstateAPI.QueryRoomsForUserResponse
|
var res api.QueryRoomsForUserResponse
|
||||||
err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{
|
err = rsAPI.QueryRoomsForUser(req.Context(), &api.QueryRoomsForUserRequest{
|
||||||
UserID: device.UserID,
|
UserID: device.UserID,
|
||||||
WantMembership: "join",
|
WantMembership: "join",
|
||||||
}, &res)
|
}, &res)
|
||||||
|
|
|
@ -107,7 +107,7 @@ func Setup(
|
||||||
).Methods(http.MethodPost, http.MethodOptions)
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
r0mux.Handle("/joined_rooms",
|
r0mux.Handle("/joined_rooms",
|
||||||
httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||||
return GetJoinedRooms(req, device, stateAPI)
|
return GetJoinedRooms(req, device, rsAPI)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodGet, http.MethodOptions)
|
).Methods(http.MethodGet, http.MethodOptions)
|
||||||
r0mux.Handle("/rooms/{roomID}/join",
|
r0mux.Handle("/rooms/{roomID}/join",
|
||||||
|
|
|
@ -162,7 +162,7 @@ func main() {
|
||||||
)
|
)
|
||||||
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
|
||||||
fsAPI := federationsender.NewInternalAPI(
|
fsAPI := federationsender.NewInternalAPI(
|
||||||
&base.Base, federation, rsAPI, stateAPI, keyRing,
|
&base.Base, federation, rsAPI, keyRing,
|
||||||
)
|
)
|
||||||
rsAPI.SetFederationSenderAPI(fsAPI)
|
rsAPI.SetFederationSenderAPI(fsAPI)
|
||||||
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI)
|
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI)
|
||||||
|
|
|
@ -115,7 +115,7 @@ func main() {
|
||||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||||
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
||||||
fsAPI := federationsender.NewInternalAPI(
|
fsAPI := federationsender.NewInternalAPI(
|
||||||
base, federation, rsAPI, stateAPI, keyRing,
|
base, federation, rsAPI, keyRing,
|
||||||
)
|
)
|
||||||
|
|
||||||
ygg.SetSessionFunc(func(address string) {
|
ygg.SetSessionFunc(func(address string) {
|
||||||
|
|
|
@ -31,7 +31,7 @@ func main() {
|
||||||
|
|
||||||
rsAPI := base.RoomserverHTTPClient()
|
rsAPI := base.RoomserverHTTPClient()
|
||||||
fsAPI := federationsender.NewInternalAPI(
|
fsAPI := federationsender.NewInternalAPI(
|
||||||
base, federation, rsAPI, base.CurrentStateAPIClient(), keyRing,
|
base, federation, rsAPI, keyRing,
|
||||||
)
|
)
|
||||||
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
|
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
|
||||||
|
|
||||||
|
|
|
@ -98,7 +98,7 @@ func main() {
|
||||||
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
|
||||||
|
|
||||||
fsAPI := federationsender.NewInternalAPI(
|
fsAPI := federationsender.NewInternalAPI(
|
||||||
base, federation, rsAPI, stateAPI, keyRing,
|
base, federation, rsAPI, keyRing,
|
||||||
)
|
)
|
||||||
if base.UseHTTPAPIs {
|
if base.UseHTTPAPIs {
|
||||||
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
|
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
|
||||||
|
|
|
@ -210,7 +210,7 @@ func main() {
|
||||||
asQuery := appservice.NewInternalAPI(
|
asQuery := appservice.NewInternalAPI(
|
||||||
base, userAPI, rsAPI,
|
base, userAPI, rsAPI,
|
||||||
)
|
)
|
||||||
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, stateAPI, &keyRing)
|
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing)
|
||||||
rsAPI.SetFederationSenderAPI(fedSenderAPI)
|
rsAPI.SetFederationSenderAPI(fedSenderAPI)
|
||||||
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
|
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
|
||||||
|
|
||||||
|
|
|
@ -21,22 +21,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type CurrentStateInternalAPI interface {
|
type CurrentStateInternalAPI interface {
|
||||||
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
|
|
||||||
QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error
|
|
||||||
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
||||||
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
|
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueryRoomsForUserRequest struct {
|
|
||||||
UserID string
|
|
||||||
// The desired membership of the user. If this is the empty string then no rooms are returned.
|
|
||||||
WantMembership string
|
|
||||||
}
|
|
||||||
|
|
||||||
type QueryRoomsForUserResponse struct {
|
|
||||||
RoomIDs []string
|
|
||||||
}
|
|
||||||
|
|
||||||
type QueryBulkStateContentRequest struct {
|
type QueryBulkStateContentRequest struct {
|
||||||
// Returns state events in these rooms
|
// Returns state events in these rooms
|
||||||
RoomIDs []string
|
RoomIDs []string
|
||||||
|
|
|
@ -26,15 +26,6 @@ type CurrentStateInternalAPI struct {
|
||||||
DB storage.Database
|
DB storage.Database
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *CurrentStateInternalAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
|
|
||||||
roomIDs, err := a.DB.GetRoomsByMembership(ctx, req.UserID, req.WantMembership)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
res.RoomIDs = roomIDs
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *CurrentStateInternalAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
|
func (a *CurrentStateInternalAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
|
||||||
events, err := a.DB.GetBulkStateContent(ctx, req.RoomIDs, req.StateTuples, req.AllowWildcards)
|
events, err := a.DB.GetBulkStateContent(ctx, req.RoomIDs, req.StateTuples, req.AllowWildcards)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -50,18 +50,6 @@ type httpCurrentStateInternalAPI struct {
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpCurrentStateInternalAPI) QueryRoomsForUser(
|
|
||||||
ctx context.Context,
|
|
||||||
request *api.QueryRoomsForUserRequest,
|
|
||||||
response *api.QueryRoomsForUserResponse,
|
|
||||||
) error {
|
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryRoomsForUser")
|
|
||||||
defer span.Finish()
|
|
||||||
|
|
||||||
apiURL := h.apiURL + QueryRoomsForUserPath
|
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *httpCurrentStateInternalAPI) QueryBulkStateContent(
|
func (h *httpCurrentStateInternalAPI) QueryBulkStateContent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.QueryBulkStateContentRequest,
|
request *api.QueryBulkStateContentRequest,
|
||||||
|
|
|
@ -25,19 +25,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
|
func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
|
||||||
internalAPIMux.Handle(QueryRoomsForUserPath,
|
|
||||||
httputil.MakeInternalAPI("queryRoomsForUser", func(req *http.Request) util.JSONResponse {
|
|
||||||
request := api.QueryRoomsForUserRequest{}
|
|
||||||
response := api.QueryRoomsForUserResponse{}
|
|
||||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
|
||||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
|
||||||
}
|
|
||||||
if err := intAPI.QueryRoomsForUser(req.Context(), &request, &response); err != nil {
|
|
||||||
return util.ErrorResponse(err)
|
|
||||||
}
|
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
internalAPIMux.Handle(QueryBulkStateContentPath,
|
internalAPIMux.Handle(QueryBulkStateContentPath,
|
||||||
httputil.MakeInternalAPI("queryBulkStateContent", func(req *http.Request) util.JSONResponse {
|
httputil.MakeInternalAPI("queryBulkStateContent", func(req *http.Request) util.JSONResponse {
|
||||||
request := api.QueryBulkStateContentRequest{}
|
request := api.QueryBulkStateContentRequest{}
|
||||||
|
|
|
@ -20,12 +20,12 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/queue"
|
"github.com/matrix-org/dendrite/federationsender/queue"
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
@ -36,7 +36,7 @@ type KeyChangeConsumer struct {
|
||||||
db storage.Database
|
db storage.Database
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
serverName gomatrixserverlib.ServerName
|
serverName gomatrixserverlib.ServerName
|
||||||
stateAPI stateapi.CurrentStateInternalAPI
|
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
|
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
|
||||||
|
@ -45,7 +45,7 @@ func NewKeyChangeConsumer(
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
queues *queue.OutgoingQueues,
|
queues *queue.OutgoingQueues,
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
stateAPI stateapi.CurrentStateInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
) *KeyChangeConsumer {
|
) *KeyChangeConsumer {
|
||||||
c := &KeyChangeConsumer{
|
c := &KeyChangeConsumer{
|
||||||
consumer: &internal.ContinualConsumer{
|
consumer: &internal.ContinualConsumer{
|
||||||
|
@ -57,7 +57,7 @@ func NewKeyChangeConsumer(
|
||||||
queues: queues,
|
queues: queues,
|
||||||
db: store,
|
db: store,
|
||||||
serverName: cfg.Matrix.ServerName,
|
serverName: cfg.Matrix.ServerName,
|
||||||
stateAPI: stateAPI,
|
rsAPI: rsAPI,
|
||||||
}
|
}
|
||||||
c.consumer.ProcessMessage = c.onMessage
|
c.consumer.ProcessMessage = c.onMessage
|
||||||
|
|
||||||
|
@ -92,8 +92,8 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var queryRes stateapi.QueryRoomsForUserResponse
|
var queryRes roomserverAPI.QueryRoomsForUserResponse
|
||||||
err = t.stateAPI.QueryRoomsForUser(context.Background(), &stateapi.QueryRoomsForUserRequest{
|
err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{
|
||||||
UserID: m.UserID,
|
UserID: m.UserID,
|
||||||
WantMembership: "join",
|
WantMembership: "join",
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
|
|
|
@ -16,7 +16,6 @@ package federationsender
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/api"
|
"github.com/matrix-org/dendrite/federationsender/api"
|
||||||
"github.com/matrix-org/dendrite/federationsender/consumers"
|
"github.com/matrix-org/dendrite/federationsender/consumers"
|
||||||
"github.com/matrix-org/dendrite/federationsender/internal"
|
"github.com/matrix-org/dendrite/federationsender/internal"
|
||||||
|
@ -42,7 +41,6 @@ func NewInternalAPI(
|
||||||
base *setup.BaseDendrite,
|
base *setup.BaseDendrite,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
stateAPI stateapi.CurrentStateInternalAPI,
|
|
||||||
keyRing *gomatrixserverlib.KeyRing,
|
keyRing *gomatrixserverlib.KeyRing,
|
||||||
) api.FederationSenderInternalAPI {
|
) api.FederationSenderInternalAPI {
|
||||||
cfg := &base.Cfg.FederationSender
|
cfg := &base.Cfg.FederationSender
|
||||||
|
@ -82,7 +80,7 @@ func NewInternalAPI(
|
||||||
logrus.WithError(err).Panic("failed to start typing server consumer")
|
logrus.WithError(err).Panic("failed to start typing server consumer")
|
||||||
}
|
}
|
||||||
keyConsumer := consumers.NewKeyChangeConsumer(
|
keyConsumer := consumers.NewKeyChangeConsumer(
|
||||||
&base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, stateAPI,
|
&base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err := keyConsumer.Start(); err != nil {
|
if err := keyConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start key server consumer")
|
logrus.WithError(err).Panic("failed to start key server consumer")
|
||||||
|
|
|
@ -112,11 +112,6 @@ type mockStateAPI struct {
|
||||||
rsAPI *mockRoomserverAPI
|
rsAPI *mockRoomserverAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
|
|
||||||
func (s *mockStateAPI) QueryRoomsForUser(ctx context.Context, req *stateapi.QueryRoomsForUserRequest, res *stateapi.QueryRoomsForUserResponse) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
||||||
func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error {
|
func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error {
|
||||||
var res2 api.QueryBulkStateContentResponse
|
var res2 api.QueryBulkStateContentResponse
|
||||||
|
|
Loading…
Reference in a new issue