Groundwork for send-to-device messaging

This commit is contained in:
Neil Alexander 2020-05-27 13:33:56 +01:00
parent 57841fc35e
commit a90ea4c320
12 changed files with 243 additions and 8 deletions

View file

@ -14,6 +14,7 @@ package producers
import (
"context"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/eduserver/api"
@ -52,3 +53,25 @@ func (p *EDUServerProducer) SendTyping(
return err
}
// SendToDevice sends a typing event to EDU server
func (p *EDUServerProducer) SendToDevice(
ctx context.Context, userID, deviceID, eventType string,
message interface{},
) error {
js, err := json.Marshal(message)
if err != nil {
return err
}
requestData := api.InputSendToDeviceEvent{
UserID: userID,
DeviceID: deviceID,
EventType: eventType,
Message: js,
}
request := api.InputSendToDeviceEventRequest{
InputSendToDeviceEvent: requestData,
}
response := api.InputSendToDeviceEventResponse{}
return p.InputAPI.InputSendToDeviceEvent(ctx, &request, &response)
}

View file

@ -274,6 +274,17 @@ func Setup(
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/sendToDevice/{eventType}/{txnID}",
internal.MakeAuthAPI("send_to_device", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars, err := internal.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/account/whoami",
internal.MakeAuthAPI("whoami", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return Whoami(req, device)

View file

@ -0,0 +1,66 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/util"
)
// SendToDevice handles PUT /_matrix/client/r0/sendToDevice/{eventType}/{txnId}
// sends the device events to the EDU Server
func SendToDevice(
req *http.Request, device *authtypes.Device,
eduProducer *producers.EDUServerProducer,
txnCache *transactions.Cache,
eventType string, txnID *string,
) util.JSONResponse {
if txnID != nil {
// Try to fetch response from transactionsCache
if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID); ok {
return *res
}
}
// parse the incoming http request
var httpReq struct {
Messages map[string]map[string]json.RawMessage `json:"messages"`
}
resErr := httputil.UnmarshalJSONRequest(req, &req)
if resErr != nil {
return *resErr
}
for userID, byUser := range httpReq.Messages {
for deviceID, message := range byUser {
if err := eduProducer.SendToDevice(
req.Context(), userID, deviceID, eventType, message,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed")
return jsonerror.InternalServerError()
}
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}

View file

@ -15,6 +15,7 @@ package api
import (
"context"
"encoding/json"
"errors"
"net/http"
@ -37,6 +38,17 @@ type InputTypingEvent struct {
OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
}
type InputSendToDeviceEvent struct {
// The user ID to send the update to.
UserID string `json:"user_id"`
// The device ID to send the update to.
DeviceID string `json:"device_id"`
// The type of the event.
EventType string `json:"event_type"`
// The contents of the message.
Message json.RawMessage `json:"message"`
}
// InputTypingEventRequest is a request to EDUServerInputAPI
type InputTypingEventRequest struct {
InputTypingEvent InputTypingEvent `json:"input_typing_event"`
@ -45,6 +57,14 @@ type InputTypingEventRequest struct {
// InputTypingEventResponse is a response to InputTypingEvents
type InputTypingEventResponse struct{}
// InputSendToDeviceEventRequest is a request to EDUServerInputAPI
type InputSendToDeviceEventRequest struct {
InputSendToDeviceEvent InputSendToDeviceEvent `json:"input_send_to_device_event"`
}
// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest
type InputSendToDeviceEventResponse struct{}
// EDUServerInputAPI is used to write events to the typing server.
type EDUServerInputAPI interface {
InputTypingEvent(
@ -52,11 +72,20 @@ type EDUServerInputAPI interface {
request *InputTypingEventRequest,
response *InputTypingEventResponse,
) error
InputSendToDeviceEvent(
ctx context.Context,
request *InputSendToDeviceEventRequest,
response *InputSendToDeviceEventResponse,
) error
}
// EDUServerInputTypingEventPath is the HTTP path for the InputTypingEvent API.
const EDUServerInputTypingEventPath = "/eduserver/input"
// EDUServerInputSendToDeviceEventPath is the HTTP path for the InputSendToDeviceEvent API.
const EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
// NewEDUServerInputAPIHTTP creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
func NewEDUServerInputAPIHTTP(eduServerURL string, httpClient *http.Client) (EDUServerInputAPI, error) {
if httpClient == nil {
@ -70,7 +99,7 @@ type httpEDUServerInputAPI struct {
httpClient *http.Client
}
// InputRoomEvents implements EDUServerInputAPI
// InputTypingEvent implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputTypingEvent(
ctx context.Context,
request *InputTypingEventRequest,
@ -82,3 +111,16 @@ func (h *httpEDUServerInputAPI) InputTypingEvent(
apiURL := h.eduServerURL + EDUServerInputTypingEventPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// InputSendToDeviceEvent implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputSendToDeviceEvent(
ctx context.Context,
request *InputSendToDeviceEventRequest,
response *InputSendToDeviceEventResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputSendToDeviceEvent")
defer span.Finish()
apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -12,7 +12,10 @@
package api
import "time"
import (
"encoding/json"
"time"
)
// OutputTypingEvent is an entry in typing server output kafka log.
// This contains the event with extra fields used to create 'm.typing' event
@ -32,3 +35,13 @@ type TypingEvent struct {
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
// OutputTypingEvent is an entry in typing server output kafka log.
// This contains the event with extra fields used to create 'm.typing' event
// in clientapi & federation.
type OutputSendToDeviceEvent struct {
UserID string `json:"user_id"`
DeviceID string `json:"device_id"`
EventType string `json:"event_type"`
Message json.RawMessage `json:"message"`
}

View file

@ -28,9 +28,10 @@ func SetupEDUServerComponent(
eduCache *cache.EDUCache,
) api.EDUServerInputAPI {
inputAPI := &input.EDUServerInputAPI{
Cache: eduCache,
Producer: base.KafkaProducer,
OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent),
Cache: eduCache,
Producer: base.KafkaProducer,
OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent),
OutputSendToDeviceEventTopic: string(base.Cfg.Kafka.Topics.OutputSendToDeviceEventTopic),
}
inputAPI.SetupHTTP(base.InternalAPIMux)

View file

@ -33,6 +33,8 @@ type EDUServerInputAPI struct {
Cache *cache.EDUCache
// The kafka topic to output new typing events to.
OutputTypingEventTopic string
// The kafka topic to output new send to device events to.
OutputSendToDeviceEventTopic string
// kafka producer
Producer sarama.SyncProducer
}
@ -54,10 +56,20 @@ func (t *EDUServerInputAPI) InputTypingEvent(
t.Cache.RemoveUser(ite.UserID, ite.RoomID)
}
return t.sendEvent(ite)
return t.sendTypingEvent(ite)
}
func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
// InputTypingEvent implements api.EDUServerInputAPI
func (t *EDUServerInputAPI) InputSendToDeviceEvent(
ctx context.Context,
request *api.InputSendToDeviceEventRequest,
response *api.InputSendToDeviceEventResponse,
) error {
ise := &request.InputSendToDeviceEvent
return t.sendToDeviceEvent(ise)
}
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
RoomID: ite.RoomID,
@ -90,6 +102,29 @@ func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
return err
}
func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error {
ote := &api.OutputSendToDeviceEvent{
UserID: ise.UserID,
DeviceID: ise.DeviceID,
EventType: ise.EventType,
Message: ise.Message,
}
eventJSON, err := json.Marshal(ote)
if err != nil {
return err
}
m := &sarama.ProducerMessage{
Topic: string(t.OutputSendToDeviceEventTopic),
Key: sarama.StringEncoder(ote.UserID),
Value: sarama.ByteEncoder(eventJSON),
}
_, _, err = t.Producer.SendMessage(m)
return err
}
// SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux.
func (t *EDUServerInputAPI) SetupHTTP(internalAPIMux *mux.Router) {
internalAPIMux.Handle(api.EDUServerInputTypingEventPath,
@ -105,4 +140,17 @@ func (t *EDUServerInputAPI) SetupHTTP(internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(api.EDUServerInputSendToDeviceEventPath,
internal.MakeInternalAPI("inputSendToDeviceEvents", func(req *http.Request) util.JSONResponse {
var request api.InputSendToDeviceEventRequest
var response api.InputSendToDeviceEventResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputSendToDeviceEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -265,6 +265,25 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server")
}
case gomatrixserverlib.MDirectToDevice:
// https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
var directPayload struct {
Sender string `json:"sender"`
EventType string `json:"type"`
MessageID string `json:"message_id"`
Messages map[string]map[string]json.RawMessage `json:"message"`
}
if err := json.Unmarshal(e.Content, &directPayload); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal send-to-device events")
continue
}
for userID, byUser := range directPayload.Messages {
for deviceID, message := range byUser {
if err := t.eduProducer.SendToDevice(t.context, userID, deviceID, directPayload.EventType, message); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to send send-to-device event to edu server")
}
}
}
default:
util.GetLogger(t.context).WithField("type", e.Type).Warn("unhandled edu")
}

View file

@ -77,6 +77,14 @@ func (p *testEDUProducer) InputTypingEvent(
return nil
}
func (p *testEDUProducer) InputSendToDeviceEvent(
ctx context.Context,
request *eduAPI.InputSendToDeviceEventRequest,
response *eduAPI.InputSendToDeviceEventResponse,
) error {
return nil
}
type testRoomserverAPI struct {
inputRoomEvents []api.InputRoomEvent
queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse

2
go.mod
View file

@ -18,7 +18,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200521102632-2a81324a04ae
github.com/matrix-org/gomatrixserverlib v0.0.0-20200527122606-3151c3b2d2f2
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible

2
go.sum
View file

@ -360,6 +360,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200521102632-2a81324a04ae h1:kFMh2aU3pMCkVCUeH57rtgm05XImbxKOHFYeUp80RCk=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200521102632-2a81324a04ae/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200527122606-3151c3b2d2f2 h1:uwaQb5X4sf66lxcfUqPNbOU26dHYLyzrjNvKUqTwNgg=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200527122606-3151c3b2d2f2/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=

View file

@ -152,6 +152,8 @@ type Dendrite struct {
OutputClientData Topic `yaml:"output_client_data"`
// Topic for eduserver/api.OutputTypingEvent events.
OutputTypingEvent Topic `yaml:"output_typing_event"`
// Topic for eduserver/api.OutputSendToDeviceEvent events.
OutputSendToDeviceEventTopic Topic `yaml:"output_send_to_device_event"`
// Topic for user updates (profile, presence)
UserUpdates Topic `yaml:"user_updates"`
}