mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 03:03:10 -06:00
Remove most parts of the EDU server
This commit is contained in:
parent
9f2f4ca7d7
commit
72df6fa8ee
|
|
@ -23,8 +23,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
|
|
@ -315,10 +313,6 @@ func (m *DendriteMonolith) Start() {
|
|||
m.userAPI = userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient())
|
||||
keyAPI.SetUserAPI(m.userAPI)
|
||||
|
||||
eduInputAPI := eduserver.NewInternalAPI(
|
||||
base, cache.New(), m.userAPI,
|
||||
)
|
||||
|
||||
asAPI := appservice.NewInternalAPI(base, m.userAPI, rsAPI)
|
||||
|
||||
// The underlying roomserver implementation needs to be able to call the fedsender.
|
||||
|
|
@ -333,7 +327,6 @@ func (m *DendriteMonolith) Start() {
|
|||
KeyRing: keyRing,
|
||||
|
||||
AppserviceAPI: asAPI,
|
||||
EDUInternalAPI: eduInputAPI,
|
||||
FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: m.userAPI,
|
||||
|
|
|
|||
|
|
@ -13,8 +13,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
|
|
@ -119,10 +117,6 @@ func (m *DendriteMonolith) Start() {
|
|||
userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient())
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
eduInputAPI := eduserver.NewInternalAPI(
|
||||
base, cache.New(), userAPI,
|
||||
)
|
||||
|
||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||
rsAPI.SetAppserviceAPI(asAPI)
|
||||
|
||||
|
|
@ -137,12 +131,11 @@ func (m *DendriteMonolith) Start() {
|
|||
FedClient: federation,
|
||||
KeyRing: keyRing,
|
||||
|
||||
AppserviceAPI: asAPI,
|
||||
EDUInternalAPI: eduInputAPI,
|
||||
FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
AppserviceAPI: asAPI,
|
||||
FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
|
||||
ygg, fsAPI, federation,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/api"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/clientapi/routing"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal/transactions"
|
||||
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
|
||||
|
|
@ -42,7 +41,6 @@ func AddPublicRoutes(
|
|||
accountsDB userdb.Database,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
eduInputAPI eduServerAPI.EDUServerInputAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
transactionsCache *transactions.Cache,
|
||||
fsAPI federationAPI.FederationInternalAPI,
|
||||
|
|
@ -58,11 +56,12 @@ func AddPublicRoutes(
|
|||
TopicClientData: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
|
||||
TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
|
||||
TopicSendToDeviceEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
|
||||
TopicTypingEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
|
||||
UserAPI: userAPI,
|
||||
}
|
||||
|
||||
routing.Setup(
|
||||
router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI,
|
||||
router, synapseAdminRouter, cfg, rsAPI, asAPI,
|
||||
accountsDB, userAPI, federation,
|
||||
syncProducer, transactionsCache, fsAPI, keyAPI,
|
||||
extRoomsProvider, mscCfg,
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ type SyncAPIProducer struct {
|
|||
TopicClientData string
|
||||
TopicReceiptEvent string
|
||||
TopicSendToDeviceEvent string
|
||||
TopicTypingEvent string
|
||||
JetStream nats.JetStreamContext
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
UserAPI userapi.UserInternalAPI
|
||||
|
|
@ -156,3 +157,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SyncAPIProducer) SendTyping(
|
||||
ctx context.Context, userID, roomID string, typing bool, timeoutMS int64,
|
||||
) error {
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicTypingEvent,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
m.Header.Set(jetstream.RoomID, roomID)
|
||||
m.Header.Set("typing", strconv.FormatBool(typing))
|
||||
m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS)))
|
||||
|
||||
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ import (
|
|||
clientutil "github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/internal/transactions"
|
||||
|
|
@ -48,7 +47,6 @@ import (
|
|||
// nolint: gocyclo
|
||||
func Setup(
|
||||
publicAPIMux, synapseAdminRouter *mux.Router, cfg *config.ClientAPI,
|
||||
eduAPI eduServerAPI.EDUServerInputAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
accountDB userdb.Database,
|
||||
|
|
@ -468,7 +466,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI, rsAPI)
|
||||
return SendTyping(req, device, vars["roomID"], vars["userID"], rsAPI, syncProducer)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
v3mux.Handle("/rooms/{roomID}/redact/{eventID}",
|
||||
|
|
|
|||
|
|
@ -17,10 +17,9 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
userdb "github.com/matrix-org/dendrite/userapi/storage"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
|
|
@ -33,9 +32,8 @@ type typingContentJSON struct {
|
|||
// sends the typing events to client API typingProducer
|
||||
func SendTyping(
|
||||
req *http.Request, device *userapi.Device, roomID string,
|
||||
userID string, accountDB userdb.Database,
|
||||
eduAPI api.EDUServerInputAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
userID string, rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
syncProducer *producers.SyncAPIProducer,
|
||||
) util.JSONResponse {
|
||||
if device.UserID != userID {
|
||||
return util.JSONResponse{
|
||||
|
|
@ -57,9 +55,7 @@ func SendTyping(
|
|||
return *resErr
|
||||
}
|
||||
|
||||
if err := api.SendTyping(
|
||||
req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout,
|
||||
); err != nil {
|
||||
if err := syncProducer.SendTyping(req.Context(), userID, roomID, r.Typing, r.Timeout); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import (
|
|||
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/keyserver"
|
||||
|
|
@ -40,8 +39,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/userapi"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
|
|
@ -152,9 +149,6 @@ func main() {
|
|||
userAPI := userapi.NewInternalAPI(&base.Base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.Base.PushGatewayHTTPClient())
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
eduInputAPI := eduserver.NewInternalAPI(
|
||||
&base.Base, cache.New(), userAPI,
|
||||
)
|
||||
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
|
||||
rsAPI.SetAppserviceAPI(asAPI)
|
||||
fsAPI := federationapi.NewInternalAPI(
|
||||
|
|
@ -180,7 +174,6 @@ func main() {
|
|||
KeyRing: keyRing,
|
||||
|
||||
AppserviceAPI: asAPI,
|
||||
EDUInternalAPI: eduInputAPI,
|
||||
FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
|
|
|
|||
|
|
@ -36,8 +36,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
|
|
@ -190,10 +188,6 @@ func main() {
|
|||
userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient())
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
eduInputAPI := eduserver.NewInternalAPI(
|
||||
base, cache.New(), userAPI,
|
||||
)
|
||||
|
||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||
|
||||
rsComponent.SetFederationAPI(fsAPI, keyRing)
|
||||
|
|
@ -206,7 +200,6 @@ func main() {
|
|||
KeyRing: keyRing,
|
||||
|
||||
AppserviceAPI: asAPI,
|
||||
EDUInternalAPI: eduInputAPI,
|
||||
FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
|
|
|
|||
|
|
@ -32,8 +32,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
|
|
@ -120,10 +118,6 @@ func main() {
|
|||
userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient())
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
eduInputAPI := eduserver.NewInternalAPI(
|
||||
base, cache.New(), userAPI,
|
||||
)
|
||||
|
||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||
rsAPI.SetAppserviceAPI(asAPI)
|
||||
fsAPI := federationapi.NewInternalAPI(
|
||||
|
|
@ -139,12 +133,11 @@ func main() {
|
|||
FedClient: federation,
|
||||
KeyRing: keyRing,
|
||||
|
||||
AppserviceAPI: asAPI,
|
||||
EDUInternalAPI: eduInputAPI,
|
||||
FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
AppserviceAPI: asAPI,
|
||||
FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
|
||||
ygg, fsAPI, federation,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -19,8 +19,6 @@ import (
|
|||
"os"
|
||||
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/keyserver"
|
||||
"github.com/matrix-org/dendrite/roomserver"
|
||||
|
|
@ -136,14 +134,6 @@ func main() {
|
|||
rsImpl.SetUserAPI(userAPI)
|
||||
keyImpl.SetUserAPI(userAPI)
|
||||
|
||||
eduInputAPI := eduserver.NewInternalAPI(
|
||||
base, cache.New(), userAPI,
|
||||
)
|
||||
if base.UseHTTPAPIs {
|
||||
eduserver.AddInternalRoutes(base.InternalAPIMux, eduInputAPI)
|
||||
eduInputAPI = base.EDUServerClient()
|
||||
}
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Cfg,
|
||||
AccountDB: accountDB,
|
||||
|
|
@ -151,12 +141,10 @@ func main() {
|
|||
FedClient: federation,
|
||||
KeyRing: keyRing,
|
||||
|
||||
AppserviceAPI: asAPI,
|
||||
EDUInternalAPI: eduInputAPI,
|
||||
FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
AppserviceAPI: asAPI, FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
}
|
||||
monolith.AddAllPublicRoutes(
|
||||
base.ProcessContext,
|
||||
|
|
|
|||
|
|
@ -43,7 +43,6 @@ func main() {
|
|||
components := map[string]entrypoint{
|
||||
"appservice": personalities.Appservice,
|
||||
"clientapi": personalities.ClientAPI,
|
||||
"eduserver": personalities.EDUServer,
|
||||
"federationapi": personalities.FederationAPI,
|
||||
"keyserver": personalities.KeyServer,
|
||||
"mediaapi": personalities.MediaAPI,
|
||||
|
|
|
|||
|
|
@ -28,13 +28,12 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
|
|||
asQuery := base.AppserviceHTTPClient()
|
||||
rsAPI := base.RoomserverHTTPClient()
|
||||
fsAPI := base.FederationAPIHTTPClient()
|
||||
eduInputAPI := base.EDUServerClient()
|
||||
userAPI := base.UserAPIClient()
|
||||
keyAPI := base.KeyServerHTTPClient()
|
||||
|
||||
clientapi.AddPublicRoutes(
|
||||
base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI,
|
||||
accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI,
|
||||
accountDB, federation, rsAPI, asQuery, transactions.New(), fsAPI, userAPI,
|
||||
keyAPI, nil, &cfg.MSCs,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,33 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package personalities
|
||||
|
||||
import (
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
basepkg "github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
)
|
||||
|
||||
func EDUServer(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
|
||||
intAPI := eduserver.NewInternalAPI(base, cache.New(), base.UserAPIClient())
|
||||
eduserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
||||
|
||||
base.SetupAndServeHTTP(
|
||||
base.Cfg.EDUServer.InternalAPI.Listen, // internal listener
|
||||
basepkg.NoListener, // external listener
|
||||
nil, nil,
|
||||
)
|
||||
}
|
||||
|
|
@ -31,7 +31,7 @@ func FederationAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
|
|||
federationapi.AddPublicRoutes(
|
||||
base.ProcessContext, base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux,
|
||||
&base.Cfg.FederationAPI, userAPI, federation, keyRing,
|
||||
rsAPI, fsAPI, base.EDUServerClient(), keyAPI,
|
||||
rsAPI, fsAPI, keyAPI,
|
||||
&base.Cfg.MSCs, nil,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -31,8 +31,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/keyserver"
|
||||
|
|
@ -193,7 +191,6 @@ func startup() {
|
|||
userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient())
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
|
||||
asQuery := appservice.NewInternalAPI(
|
||||
base, userAPI, rsAPI,
|
||||
)
|
||||
|
|
@ -208,12 +205,11 @@ func startup() {
|
|||
FedClient: federation,
|
||||
KeyRing: keyRing,
|
||||
|
||||
AppserviceAPI: asQuery,
|
||||
EDUInternalAPI: eduInputAPI,
|
||||
FederationAPI: fedSenderAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
AppserviceAPI: asQuery,
|
||||
FederationAPI: fedSenderAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
//ServerKeyAPI: serverKeyAPI,
|
||||
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,8 +24,6 @@ import (
|
|||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/keyserver"
|
||||
|
|
@ -203,7 +201,6 @@ func main() {
|
|||
}
|
||||
|
||||
rsAPI := roomserver.NewInternalAPI(base)
|
||||
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
|
||||
asQuery := appservice.NewInternalAPI(
|
||||
base, userAPI, rsAPI,
|
||||
)
|
||||
|
|
@ -222,7 +219,6 @@ func main() {
|
|||
KeyRing: &keyRing,
|
||||
|
||||
AppserviceAPI: asQuery,
|
||||
EDUInternalAPI: eduInputAPI,
|
||||
FederationSenderAPI: fedSenderAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
|
|
|
|||
|
|
@ -1,61 +0,0 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package api provides the types that are used to communicate with the typing server.
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// InputTypingEvent is an event for notifying the typing server about typing updates.
|
||||
type InputTypingEvent struct {
|
||||
// UserID of the user to update typing status.
|
||||
UserID string `json:"user_id"`
|
||||
// RoomID of the room the user is typing (or has stopped).
|
||||
RoomID string `json:"room_id"`
|
||||
// Typing is true if the user is typing, false if they have stopped.
|
||||
Typing bool `json:"typing"`
|
||||
// Timeout is the interval in milliseconds for which the user should be marked as typing.
|
||||
TimeoutMS int64 `json:"timeout"`
|
||||
// OriginServerTS when the server received the update.
|
||||
OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
|
||||
}
|
||||
|
||||
// InputTypingEventRequest is a request to EDUServerInputAPI
|
||||
type InputTypingEventRequest struct {
|
||||
InputTypingEvent InputTypingEvent `json:"input_typing_event"`
|
||||
}
|
||||
|
||||
// InputTypingEventResponse is a response to InputTypingEvents
|
||||
type InputTypingEventResponse struct{}
|
||||
|
||||
type InputCrossSigningKeyUpdateRequest struct {
|
||||
CrossSigningKeyUpdate `json:"signing_keys"`
|
||||
}
|
||||
|
||||
type InputCrossSigningKeyUpdateResponse struct{}
|
||||
|
||||
// EDUServerInputAPI is used to write events to the typing server.
|
||||
type EDUServerInputAPI interface {
|
||||
InputTypingEvent(
|
||||
ctx context.Context,
|
||||
request *InputTypingEventRequest,
|
||||
response *InputTypingEventResponse,
|
||||
) error
|
||||
}
|
||||
|
|
@ -17,22 +17,9 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// OutputTypingEvent is an entry in typing server output kafka log.
|
||||
// This contains the event with extra fields used to create 'm.typing' event
|
||||
// in clientapi & federation.
|
||||
type OutputTypingEvent struct {
|
||||
// The Event for the typing edu event.
|
||||
Event TypingEvent `json:"event"`
|
||||
// ExpireTime is the interval after which the user should no longer be
|
||||
// considered typing. Only available if Event.Typing is true.
|
||||
ExpireTime *time.Time
|
||||
}
|
||||
|
||||
// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
|
||||
// This contains the full event content, along with the user ID and device ID
|
||||
// to which it is destined.
|
||||
|
|
|
|||
|
|
@ -20,21 +20,6 @@ const (
|
|||
MSigningKeyUpdate = "m.signing_key_update"
|
||||
)
|
||||
|
||||
type TypingEvent struct {
|
||||
Type string `json:"type"`
|
||||
RoomID string `json:"room_id"`
|
||||
UserID string `json:"user_id"`
|
||||
Typing bool `json:"typing"`
|
||||
}
|
||||
|
||||
type ReceiptEvent struct {
|
||||
UserID string `json:"user_id"`
|
||||
RoomID string `json:"room_id"`
|
||||
EventID string `json:"event_id"`
|
||||
Type string `json:"type"`
|
||||
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
|
||||
}
|
||||
|
||||
type FederationReceiptMRead struct {
|
||||
User map[string]FederationReceiptData `json:"m.read"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,43 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// SendTyping sends a typing event to EDU server
|
||||
func SendTyping(
|
||||
ctx context.Context, eduAPI EDUServerInputAPI, userID, roomID string,
|
||||
typing bool, timeoutMS int64,
|
||||
) error {
|
||||
requestData := InputTypingEvent{
|
||||
UserID: userID,
|
||||
RoomID: roomID,
|
||||
Typing: typing,
|
||||
TimeoutMS: timeoutMS,
|
||||
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
|
||||
}
|
||||
|
||||
var response InputTypingEventResponse
|
||||
err := eduAPI.InputTypingEvent(
|
||||
ctx, &InputTypingEventRequest{InputTypingEvent: requestData}, &response,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
@ -1,54 +0,0 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package eduserver
|
||||
|
||||
import (
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/eduserver/input"
|
||||
"github.com/matrix-org/dendrite/eduserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
)
|
||||
|
||||
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
|
||||
// on the given input API.
|
||||
func AddInternalRoutes(internalMux *mux.Router, inputAPI api.EDUServerInputAPI) {
|
||||
inthttp.AddRoutes(inputAPI, internalMux)
|
||||
}
|
||||
|
||||
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||
func NewInternalAPI(
|
||||
base *base.BaseDendrite,
|
||||
eduCache *cache.EDUCache,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
) api.EDUServerInputAPI {
|
||||
cfg := &base.Cfg.EDUServer
|
||||
|
||||
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
|
||||
return &input.EDUServerInputAPI{
|
||||
Cache: eduCache,
|
||||
UserAPI: userAPI,
|
||||
JetStream: js,
|
||||
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
|
||||
ServerName: cfg.Matrix.ServerName,
|
||||
}
|
||||
}
|
||||
|
|
@ -1,100 +0,0 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package input
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// EDUServerInputAPI implements api.EDUServerInputAPI
|
||||
type EDUServerInputAPI struct {
|
||||
// Cache to store the current typing members in each room.
|
||||
Cache *cache.EDUCache
|
||||
// The kafka topic to output new typing events to.
|
||||
OutputTypingEventTopic string
|
||||
// kafka producer
|
||||
JetStream nats.JetStreamContext
|
||||
// Internal user query API
|
||||
UserAPI userapi.UserInternalAPI
|
||||
// our server name
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
}
|
||||
|
||||
// InputTypingEvent implements api.EDUServerInputAPI
|
||||
func (t *EDUServerInputAPI) InputTypingEvent(
|
||||
ctx context.Context,
|
||||
request *api.InputTypingEventRequest,
|
||||
response *api.InputTypingEventResponse,
|
||||
) error {
|
||||
ite := &request.InputTypingEvent
|
||||
if ite.Typing {
|
||||
// user is typing, update our current state of users typing.
|
||||
expireTime := ite.OriginServerTS.Time().Add(
|
||||
time.Duration(ite.TimeoutMS) * time.Millisecond,
|
||||
)
|
||||
t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime)
|
||||
} else {
|
||||
t.Cache.RemoveUser(ite.UserID, ite.RoomID)
|
||||
}
|
||||
|
||||
return t.sendTypingEvent(ite)
|
||||
}
|
||||
|
||||
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
|
||||
ev := &api.TypingEvent{
|
||||
Type: gomatrixserverlib.MTyping,
|
||||
RoomID: ite.RoomID,
|
||||
UserID: ite.UserID,
|
||||
Typing: ite.Typing,
|
||||
}
|
||||
ote := &api.OutputTypingEvent{
|
||||
Event: *ev,
|
||||
}
|
||||
|
||||
if ev.Typing {
|
||||
expireTime := ite.OriginServerTS.Time().Add(
|
||||
time.Duration(ite.TimeoutMS) * time.Millisecond,
|
||||
)
|
||||
ote.ExpireTime = &expireTime
|
||||
}
|
||||
|
||||
eventJSON, err := json.Marshal(ote)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"room_id": ite.RoomID,
|
||||
"user_id": ite.UserID,
|
||||
"typing": ite.Typing,
|
||||
}).Tracef("Producing to topic '%s'", t.OutputTypingEventTopic)
|
||||
|
||||
_, err = t.JetStream.PublishMsg(&nats.Msg{
|
||||
Subject: t.OutputTypingEventTopic,
|
||||
Header: nats.Header{},
|
||||
Data: eventJSON,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
|
@ -1,42 +0,0 @@
|
|||
package inthttp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
||||
// HTTP paths for the internal HTTP APIs
|
||||
const (
|
||||
EDUServerInputTypingEventPath = "/eduserver/input"
|
||||
)
|
||||
|
||||
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
|
||||
func NewEDUServerClient(eduServerURL string, httpClient *http.Client) (api.EDUServerInputAPI, error) {
|
||||
if httpClient == nil {
|
||||
return nil, errors.New("NewEDUServerClient: httpClient is <nil>")
|
||||
}
|
||||
return &httpEDUServerInputAPI{eduServerURL, httpClient}, nil
|
||||
}
|
||||
|
||||
type httpEDUServerInputAPI struct {
|
||||
eduServerURL string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// InputTypingEvent implements EDUServerInputAPI
|
||||
func (h *httpEDUServerInputAPI) InputTypingEvent(
|
||||
ctx context.Context,
|
||||
request *api.InputTypingEventRequest,
|
||||
response *api.InputTypingEventResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.eduServerURL + EDUServerInputTypingEventPath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
|
@ -1,28 +0,0 @@
|
|||
package inthttp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
// AddRoutes adds the EDUServerInputAPI handlers to the http.ServeMux.
|
||||
func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
|
||||
internalAPIMux.Handle(EDUServerInputTypingEventPath,
|
||||
httputil.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse {
|
||||
var request api.InputTypingEventRequest
|
||||
var response api.InputTypingEventResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := t.InputTypingEvent(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
|
@ -149,18 +149,19 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.M
|
|||
// events topic from the EDU server.
|
||||
func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bool {
|
||||
// Extract the typing event from msg.
|
||||
var ote api.OutputTypingEvent
|
||||
if err := json.Unmarshal(msg.Data, &ote); err != nil {
|
||||
// Skip this msg but continue processing messages.
|
||||
log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
|
||||
_ = msg.Ack()
|
||||
|
||||
roomID := msg.Header.Get(jetstream.RoomID)
|
||||
userID := msg.Header.Get(jetstream.UserID)
|
||||
typing, err := strconv.ParseBool(msg.Header.Get("typing"))
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("EDU server output log: typing parse failure")
|
||||
return true
|
||||
}
|
||||
|
||||
// only send typing events which originated from us
|
||||
_, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
|
||||
_, typingServerName, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
|
||||
log.WithError(err).WithField("user_id", userID).Error("Failed to extract domain from typing sender")
|
||||
_ = msg.Ack()
|
||||
return true
|
||||
}
|
||||
|
|
@ -168,9 +169,9 @@ func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bo
|
|||
return true
|
||||
}
|
||||
|
||||
joined, err := t.db.GetJoinedHosts(ctx, ote.Event.RoomID)
|
||||
joined, err := t.db.GetJoinedHosts(ctx, roomID)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("room_id", ote.Event.RoomID).Error("failed to get joined hosts for room")
|
||||
log.WithError(err).WithField("room_id", roomID).Error("failed to get joined hosts for room")
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
@ -179,16 +180,16 @@ func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bo
|
|||
names[i] = joined[i].ServerName
|
||||
}
|
||||
|
||||
edu := &gomatrixserverlib.EDU{Type: ote.Event.Type}
|
||||
edu := &gomatrixserverlib.EDU{Type: "m.typing"}
|
||||
if edu.Content, err = json.Marshal(map[string]interface{}{
|
||||
"room_id": ote.Event.RoomID,
|
||||
"user_id": ote.Event.UserID,
|
||||
"typing": ote.Event.Typing,
|
||||
"room_id": roomID,
|
||||
"user_id": userID,
|
||||
"typing": typing,
|
||||
}); err != nil {
|
||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
||||
return true
|
||||
}
|
||||
|
||||
log.Debugf("sending edu: %+v", edu)
|
||||
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
|
||||
log.WithError(err).Error("failed to send EDU")
|
||||
return false
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ package federationapi
|
|||
|
||||
import (
|
||||
"github.com/gorilla/mux"
|
||||
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/federationapi/api"
|
||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/federationapi/consumers"
|
||||
|
|
@ -56,7 +55,6 @@ func AddPublicRoutes(
|
|||
keyRing gomatrixserverlib.JSONVerifier,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
federationAPI federationAPI.FederationInternalAPI,
|
||||
eduAPI eduserverAPI.EDUServerInputAPI,
|
||||
keyAPI keyserverAPI.KeyInternalAPI,
|
||||
mscCfg *config.MSCs,
|
||||
servers federationAPI.ServersInRoomProvider,
|
||||
|
|
@ -67,11 +65,12 @@ func AddPublicRoutes(
|
|||
JetStream: js,
|
||||
TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
|
||||
TopicSendToDeviceEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
|
||||
TopicTypingEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
|
||||
}
|
||||
|
||||
routing.Setup(
|
||||
fedRouter, keyRouter, wellKnownRouter, cfg, rsAPI,
|
||||
eduAPI, federationAPI, keyRing,
|
||||
federationAPI, keyRing,
|
||||
federation, userAPI, keyAPI, mscCfg,
|
||||
servers, producer,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import (
|
|||
type SyncAPIProducer struct {
|
||||
TopicReceiptEvent string
|
||||
TopicSendToDeviceEvent string
|
||||
TopicTypingEvent string
|
||||
JetStream nats.JetStreamContext
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
UserAPI userapi.UserInternalAPI
|
||||
|
|
@ -126,3 +127,18 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SyncAPIProducer) SendTyping(
|
||||
ctx context.Context, userID, roomID string, typing bool, timeoutMS int64,
|
||||
) error {
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicTypingEvent,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
m.Header.Set(jetstream.RoomID, roomID)
|
||||
m.Header.Set("typing", strconv.FormatBool(typing))
|
||||
m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS)))
|
||||
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import (
|
|||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/federationapi/producers"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
|
|
@ -45,7 +44,6 @@ func Setup(
|
|||
fedMux, keyMux, wkMux *mux.Router,
|
||||
cfg *config.FederationAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
eduAPI eduserverAPI.EDUServerInputAPI,
|
||||
fsAPI federationAPI.FederationInternalAPI,
|
||||
keys gomatrixserverlib.JSONVerifier,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
|
|
@ -118,7 +116,7 @@ func Setup(
|
|||
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
|
||||
return Send(
|
||||
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
|
||||
cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers, producer,
|
||||
cfg, rsAPI, keyAPI, keys, federation, mu, servers, producer,
|
||||
)
|
||||
},
|
||||
)).Methods(http.MethodPut, http.MethodOptions)
|
||||
|
|
|
|||
|
|
@ -88,7 +88,6 @@ func Send(
|
|||
txnID gomatrixserverlib.TransactionID,
|
||||
cfg *config.FederationAPI,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
eduAPI eduserverAPI.EDUServerInputAPI,
|
||||
keyAPI keyapi.KeyInternalAPI,
|
||||
keys gomatrixserverlib.JSONVerifier,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
|
|
@ -129,7 +128,6 @@ func Send(
|
|||
|
||||
t := txnReq{
|
||||
rsAPI: rsAPI,
|
||||
eduAPI: eduAPI,
|
||||
keys: keys,
|
||||
federation: federation,
|
||||
servers: servers,
|
||||
|
|
@ -188,7 +186,6 @@ func Send(
|
|||
type txnReq struct {
|
||||
gomatrixserverlib.Transaction
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
eduAPI eduserverAPI.EDUServerInputAPI
|
||||
keyAPI keyapi.KeyInternalAPI
|
||||
keys gomatrixserverlib.JSONVerifier
|
||||
federation txnFederationClient
|
||||
|
|
@ -333,7 +330,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
|||
util.GetLogger(ctx).Debugf("Dropping typing event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
|
||||
continue
|
||||
}
|
||||
if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
|
||||
if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server")
|
||||
}
|
||||
case gomatrixserverlib.MDirectToDevice:
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/test"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
|
@ -53,28 +52,6 @@ func init() {
|
|||
}
|
||||
}
|
||||
|
||||
type testEDUProducer struct {
|
||||
// this producer keeps track of calls to InputTypingEvent
|
||||
invocations []eduAPI.InputTypingEventRequest
|
||||
}
|
||||
|
||||
func (p *testEDUProducer) InputTypingEvent(
|
||||
ctx context.Context,
|
||||
request *eduAPI.InputTypingEventRequest,
|
||||
response *eduAPI.InputTypingEventResponse,
|
||||
) error {
|
||||
p.invocations = append(p.invocations, *request)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *testEDUProducer) InputCrossSigningKeyUpdate(
|
||||
ctx context.Context,
|
||||
request *eduAPI.InputCrossSigningKeyUpdateRequest,
|
||||
response *eduAPI.InputCrossSigningKeyUpdateResponse,
|
||||
) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type testRoomserverAPI struct {
|
||||
api.RoomserverInternalAPITrace
|
||||
inputRoomEvents []api.InputRoomEvent
|
||||
|
|
@ -209,7 +186,6 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserver
|
|||
func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq {
|
||||
t := &txnReq{
|
||||
rsAPI: rsAPI,
|
||||
eduAPI: &testEDUProducer{},
|
||||
keys: &test.NopJSONVerifier{},
|
||||
federation: fedClient,
|
||||
roomsMu: internal.NewMutexByRoom(),
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2017-2018 NewTypingCache Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
|
|
@ -14,11 +14,13 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cache
|
||||
package caching
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const defaultTypingTimeout = 10 * time.Second
|
||||
|
|
@ -53,8 +55,8 @@ func (t *EDUCache) newRoomData() *roomData {
|
|||
}
|
||||
}
|
||||
|
||||
// New returns a new EDUCache initialised for use.
|
||||
func New() *EDUCache {
|
||||
// NewTypingCache returns a new EDUCache initialised for use.
|
||||
func NewTypingCache() *EDUCache {
|
||||
return &EDUCache{data: make(map[string]*roomData)}
|
||||
}
|
||||
|
||||
|
|
@ -100,6 +102,7 @@ func (t *EDUCache) GetTypingUsersIfUpdatedAfter(
|
|||
func (t *EDUCache) AddTypingUser(
|
||||
userID, roomID string, expire *time.Time,
|
||||
) int64 {
|
||||
logrus.Debugf("Adding user to room: %s %s", userID, roomID)
|
||||
expireTime := getExpireTime(expire)
|
||||
if until := time.Until(expireTime); until > 0 {
|
||||
timer := time.AfterFunc(until, func() {
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2017-2018 NewTypingCache Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
|
|
@ -14,7 +14,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cache
|
||||
package caching
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
|
@ -24,9 +24,9 @@ import (
|
|||
)
|
||||
|
||||
func TestEDUCache(t *testing.T) {
|
||||
tCache := New()
|
||||
tCache := NewTypingCache()
|
||||
if tCache == nil {
|
||||
t.Fatal("New failed")
|
||||
t.Fatal("NewTypingCache failed")
|
||||
}
|
||||
|
||||
t.Run("AddTypingUser", func(t *testing.T) {
|
||||
|
|
@ -45,8 +45,6 @@ import (
|
|||
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp"
|
||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||
federationIntHTTP "github.com/matrix-org/dendrite/federationapi/inthttp"
|
||||
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
|
||||
|
|
@ -245,15 +243,6 @@ func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI {
|
|||
return userAPI
|
||||
}
|
||||
|
||||
// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP
|
||||
func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI {
|
||||
e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.apiHttpClient)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("EDUServerClient failed", b.apiHttpClient)
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// FederationAPIHTTPClient returns FederationInternalAPI for hitting
|
||||
// the federation API server over HTTP
|
||||
func (b *BaseDendrite) FederationAPIHTTPClient() federationAPI.FederationInternalAPI {
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import (
|
|||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
"github.com/matrix-org/dendrite/clientapi"
|
||||
"github.com/matrix-org/dendrite/clientapi/api"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal/transactions"
|
||||
|
|
@ -43,12 +42,11 @@ type Monolith struct {
|
|||
Client *gomatrixserverlib.Client
|
||||
FedClient *gomatrixserverlib.FederationClient
|
||||
|
||||
AppserviceAPI appserviceAPI.AppServiceQueryAPI
|
||||
EDUInternalAPI eduServerAPI.EDUServerInputAPI
|
||||
FederationAPI federationAPI.FederationInternalAPI
|
||||
RoomserverAPI roomserverAPI.RoomserverInternalAPI
|
||||
UserAPI userapi.UserInternalAPI
|
||||
KeyAPI keyAPI.KeyInternalAPI
|
||||
AppserviceAPI appserviceAPI.AppServiceQueryAPI
|
||||
FederationAPI federationAPI.FederationInternalAPI
|
||||
RoomserverAPI roomserverAPI.RoomserverInternalAPI
|
||||
UserAPI userapi.UserInternalAPI
|
||||
KeyAPI keyAPI.KeyInternalAPI
|
||||
|
||||
// Optional
|
||||
ExtPublicRoomsProvider api.ExtraPublicRoomsProvider
|
||||
|
|
@ -58,15 +56,14 @@ type Monolith struct {
|
|||
func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux *mux.Router) {
|
||||
clientapi.AddPublicRoutes(
|
||||
process, csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB,
|
||||
m.FedClient, m.RoomserverAPI,
|
||||
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
|
||||
m.FedClient, m.RoomserverAPI, m.AppserviceAPI, transactions.New(),
|
||||
m.FederationAPI, m.UserAPI, m.KeyAPI,
|
||||
m.ExtPublicRoomsProvider, &m.Config.MSCs,
|
||||
)
|
||||
federationapi.AddPublicRoutes(
|
||||
process, ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
|
||||
m.KeyRing, m.RoomserverAPI, m.FederationAPI,
|
||||
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil,
|
||||
m.KeyAPI, &m.Config.MSCs, nil,
|
||||
)
|
||||
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, &m.Config.ClientAPI.RateLimiting, m.UserAPI, m.Client)
|
||||
syncapi.AddPublicRoutes(
|
||||
|
|
|
|||
|
|
@ -16,16 +16,14 @@ package consumers
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
|
@ -37,7 +35,7 @@ type OutputTypingEventConsumer struct {
|
|||
jetstream nats.JetStreamContext
|
||||
durable string
|
||||
topic string
|
||||
eduCache *cache.EDUCache
|
||||
eduCache *caching.EDUCache
|
||||
stream types.StreamProvider
|
||||
notifier *notifier.Notifier
|
||||
}
|
||||
|
|
@ -48,8 +46,7 @@ func NewOutputTypingEventConsumer(
|
|||
process *process.ProcessContext,
|
||||
cfg *config.SyncAPI,
|
||||
js nats.JetStreamContext,
|
||||
store storage.Database,
|
||||
eduCache *cache.EDUCache,
|
||||
eduCache *caching.EDUCache,
|
||||
notifier *notifier.Notifier,
|
||||
stream types.StreamProvider,
|
||||
) *OutputTypingEventConsumer {
|
||||
|
|
@ -73,34 +70,40 @@ func (s *OutputTypingEventConsumer) Start() error {
|
|||
}
|
||||
|
||||
func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
||||
var output api.OutputTypingEvent
|
||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
||||
sentry.CaptureException(err)
|
||||
roomID := msg.Header.Get(jetstream.RoomID)
|
||||
userID := msg.Header.Get(jetstream.UserID)
|
||||
typing, err := strconv.ParseBool(msg.Header.Get("typing"))
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("EDU server output log: typing parse failure")
|
||||
return true
|
||||
}
|
||||
timeout, err := strconv.Atoi(msg.Header.Get("timeout_ms"))
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("EDU server output log: timeout parse failure")
|
||||
return true
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"room_id": output.Event.RoomID,
|
||||
"user_id": output.Event.UserID,
|
||||
"typing": output.Event.Typing,
|
||||
}).Debug("received data from EDU server")
|
||||
"room_id": roomID,
|
||||
"user_id": userID,
|
||||
"typing": typing,
|
||||
"timeout": timeout,
|
||||
}).Debug("syncapi received EDU data from client api")
|
||||
|
||||
var typingPos types.StreamPosition
|
||||
typingEvent := output.Event
|
||||
if typingEvent.Typing {
|
||||
if typing {
|
||||
expiry := time.Now().Add(time.Duration(timeout) * time.Millisecond)
|
||||
typingPos = types.StreamPosition(
|
||||
s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime),
|
||||
s.eduCache.AddTypingUser(userID, roomID, &expiry),
|
||||
)
|
||||
} else {
|
||||
typingPos = types.StreamPosition(
|
||||
s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID),
|
||||
s.eduCache.RemoveUser(userID, roomID),
|
||||
)
|
||||
}
|
||||
|
||||
s.stream.Advance(typingPos)
|
||||
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
|
||||
s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: typingPos})
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,14 +4,14 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type TypingStreamProvider struct {
|
||||
StreamProvider
|
||||
EDUCache *cache.EDUCache
|
||||
EDUCache *caching.EDUCache
|
||||
}
|
||||
|
||||
func (p *TypingStreamProvider) CompleteSync(
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ package streams
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
|
|
@ -25,7 +25,7 @@ type Streams struct {
|
|||
func NewSyncStreamProviders(
|
||||
d storage.Database, userAPI userapi.UserInternalAPI,
|
||||
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
|
||||
eduCache *cache.EDUCache,
|
||||
eduCache *caching.EDUCache,
|
||||
) *Streams {
|
||||
streams := &Streams{
|
||||
PDUStreamProvider: &PDUStreamProvider{
|
||||
|
|
|
|||
|
|
@ -18,9 +18,9 @@ import (
|
|||
"context"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
|
|
@ -56,7 +56,7 @@ func AddPublicRoutes(
|
|||
logrus.WithError(err).Panicf("failed to connect to sync db")
|
||||
}
|
||||
|
||||
eduCache := cache.New()
|
||||
eduCache := caching.NewTypingCache()
|
||||
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache)
|
||||
notifier := notifier.NewNotifier(streams.Latest(context.Background()))
|
||||
if err = notifier.Load(context.Background(), syncDB); err != nil {
|
||||
|
|
@ -110,7 +110,7 @@ func AddPublicRoutes(
|
|||
}
|
||||
|
||||
typingConsumer := consumers.NewOutputTypingEventConsumer(
|
||||
process, cfg, js, syncDB, eduCache, notifier, streams.TypingStreamProvider,
|
||||
process, cfg, js, eduCache, notifier, streams.TypingStreamProvider,
|
||||
)
|
||||
if err = typingConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start typing consumer")
|
||||
|
|
|
|||
Loading…
Reference in a new issue