diff --git a/clientapi/producers/eduserver.go b/clientapi/producers/eduserver.go index f87988b0e..2d2c3d74d 100644 --- a/clientapi/producers/eduserver.go +++ b/clientapi/producers/eduserver.go @@ -64,10 +64,12 @@ func (p *EDUServerProducer) SendToDevice( return err } requestData := api.InputSendToDeviceEvent{ - UserID: userID, - DeviceID: deviceID, - EventType: eventType, - Message: js, + SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ + UserID: userID, + DeviceID: deviceID, + EventType: eventType, + Message: js, + }, } request := api.InputSendToDeviceEventRequest{ InputSendToDeviceEvent: requestData, diff --git a/eduserver/api/input.go b/eduserver/api/input.go index 26670a792..190975292 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -15,7 +15,6 @@ package api import ( "context" - "encoding/json" "errors" "net/http" @@ -39,14 +38,7 @@ type InputTypingEvent struct { } type InputSendToDeviceEvent struct { - // The user ID to send the update to. - UserID string `json:"user_id"` - // The device ID to send the update to. - DeviceID string `json:"device_id"` - // The type of the event. - EventType string `json:"event_type"` - // The contents of the message. - Message json.RawMessage `json:"message"` + gomatrixserverlib.SendToDeviceEvent } // InputTypingEventRequest is a request to EDUServerInputAPI diff --git a/eduserver/api/output.go b/eduserver/api/output.go index 902d63973..d7aed6326 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -13,8 +13,9 @@ package api import ( - "encoding/json" "time" + + "github.com/matrix-org/gomatrixserverlib" ) // OutputTypingEvent is an entry in typing server output kafka log. @@ -40,8 +41,5 @@ type TypingEvent struct { // This contains the event with extra fields used to create 'm.typing' event // in clientapi & federation. type OutputSendToDeviceEvent struct { - UserID string `json:"user_id"` - DeviceID string `json:"device_id"` - EventType string `json:"event_type"` - Message json.RawMessage `json:"message"` + gomatrixserverlib.SendToDeviceEvent } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 4b749637d..b0198f8b8 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -104,10 +104,12 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error { ote := &api.OutputSendToDeviceEvent{ - UserID: ise.UserID, - DeviceID: ise.DeviceID, - EventType: ise.EventType, - Message: ise.Message, + SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ + UserID: ise.UserID, + DeviceID: ise.DeviceID, + EventType: ise.EventType, + Message: ise.Message, + }, } eventJSON, err := json.Marshal(ote) diff --git a/go.mod b/go.mod index 4365ea503..a43b6e0aa 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200528122156-fbb320a2ee61 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200528131445-0aa540ad74d1 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index c08cfa5d5..1262456c1 100644 --- a/go.sum +++ b/go.sum @@ -358,6 +358,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200528122156-fbb320a2ee61 h1:3rgoGvj/skUWg+u9E6ycEFs2ZGenEjr28ZtAhAhmZeM= github.com/matrix-org/gomatrixserverlib v0.0.0-20200528122156-fbb320a2ee61/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200528131445-0aa540ad74d1 h1:ueCm+xtIYsPn0MKsGvd61EC/mRnaThNOYEhmPXIPGr4= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200528131445-0aa540ad74d1/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/syncapi/storage/postgres/send_to_device_table.go b/syncapi/storage/postgres/send_to_device_table.go new file mode 100644 index 000000000..bd5eb9f2b --- /dev/null +++ b/syncapi/storage/postgres/send_to_device_table.go @@ -0,0 +1,146 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const sendToDeviceSchema = ` +CREATE SEQUENCE IF NOT EXISTS syncapi_send_to_device_id; + +-- Stores send-to-device messages. +CREATE TABLE IF NOT EXISTS syncapi_send_to_device ( + -- The ID that uniquely identifies this message. + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_send_to_device_id'), + -- The user ID to send the message to. + user_id TEXT NOT NULL, + -- The device ID to send the message to. + device_id TEXT NOT NULL, + -- The event type. + event_type TEXT NOT NULL, + -- The event content JSON. + content TEXT NOT NULL, + -- The sync token that was supplied when we tried to send the message, + -- or NULL if we haven't tried to send it yet. + sent_by_token TEXT +); +` + +const insertSendToDeviceMessageSQL = ` + INSERT INTO syncapi_send_to_device (user_id, device_id, event_type, content) + VALUES ($1, $2, $3, $4) +` + +const selectSendToDeviceMessagesSQL = ` + SELECT id, user_id, device_id, event_type, content, sent_by_token + FROM syncapi_send_to_device + WHERE user_id = $1 AND device_id = $2 +` + +const updateSentSendToDeviceMessagesSQL = ` + UPDATE syncapi_send_to_device SET sent_by_token = $1 + WHERE id = ANY($2) +` + +const deleteSendToDeviceMessagesSQL = ` + DELETE FROM syncapi_send_to_device WHERE id = ANY($1) +` + +type sendToDeviceStatements struct { + insertSendToDeviceMessageStmt *sql.Stmt + selectSendToDeviceMessagesStmt *sql.Stmt + updateSentSendToDeviceMessagesStmt *sql.Stmt + deleteSendToDeviceMessagesStmt *sql.Stmt +} + +func NewPostgresSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) { + s := &sendToDeviceStatements{} + _, err := db.Exec(sendToDeviceSchema) + if err != nil { + return nil, err + } + if s.insertSendToDeviceMessageStmt, err = db.Prepare(insertSendToDeviceMessageSQL); err != nil { + return nil, err + } + if s.selectSendToDeviceMessagesStmt, err = db.Prepare(selectSendToDeviceMessagesSQL); err != nil { + return nil, err + } + if s.updateSentSendToDeviceMessagesStmt, err = db.Prepare(updateSentSendToDeviceMessagesSQL); err != nil { + return nil, err + } + if s.deleteSendToDeviceMessagesStmt, err = db.Prepare(deleteSendToDeviceMessagesSQL); err != nil { + return nil, err + } + return s, nil +} + +func (s *sendToDeviceStatements) InsertSendToDeviceMessage( + ctx context.Context, txn *sql.Tx, userID, deviceID, eventType, content string, +) (err error) { + _, err = txn.Stmt(s.insertSendToDeviceMessageStmt).ExecContext(ctx, userID, deviceID, eventType, content) + return +} + +func (s *sendToDeviceStatements) SelectSendToDeviceMessages( + ctx context.Context, userID, deviceID string, +) (events []types.SendToDeviceEvent, err error) { + rows, err := s.selectSendToDeviceMessagesStmt.QueryContext(ctx, userID, deviceID) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectSendToDeviceMessages: rows.close() failed") + + for rows.Next() { + var id types.SendToDeviceNID + var userID, deviceID, eventType, message string + var sentByToken *string + if err = rows.Scan(&id, &userID, &deviceID, &eventType, &message, &sentByToken); err != nil { + return + } + events = append(events, types.SendToDeviceEvent{ + SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ + UserID: userID, + DeviceID: deviceID, + EventType: eventType, + Message: json.RawMessage(message), + }, + SentByToken: sentByToken, + }) + } + + return events, rows.Err() +} + +func (s *sendToDeviceStatements) UpdateSentSendToDeviceMessages( + ctx context.Context, txn *sql.Tx, token string, nids []types.SendToDeviceNID, +) (err error) { + _, err = txn.Stmt(s.updateSentSendToDeviceMessagesStmt).ExecContext(ctx, token, nids) + return +} + +func (s *sendToDeviceStatements) DeleteSendToDeviceMessages( + ctx context.Context, txn *sql.Tx, nids []types.SendToDeviceNID, +) (err error) { + _, err = txn.Stmt(s.deleteSendToDeviceMessagesStmt).ExecContext(ctx, nids) + return +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index dc73350a2..3684b7ed1 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -69,6 +69,10 @@ func NewDatabase(dbDataSourceName string, dbProperties internal.DbProperties) (* if err != nil { return nil, err } + sendToDevice, err := NewPostgresSendToDeviceTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Invites: invites, @@ -77,6 +81,7 @@ func NewDatabase(dbDataSourceName string, dbProperties internal.DbProperties) (* Topology: topology, CurrentRoomState: currState, BackwardExtremities: backwardExtremities, + SendToDevice: sendToDevice, EDUCache: cache.New(), } return &d, nil diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 888f85e0b..adf23dc68 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -27,6 +27,7 @@ type Database struct { Topology tables.Topology CurrentRoomState tables.CurrentRoomState BackwardExtremities tables.BackwardsExtremities + SendToDevice tables.SendToDevice EDUCache *cache.EDUCache } diff --git a/syncapi/storage/sqlite3/send_to_device_table.go b/syncapi/storage/sqlite3/send_to_device_table.go new file mode 100644 index 000000000..32db430f7 --- /dev/null +++ b/syncapi/storage/sqlite3/send_to_device_table.go @@ -0,0 +1,148 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "encoding/json" + "strings" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const sendToDeviceSchema = ` +-- Stores send-to-device messages. +CREATE TABLE IF NOT EXISTS syncapi_send_to_device ( + -- The ID that uniquely identifies this message. + id INTEGER PRIMARY KEY AUTOINCREMENT, + -- The user ID to send the message to. + user_id TEXT NOT NULL, + -- The device ID to send the message to. + device_id TEXT NOT NULL, + -- The event type. + event_type TEXT NOT NULL, + -- The event content JSON. + content TEXT NOT NULL, + -- The sync token that was supplied when we tried to send the message, + -- or NULL if we haven't tried to send it yet. + sent_by_token TEXT +); +` + +const insertSendToDeviceMessageSQL = ` + INSERT INTO syncapi_send_to_device (user_id, device_id, event_type, content) + VALUES ($1, $2, $3, $4) +` + +const selectSendToDeviceMessagesSQL = ` + SELECT id, user_id, device_id, event_type, content, sent_by_token + FROM syncapi_send_to_device + WHERE user_id = $1 AND device_id = $2 +` + +const updateSentSendToDeviceMessagesSQL = ` + UPDATE syncapi_send_to_device SET sent_by_token = $1 + WHERE id IN ($2) +` + +const deleteSendToDeviceMessagesSQL = ` + DELETE FROM syncapi_send_to_device WHERE id IN ($1) +` + +type sendToDeviceStatements struct { + insertSendToDeviceMessageStmt *sql.Stmt + selectSendToDeviceMessagesStmt *sql.Stmt +} + +func NewSqliteSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) { + s := &sendToDeviceStatements{} + _, err := db.Exec(sendToDeviceSchema) + if err != nil { + return nil, err + } + if s.insertSendToDeviceMessageStmt, err = db.Prepare(insertSendToDeviceMessageSQL); err != nil { + return nil, err + } + if s.selectSendToDeviceMessagesStmt, err = db.Prepare(selectSendToDeviceMessagesSQL); err != nil { + return nil, err + } + return s, nil +} + +func (s *sendToDeviceStatements) InsertSendToDeviceMessage( + ctx context.Context, txn *sql.Tx, userID, deviceID, eventType, content string, +) (err error) { + _, err = txn.Stmt(s.insertSendToDeviceMessageStmt).ExecContext(ctx, userID, deviceID, eventType, content) + return +} + +func (s *sendToDeviceStatements) SelectSendToDeviceMessages( + ctx context.Context, userID, deviceID string, +) (events []types.SendToDeviceEvent, err error) { + rows, err := s.selectSendToDeviceMessagesStmt.QueryContext(ctx, userID, deviceID) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectSendToDeviceMessages: rows.close() failed") + + for rows.Next() { + var id types.SendToDeviceNID + var userID, deviceID, eventType, message string + var sentByToken *string + if err = rows.Scan(&id, &userID, &deviceID, &eventType, &message, &sentByToken); err != nil { + return + } + events = append(events, types.SendToDeviceEvent{ + SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ + UserID: userID, + DeviceID: deviceID, + EventType: eventType, + Message: json.RawMessage(message), + }, + SentByToken: sentByToken, + }) + } + + return events, rows.Err() +} + +func (s *sendToDeviceStatements) UpdateSentSendToDeviceMessages( + ctx context.Context, txn *sql.Tx, token string, nids []types.SendToDeviceNID, +) (err error) { + query := strings.Replace(updateSentSendToDeviceMessagesSQL, "($2)", internal.QueryVariadic(len(nids)), 1) + params := make([]interface{}, 1+len(nids)) + params[0] = token + for k, v := range nids { + params[k+1] = v + } + _, err = txn.ExecContext(ctx, query, params...) + return +} + +func (s *sendToDeviceStatements) DeleteSendToDeviceMessages( + ctx context.Context, txn *sql.Tx, nids []types.SendToDeviceNID, +) (err error) { + query := strings.Replace(deleteSendToDeviceMessagesSQL, "($1)", internal.QueryVariadic(len(nids)), 1) + params := make([]interface{}, 1+len(nids)) + for k, v := range nids { + params[k] = v + } + _, err = txn.ExecContext(ctx, query, params...) + return +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 8ab1d4040..de8730a16 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -95,6 +95,10 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } + sendToDevice, err := NewSqliteSendToDeviceTable(d.db) + if err != nil { + return err + } d.Database = shared.Database{ DB: d.db, Invites: invites, @@ -103,6 +107,7 @@ func (d *SyncServerDatasource) prepare() (err error) { BackwardExtremities: bwExtrem, CurrentRoomState: roomState, Topology: topology, + SendToDevice: sendToDevice, EDUCache: cache.New(), } return nil diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index bc3b6941a..43527b25a 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -94,3 +94,10 @@ type BackwardsExtremities interface { // DeleteBackwardExtremity removes a backwards extremity for a room, if one existed. DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error) } + +type SendToDevice interface { + InsertSendToDeviceMessage(ctx context.Context, txn *sql.Tx, userID, deviceID, eventType, content string) (err error) + SelectSendToDeviceMessages(ctx context.Context, userID, deviceID string) (events []types.SendToDeviceEvent, err error) + UpdateSentSendToDeviceMessages(ctx context.Context, txn *sql.Tx, token string, nids []types.SendToDeviceNID) (err error) + DeleteSendToDeviceMessages(ctx context.Context, txn *sql.Tx, nids []types.SendToDeviceNID) (err error) +} diff --git a/syncapi/types/types.go b/syncapi/types/types.go index caa1b3ade..bc0c807d3 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -393,3 +393,10 @@ func NewLeaveResponse() *LeaveResponse { res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0) return &res } + +type SendToDeviceNID int + +type SendToDeviceEvent struct { + gomatrixserverlib.SendToDeviceEvent + SentByToken *string +}