From d3bf9cb31b09118af5fb93d3df9badefa94935b2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 28 May 2020 15:11:44 +0100 Subject: [PATCH] Add some logic for send-to-device messages, add them into sync stream --- syncapi/storage/interface.go | 4 ++ .../storage/postgres/send_to_device_table.go | 11 ++-- syncapi/storage/shared/syncserver.go | 53 +++++++++++++++++++ .../storage/sqlite3/send_to_device_table.go | 11 ++-- syncapi/sync/requestpool.go | 26 +++++++++ syncapi/types/types.go | 6 ++- 6 files changed, 104 insertions(+), 7 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 7e1a40fd6..b6e2b195b 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -104,4 +104,8 @@ type Database interface { StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) + // SendToDeviceUpdatesForSync returns a list of send-to-device updates, after having completed + // updates and deletions for previous events. The sync token should be supplied to this function so + // that we can clean up old events properly. + SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, token types.StreamingToken) ([]types.SendToDeviceEvent, error) } diff --git a/syncapi/storage/postgres/send_to_device_table.go b/syncapi/storage/postgres/send_to_device_table.go index bd5eb9f2b..d31be563e 100644 --- a/syncapi/storage/postgres/send_to_device_table.go +++ b/syncapi/storage/postgres/send_to_device_table.go @@ -117,15 +117,20 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages( if err = rows.Scan(&id, &userID, &deviceID, &eventType, &message, &sentByToken); err != nil { return } - events = append(events, types.SendToDeviceEvent{ + event := types.SendToDeviceEvent{ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ UserID: userID, DeviceID: deviceID, EventType: eventType, Message: json.RawMessage(message), }, - SentByToken: sentByToken, - }) + } + if sentByToken != nil { + if token, err := types.NewStreamTokenFromString(*sentByToken); err == nil { + event.SentByToken = &token + } + } + events = append(events, event) } return events, rows.Err() diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index adf23dc68..a989cc5a9 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1029,6 +1029,59 @@ func (d *Database) currentStateStreamEventsForRoom( return s, nil } +func (d *Database) AddSendToDeviceEvent( + ctx context.Context, txn *sql.Tx, + userID, deviceID, eventType, message string, +) error { + return d.SendToDevice.InsertSendToDeviceMessage( + ctx, txn, userID, deviceID, eventType, message, + ) +} + +func (d *Database) SendToDeviceUpdatesForSync( + ctx context.Context, + userID, deviceID string, + token types.StreamingToken, +) (events []types.SendToDeviceEvent, err error) { + err = internal.WithTransaction(d.DB, func(txn *sql.Tx) error { + // First of all, get our send-to-device updates for this user. + events, err := d.SendToDevice.SelectSendToDeviceMessages(ctx, userID, deviceID) + if err != nil { + return fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err) + } + + // Start by cleaning up any send-to-device messages that have older sent-by-tokens. + // This means that they were sent in a previous /sync and the client has happily + // progressed onto newer sync tokens. + toUpdate := []types.SendToDeviceNID{} + toDelete := []types.SendToDeviceNID{} + for pos, event := range events { + if event.SentByToken != nil && token.IsAfter(*event.SentByToken) { + // Mark the event for deletion and remove it from our list of return events. + toDelete = append(toDelete, event.ID) + events = append(events[:pos], events[pos+1:]...) + } else { + // Mark the event for update and keep it in our list of return events. + toUpdate = append(toUpdate, event.ID) + event.SentByToken = &token + } + } + + // Delete any send-to-device messages marked for deletion. + if err := d.SendToDevice.DeleteSendToDeviceMessages(ctx, txn, toDelete); err != nil { + return fmt.Errorf("d.SendToDevice.DeleteSendToDeviceMessages: %w", err) + } + + // Now update any outstanding send-to-device messages with the new sync token. + if err := d.SendToDevice.UpdateSentSendToDeviceMessages(ctx, txn, token.String(), toUpdate); err != nil { + return fmt.Errorf("d.SendToDevice.UpdateSentSendToDeviceMessages: %w", err) + } + + return nil + }) + return +} + // There may be some overlap where events in stateEvents are already in recentEvents, so filter // them out so we don't include them twice in the /sync response. They should be in recentEvents // only, so clients get to the correct state once they have rolled forward. diff --git a/syncapi/storage/sqlite3/send_to_device_table.go b/syncapi/storage/sqlite3/send_to_device_table.go index 32db430f7..db02f40c2 100644 --- a/syncapi/storage/sqlite3/send_to_device_table.go +++ b/syncapi/storage/sqlite3/send_to_device_table.go @@ -108,15 +108,20 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages( if err = rows.Scan(&id, &userID, &deviceID, &eventType, &message, &sentByToken); err != nil { return } - events = append(events, types.SendToDeviceEvent{ + event := types.SendToDeviceEvent{ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ UserID: userID, DeviceID: deviceID, EventType: eventType, Message: json.RawMessage(message), }, - SentByToken: sentByToken, - }) + } + if sentByToken != nil { + if token, err := types.NewStreamTokenFromString(*sentByToken); err == nil { + event.SentByToken = &token + } + } + events = append(events, event) } return events, rows.Err() diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index bd29b3338..140d2ca38 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -15,6 +15,7 @@ package sync import ( + "context" "net/http" "time" @@ -147,6 +148,11 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter) + if err != nil { + return + } + + res, err = rp.appendSendToDeviceMessages(res, req.device.UserID, req, latestPos) return } @@ -235,6 +241,26 @@ func (rp *RequestPool) appendAccountData( return data, nil } +func (rp *RequestPool) appendSendToDeviceMessages( + data *types.Response, userID string, req syncRequest, currentPos types.StreamingToken, +) (*types.Response, error) { + events, err := rp.db.SendToDeviceUpdatesForSync( + context.TODO(), + userID, + req.device.ID, + currentPos, + ) + if err != nil { + return nil, err + } + + for _, event := range events { + data.ToDevice.Events = append(data.ToDevice.Events, event.SendToDeviceEvent) + } + + return data, nil +} + // shouldReturnImmediately returns whether the /sync request is an initial sync, // or timeout=0, or full_state=true, in any of the cases the request should // return immediately. diff --git a/syncapi/types/types.go b/syncapi/types/types.go index bc0c807d3..3d9d31553 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -296,6 +296,9 @@ type Response struct { Invite map[string]InviteResponse `json:"invite"` Leave map[string]LeaveResponse `json:"leave"` } `json:"rooms"` + ToDevice struct { + Events []gomatrixserverlib.SendToDeviceEvent `json:"events"` + } `json:"to_device"` } // NewResponse creates an empty response with initialised maps. @@ -398,5 +401,6 @@ type SendToDeviceNID int type SendToDeviceEvent struct { gomatrixserverlib.SendToDeviceEvent - SentByToken *string + ID SendToDeviceNID + SentByToken *StreamingToken }