From e4eba873f53769432b3997b8092469f170209d6f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 28 May 2020 18:38:23 +0100 Subject: [PATCH] Refactor a bit --- clientapi/producers/eduserver.go | 11 ++++--- clientapi/routing/sendtodevice.go | 2 +- eduserver/api/input.go | 2 ++ eduserver/api/output.go | 2 ++ eduserver/input/input.go | 11 +++---- federationapi/routing/send.go | 9 ++---- go.mod | 2 +- go.sum | 2 ++ syncapi/consumers/eduserver_sendtodevice.go | 6 ++-- syncapi/storage/interface.go | 2 +- .../storage/postgres/send_to_device_table.go | 32 +++++++++---------- syncapi/storage/shared/syncserver.go | 8 ++--- .../storage/sqlite3/send_to_device_table.go | 32 +++++++++---------- syncapi/storage/storage_test.go | 9 +++--- syncapi/storage/tables/interface.go | 2 +- syncapi/types/types.go | 2 ++ 16 files changed, 66 insertions(+), 68 deletions(-) diff --git a/clientapi/producers/eduserver.go b/clientapi/producers/eduserver.go index 2d2c3d74d..102c1fad8 100644 --- a/clientapi/producers/eduserver.go +++ b/clientapi/producers/eduserver.go @@ -56,7 +56,7 @@ func (p *EDUServerProducer) SendTyping( // SendToDevice sends a typing event to EDU server func (p *EDUServerProducer) SendToDevice( - ctx context.Context, userID, deviceID, eventType string, + ctx context.Context, sender, userID, deviceID, eventType string, message interface{}, ) error { js, err := json.Marshal(message) @@ -64,11 +64,12 @@ func (p *EDUServerProducer) SendToDevice( return err } requestData := api.InputSendToDeviceEvent{ + UserID: userID, + DeviceID: deviceID, SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ - UserID: userID, - DeviceID: deviceID, - EventType: eventType, - Message: js, + Sender: sender, + Type: eventType, + Content: js, }, } request := api.InputSendToDeviceEventRequest{ diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go index 6657fd103..45ea86c75 100644 --- a/clientapi/routing/sendtodevice.go +++ b/clientapi/routing/sendtodevice.go @@ -51,7 +51,7 @@ func SendToDevice( for userID, byUser := range httpReq.Messages { for deviceID, message := range byUser { if err := eduProducer.SendToDevice( - req.Context(), userID, deviceID, eventType, message, + req.Context(), device.UserID, userID, deviceID, eventType, message, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") return jsonerror.InternalServerError() diff --git a/eduserver/api/input.go b/eduserver/api/input.go index 190975292..a6e3697f7 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -38,6 +38,8 @@ type InputTypingEvent struct { } type InputSendToDeviceEvent struct { + UserID string `json:"user_id"` + DeviceID string `json:"device_id"` gomatrixserverlib.SendToDeviceEvent } diff --git a/eduserver/api/output.go b/eduserver/api/output.go index d7aed6326..148b5d003 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -41,5 +41,7 @@ type TypingEvent struct { // This contains the event with extra fields used to create 'm.typing' event // in clientapi & federation. type OutputSendToDeviceEvent struct { + UserID string `json:"user_id"` + DeviceID string `json:"device_id"` gomatrixserverlib.SendToDeviceEvent } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index ed9ef18f0..a90461672 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -105,18 +105,15 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error { ote := &api.OutputSendToDeviceEvent{ - SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ - UserID: ise.UserID, - DeviceID: ise.DeviceID, - EventType: ise.EventType, - Message: ise.Message, - }, + UserID: ise.UserID, + DeviceID: ise.DeviceID, + SendToDeviceEvent: ise.SendToDeviceEvent, } logrus.WithFields(logrus.Fields{ "user_id": ise.UserID, "device_id": ise.DeviceID, - "event_type": ise.EventType, + "event_type": ise.Type, }).Info("handling send-to-device message") eventJSON, err := json.Marshal(ote) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index df0aebc11..0d5d0ce5d 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -267,12 +267,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { } case gomatrixserverlib.MDirectToDevice: // https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema - var directPayload struct { - Sender string `json:"sender"` - EventType string `json:"type"` - MessageID string `json:"message_id"` - Messages map[string]map[string]json.RawMessage `json:"message"` - } + var directPayload gomatrixserverlib.ToDeviceMessage if err := json.Unmarshal(e.Content, &directPayload); err != nil { util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal send-to-device events") continue @@ -280,7 +275,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { for userID, byUser := range directPayload.Messages { for deviceID, message := range byUser { // TODO: check that the user and the device actually exist here - if err := t.eduProducer.SendToDevice(t.context, userID, deviceID, directPayload.EventType, message); err != nil { + if err := t.eduProducer.SendToDevice(t.context, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { util.GetLogger(t.context).WithError(err).Error("Failed to send send-to-device event to edu server") } } diff --git a/go.mod b/go.mod index a43b6e0aa..08f2a5802 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200528131445-0aa540ad74d1 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200528171922-504ef926060e github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index 1262456c1..e37c06f67 100644 --- a/go.sum +++ b/go.sum @@ -360,6 +360,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200528122156-fbb320a2ee61 h1:3r github.com/matrix-org/gomatrixserverlib v0.0.0-20200528122156-fbb320a2ee61/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200528131445-0aa540ad74d1 h1:ueCm+xtIYsPn0MKsGvd61EC/mRnaThNOYEhmPXIPGr4= github.com/matrix-org/gomatrixserverlib v0.0.0-20200528131445-0aa540ad74d1/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200528171922-504ef926060e h1:OivrqPGYy5NYqGZqf+DE9dX1xiJtJ0ip8TJgt3jqcrA= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200528171922-504ef926060e/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 3c8be703e..d71cb1d71 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -81,10 +81,12 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) log.WithFields(log.Fields{ "user_id": output.UserID, "device_id": output.DeviceID, - "event_type": output.EventType, + "event_type": output.Type, }).Debug("received send-to-device event from EDU server") - newPos, err := s.db.StoreNewSendForDeviceMessage(context.TODO(), output.SendToDeviceEvent) + newPos, err := s.db.StoreNewSendForDeviceMessage( + context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent, + ) if err != nil { log.WithError(err).Errorf("failed to store send-to-device message") return err diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index bfe1c53cf..51bcd5e9b 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -109,5 +109,5 @@ type Database interface { // that we can clean up old events properly. SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, token types.StreamingToken) ([]types.SendToDeviceEvent, error) // StoreNewSendForDeviceMessage stores a new send-to-device event for a user's device. - StoreNewSendForDeviceMessage(ctx context.Context, event gomatrixserverlib.SendToDeviceEvent) (types.StreamPosition, error) + StoreNewSendForDeviceMessage(ctx context.Context, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent) (types.StreamPosition, error) } diff --git a/syncapi/storage/postgres/send_to_device_table.go b/syncapi/storage/postgres/send_to_device_table.go index b9e682eb8..07da77a6f 100644 --- a/syncapi/storage/postgres/send_to_device_table.go +++ b/syncapi/storage/postgres/send_to_device_table.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" ) const sendToDeviceSchema = ` @@ -33,12 +32,12 @@ CREATE SEQUENCE IF NOT EXISTS syncapi_send_to_device_id; CREATE TABLE IF NOT EXISTS syncapi_send_to_device ( -- The ID that uniquely identifies this message. id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_send_to_device_id'), + -- The sender of the message. + sender TEXT NOT NULL, -- The user ID to send the message to. user_id TEXT NOT NULL, -- The device ID to send the message to. device_id TEXT NOT NULL, - -- The event type. - event_type TEXT NOT NULL, -- The event content JSON. content TEXT NOT NULL, -- The sync token that was supplied when we tried to send the message, @@ -48,12 +47,12 @@ CREATE TABLE IF NOT EXISTS syncapi_send_to_device ( ` const insertSendToDeviceMessageSQL = ` - INSERT INTO syncapi_send_to_device (user_id, device_id, event_type, content) - VALUES ($1, $2, $3, $4) + INSERT INTO syncapi_send_to_device (user_id, device_id, content) + VALUES ($1, $2, $3) ` const selectSendToDeviceMessagesSQL = ` - SELECT id, user_id, device_id, event_type, content, sent_by_token + SELECT id, sender, user_id, device_id, content, sent_by_token FROM syncapi_send_to_device WHERE user_id = $1 AND device_id = $2 ` @@ -96,9 +95,9 @@ func NewPostgresSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) { } func (s *sendToDeviceStatements) InsertSendToDeviceMessage( - ctx context.Context, txn *sql.Tx, userID, deviceID, eventType, content string, + ctx context.Context, txn *sql.Tx, userID, deviceID, content string, ) (err error) { - _, err = internal.TxStmt(txn, s.insertSendToDeviceMessageStmt).ExecContext(ctx, userID, deviceID, eventType, content) + _, err = internal.TxStmt(txn, s.insertSendToDeviceMessageStmt).ExecContext(ctx, userID, deviceID, content) return } @@ -113,19 +112,18 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages( for rows.Next() { var id types.SendToDeviceNID - var userID, deviceID, eventType, message string + var userID, deviceID, content string var sentByToken *string - if err = rows.Scan(&id, &userID, &deviceID, &eventType, &message, &sentByToken); err != nil { + if err = rows.Scan(&id, &userID, &deviceID, &content, &sentByToken); err != nil { return } event := types.SendToDeviceEvent{ - ID: id, - SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ - UserID: userID, - DeviceID: deviceID, - EventType: eventType, - Message: json.RawMessage(message), - }, + ID: id, + UserID: userID, + DeviceID: deviceID, + } + if err = json.Unmarshal([]byte(content), &event.SendToDeviceEvent); err != nil { + return } if sentByToken != nil { if token, err := types.NewStreamTokenFromString(*sentByToken); err == nil { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 8ffbe6498..8460aeac7 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1031,18 +1031,18 @@ func (d *Database) currentStateStreamEventsForRoom( func (d *Database) AddSendToDeviceEvent( ctx context.Context, txn *sql.Tx, - userID, deviceID, eventType, message string, + userID, deviceID, eventType, content string, ) error { return d.SendToDevice.InsertSendToDeviceMessage( - ctx, txn, userID, deviceID, eventType, message, + ctx, txn, userID, deviceID, content, ) } func (d *Database) StoreNewSendForDeviceMessage( - ctx context.Context, event gomatrixserverlib.SendToDeviceEvent, + ctx context.Context, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent, ) (types.StreamPosition, error) { err := d.AddSendToDeviceEvent( - ctx, nil, event.UserID, event.DeviceID, event.EventType, string(event.Message), + ctx, nil, userID, deviceID, event.Type, string(event.Content), ) if err != nil { return 0, err diff --git a/syncapi/storage/sqlite3/send_to_device_table.go b/syncapi/storage/sqlite3/send_to_device_table.go index 2bcb8f62f..9fa742210 100644 --- a/syncapi/storage/sqlite3/send_to_device_table.go +++ b/syncapi/storage/sqlite3/send_to_device_table.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" ) const sendToDeviceSchema = ` @@ -31,12 +30,12 @@ const sendToDeviceSchema = ` CREATE TABLE IF NOT EXISTS syncapi_send_to_device ( -- The ID that uniquely identifies this message. id INTEGER PRIMARY KEY AUTOINCREMENT, + -- The sender of the message. + sender TEXT NOT NULL, -- The user ID to send the message to. user_id TEXT NOT NULL, -- The device ID to send the message to. device_id TEXT NOT NULL, - -- The event type. - event_type TEXT NOT NULL, -- The event content JSON. content TEXT NOT NULL, -- The sync token that was supplied when we tried to send the message, @@ -46,12 +45,12 @@ CREATE TABLE IF NOT EXISTS syncapi_send_to_device ( ` const insertSendToDeviceMessageSQL = ` - INSERT INTO syncapi_send_to_device (user_id, device_id, event_type, content) - VALUES ($1, $2, $3, $4) + INSERT INTO syncapi_send_to_device (user_id, device_id, content) + VALUES ($1, $2, $3) ` const selectSendToDeviceMessagesSQL = ` - SELECT id, user_id, device_id, event_type, content, sent_by_token + SELECT id, user_id, device_id, content, sent_by_token FROM syncapi_send_to_device WHERE user_id = $1 AND device_id = $2 ` @@ -86,9 +85,9 @@ func NewSqliteSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) { } func (s *sendToDeviceStatements) InsertSendToDeviceMessage( - ctx context.Context, txn *sql.Tx, userID, deviceID, eventType, content string, + ctx context.Context, txn *sql.Tx, userID, deviceID, content string, ) (err error) { - _, err = internal.TxStmt(txn, s.insertSendToDeviceMessageStmt).ExecContext(ctx, userID, deviceID, eventType, content) + _, err = internal.TxStmt(txn, s.insertSendToDeviceMessageStmt).ExecContext(ctx, userID, deviceID, content) return } @@ -103,19 +102,18 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages( for rows.Next() { var id types.SendToDeviceNID - var userID, deviceID, eventType, message string + var sender, userID, deviceID, content string var sentByToken *string - if err = rows.Scan(&id, &userID, &deviceID, &eventType, &message, &sentByToken); err != nil { + if err = rows.Scan(&id, &sender, &userID, &deviceID, &content, &sentByToken); err != nil { return } event := types.SendToDeviceEvent{ - ID: id, - SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ - UserID: userID, - DeviceID: deviceID, - EventType: eventType, - Message: json.RawMessage(message), - }, + ID: id, + UserID: userID, + DeviceID: deviceID, + } + if err = json.Unmarshal([]byte(content), &event.SendToDeviceEvent); err != nil { + return } if sentByToken != nil { if token, err := types.NewStreamTokenFromString(*sentByToken); err == nil { diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 184bbdda5..bc664bd67 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -528,11 +528,10 @@ func TestSendToDeviceBehaviour(t *testing.T) { } // Try sending a message. - streamPos, err := db.StoreNewSendForDeviceMessage(ctx, gomatrixserverlib.SendToDeviceEvent{ - UserID: "alice", - DeviceID: "one", - EventType: "m.type", - Message: json.RawMessage("{}"), + streamPos, err := db.StoreNewSendForDeviceMessage(ctx, "alice", "one", gomatrixserverlib.SendToDeviceEvent{ + Sender: "bob", + Type: "m.type", + Content: json.RawMessage("{}"), }) if err != nil { t.Fatal(err) diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 9bdbb4d0a..0ae1d4d91 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -96,7 +96,7 @@ type BackwardsExtremities interface { } type SendToDevice interface { - InsertSendToDeviceMessage(ctx context.Context, txn *sql.Tx, userID, deviceID, eventType, content string) (err error) + InsertSendToDeviceMessage(ctx context.Context, txn *sql.Tx, userID, deviceID, content string) (err error) SelectSendToDeviceMessages(ctx context.Context, txn *sql.Tx, userID, deviceID string) (events []types.SendToDeviceEvent, err error) UpdateSentSendToDeviceMessages(ctx context.Context, txn *sql.Tx, token string, nids []types.SendToDeviceNID) (err error) DeleteSendToDeviceMessages(ctx context.Context, txn *sql.Tx, nids []types.SendToDeviceNID) (err error) diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 198b1bc22..2c8082c45 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -404,5 +404,7 @@ type SendToDeviceNID int type SendToDeviceEvent struct { gomatrixserverlib.SendToDeviceEvent ID SendToDeviceNID + UserID string + DeviceID string SentByToken *StreamingToken }