diff --git a/clientapi/producers/eduserver.go b/clientapi/producers/eduserver.go index 30c40fb7f..f87988b0e 100644 --- a/clientapi/producers/eduserver.go +++ b/clientapi/producers/eduserver.go @@ -14,6 +14,7 @@ package producers import ( "context" + "encoding/json" "time" "github.com/matrix-org/dendrite/eduserver/api" @@ -52,3 +53,25 @@ func (p *EDUServerProducer) SendTyping( return err } + +// SendToDevice sends a typing event to EDU server +func (p *EDUServerProducer) SendToDevice( + ctx context.Context, userID, deviceID, eventType string, + message interface{}, +) error { + js, err := json.Marshal(message) + if err != nil { + return err + } + requestData := api.InputSendToDeviceEvent{ + UserID: userID, + DeviceID: deviceID, + EventType: eventType, + Message: js, + } + request := api.InputSendToDeviceEventRequest{ + InputSendToDeviceEvent: requestData, + } + response := api.InputSendToDeviceEventResponse{} + return p.InputAPI.InputSendToDeviceEvent(ctx, &request, &response) +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 934d9f065..205807722 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -274,6 +274,17 @@ func Setup( }), ).Methods(http.MethodPut, http.MethodOptions) + r0mux.Handle("/sendToDevice/{eventType}/{txnID}", + internal.MakeAuthAPI("send_to_device", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars, err := internal.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + txnID := vars["txnID"] + return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID) + }), + ).Methods(http.MethodPut, http.MethodOptions) + r0mux.Handle("/account/whoami", internal.MakeAuthAPI("whoami", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { return Whoami(req, device) diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go new file mode 100644 index 000000000..abcf9e162 --- /dev/null +++ b/clientapi/routing/sendtodevice.go @@ -0,0 +1,66 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "encoding/json" + "net/http" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/internal/transactions" + "github.com/matrix-org/util" +) + +// SendToDevice handles PUT /_matrix/client/r0/sendToDevice/{eventType}/{txnId} +// sends the device events to the EDU Server +func SendToDevice( + req *http.Request, device *authtypes.Device, + eduProducer *producers.EDUServerProducer, + txnCache *transactions.Cache, + eventType string, txnID *string, +) util.JSONResponse { + if txnID != nil { + // Try to fetch response from transactionsCache + if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID); ok { + return *res + } + } + + // parse the incoming http request + var httpReq struct { + Messages map[string]map[string]json.RawMessage `json:"messages"` + } + resErr := httputil.UnmarshalJSONRequest(req, &req) + if resErr != nil { + return *resErr + } + + for userID, byUser := range httpReq.Messages { + for deviceID, message := range byUser { + if err := eduProducer.SendToDevice( + req.Context(), userID, deviceID, eventType, message, + ); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") + return jsonerror.InternalServerError() + } + } + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} diff --git a/eduserver/api/input.go b/eduserver/api/input.go index 8b5b6d76a..26670a792 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -15,6 +15,7 @@ package api import ( "context" + "encoding/json" "errors" "net/http" @@ -37,6 +38,17 @@ type InputTypingEvent struct { OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` } +type InputSendToDeviceEvent struct { + // The user ID to send the update to. + UserID string `json:"user_id"` + // The device ID to send the update to. + DeviceID string `json:"device_id"` + // The type of the event. + EventType string `json:"event_type"` + // The contents of the message. + Message json.RawMessage `json:"message"` +} + // InputTypingEventRequest is a request to EDUServerInputAPI type InputTypingEventRequest struct { InputTypingEvent InputTypingEvent `json:"input_typing_event"` @@ -45,6 +57,14 @@ type InputTypingEventRequest struct { // InputTypingEventResponse is a response to InputTypingEvents type InputTypingEventResponse struct{} +// InputSendToDeviceEventRequest is a request to EDUServerInputAPI +type InputSendToDeviceEventRequest struct { + InputSendToDeviceEvent InputSendToDeviceEvent `json:"input_send_to_device_event"` +} + +// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest +type InputSendToDeviceEventResponse struct{} + // EDUServerInputAPI is used to write events to the typing server. type EDUServerInputAPI interface { InputTypingEvent( @@ -52,11 +72,20 @@ type EDUServerInputAPI interface { request *InputTypingEventRequest, response *InputTypingEventResponse, ) error + + InputSendToDeviceEvent( + ctx context.Context, + request *InputSendToDeviceEventRequest, + response *InputSendToDeviceEventResponse, + ) error } // EDUServerInputTypingEventPath is the HTTP path for the InputTypingEvent API. const EDUServerInputTypingEventPath = "/eduserver/input" +// EDUServerInputSendToDeviceEventPath is the HTTP path for the InputSendToDeviceEvent API. +const EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" + // NewEDUServerInputAPIHTTP creates a EDUServerInputAPI implemented by talking to a HTTP POST API. func NewEDUServerInputAPIHTTP(eduServerURL string, httpClient *http.Client) (EDUServerInputAPI, error) { if httpClient == nil { @@ -70,7 +99,7 @@ type httpEDUServerInputAPI struct { httpClient *http.Client } -// InputRoomEvents implements EDUServerInputAPI +// InputTypingEvent implements EDUServerInputAPI func (h *httpEDUServerInputAPI) InputTypingEvent( ctx context.Context, request *InputTypingEventRequest, @@ -82,3 +111,16 @@ func (h *httpEDUServerInputAPI) InputTypingEvent( apiURL := h.eduServerURL + EDUServerInputTypingEventPath return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +// InputSendToDeviceEvent implements EDUServerInputAPI +func (h *httpEDUServerInputAPI) InputSendToDeviceEvent( + ctx context.Context, + request *InputSendToDeviceEventRequest, + response *InputSendToDeviceEventResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "InputSendToDeviceEvent") + defer span.Finish() + + apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath + return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/eduserver/api/output.go b/eduserver/api/output.go index 8696acf49..902d63973 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -12,7 +12,10 @@ package api -import "time" +import ( + "encoding/json" + "time" +) // OutputTypingEvent is an entry in typing server output kafka log. // This contains the event with extra fields used to create 'm.typing' event @@ -32,3 +35,13 @@ type TypingEvent struct { UserID string `json:"user_id"` Typing bool `json:"typing"` } + +// OutputTypingEvent is an entry in typing server output kafka log. +// This contains the event with extra fields used to create 'm.typing' event +// in clientapi & federation. +type OutputSendToDeviceEvent struct { + UserID string `json:"user_id"` + DeviceID string `json:"device_id"` + EventType string `json:"event_type"` + Message json.RawMessage `json:"message"` +} diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 14fbd3328..d8faaa3e1 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -28,9 +28,10 @@ func SetupEDUServerComponent( eduCache *cache.EDUCache, ) api.EDUServerInputAPI { inputAPI := &input.EDUServerInputAPI{ - Cache: eduCache, - Producer: base.KafkaProducer, - OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), + Cache: eduCache, + Producer: base.KafkaProducer, + OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), + OutputSendToDeviceEventTopic: string(base.Cfg.Kafka.Topics.OutputSendToDeviceEventTopic), } inputAPI.SetupHTTP(base.InternalAPIMux) diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 73777e323..4b749637d 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -33,6 +33,8 @@ type EDUServerInputAPI struct { Cache *cache.EDUCache // The kafka topic to output new typing events to. OutputTypingEventTopic string + // The kafka topic to output new send to device events to. + OutputSendToDeviceEventTopic string // kafka producer Producer sarama.SyncProducer } @@ -54,10 +56,20 @@ func (t *EDUServerInputAPI) InputTypingEvent( t.Cache.RemoveUser(ite.UserID, ite.RoomID) } - return t.sendEvent(ite) + return t.sendTypingEvent(ite) } -func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { +// InputTypingEvent implements api.EDUServerInputAPI +func (t *EDUServerInputAPI) InputSendToDeviceEvent( + ctx context.Context, + request *api.InputSendToDeviceEventRequest, + response *api.InputSendToDeviceEventResponse, +) error { + ise := &request.InputSendToDeviceEvent + return t.sendToDeviceEvent(ise) +} + +func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { ev := &api.TypingEvent{ Type: gomatrixserverlib.MTyping, RoomID: ite.RoomID, @@ -90,6 +102,29 @@ func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { return err } +func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error { + ote := &api.OutputSendToDeviceEvent{ + UserID: ise.UserID, + DeviceID: ise.DeviceID, + EventType: ise.EventType, + Message: ise.Message, + } + + eventJSON, err := json.Marshal(ote) + if err != nil { + return err + } + + m := &sarama.ProducerMessage{ + Topic: string(t.OutputSendToDeviceEventTopic), + Key: sarama.StringEncoder(ote.UserID), + Value: sarama.ByteEncoder(eventJSON), + } + + _, _, err = t.Producer.SendMessage(m) + return err +} + // SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux. func (t *EDUServerInputAPI) SetupHTTP(internalAPIMux *mux.Router) { internalAPIMux.Handle(api.EDUServerInputTypingEventPath, @@ -105,4 +140,17 @@ func (t *EDUServerInputAPI) SetupHTTP(internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(api.EDUServerInputSendToDeviceEventPath, + internal.MakeInternalAPI("inputSendToDeviceEvents", func(req *http.Request) util.JSONResponse { + var request api.InputSendToDeviceEventRequest + var response api.InputSendToDeviceEventResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := t.InputSendToDeviceEvent(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index b514af0ab..bde6729df 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -265,6 +265,25 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server") } + case gomatrixserverlib.MDirectToDevice: + // https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema + var directPayload struct { + Sender string `json:"sender"` + EventType string `json:"type"` + MessageID string `json:"message_id"` + Messages map[string]map[string]json.RawMessage `json:"message"` + } + if err := json.Unmarshal(e.Content, &directPayload); err != nil { + util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal send-to-device events") + continue + } + for userID, byUser := range directPayload.Messages { + for deviceID, message := range byUser { + if err := t.eduProducer.SendToDevice(t.context, userID, deviceID, directPayload.EventType, message); err != nil { + util.GetLogger(t.context).WithError(err).Error("Failed to send send-to-device event to edu server") + } + } + } default: util.GetLogger(t.context).WithField("type", e.Type).Warn("unhandled edu") } diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index cb8aec6f5..3e28a3476 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -77,6 +77,14 @@ func (p *testEDUProducer) InputTypingEvent( return nil } +func (p *testEDUProducer) InputSendToDeviceEvent( + ctx context.Context, + request *eduAPI.InputSendToDeviceEventRequest, + response *eduAPI.InputSendToDeviceEventResponse, +) error { + return nil +} + type testRoomserverAPI struct { inputRoomEvents []api.InputRoomEvent queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse diff --git a/go.mod b/go.mod index fe2cff768..fff38a9f1 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200521102632-2a81324a04ae + github.com/matrix-org/gomatrixserverlib v0.0.0-20200527122606-3151c3b2d2f2 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index aa0c8dd57..f608259fd 100644 --- a/go.sum +++ b/go.sum @@ -360,6 +360,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200521102632-2a81324a04ae h1:kFMh2aU3pMCkVCUeH57rtgm05XImbxKOHFYeUp80RCk= github.com/matrix-org/gomatrixserverlib v0.0.0-20200521102632-2a81324a04ae/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200527122606-3151c3b2d2f2 h1:uwaQb5X4sf66lxcfUqPNbOU26dHYLyzrjNvKUqTwNgg= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200527122606-3151c3b2d2f2/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/internal/config/config.go b/internal/config/config.go index 2a95069a4..54f12c5b3 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -152,6 +152,8 @@ type Dendrite struct { OutputClientData Topic `yaml:"output_client_data"` // Topic for eduserver/api.OutputTypingEvent events. OutputTypingEvent Topic `yaml:"output_typing_event"` + // Topic for eduserver/api.OutputSendToDeviceEvent events. + OutputSendToDeviceEventTopic Topic `yaml:"output_send_to_device_event"` // Topic for user updates (profile, presence) UserUpdates Topic `yaml:"user_updates"` }