diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 2e9176745..40257ed56 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -93,8 +93,10 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) "event_type": output.Type, }).Info("sync API received send-to-device event from EDU server") + streamPos := s.db.AddSendToDevice() + _, err = s.db.StoreNewSendForDeviceMessage( - context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent, + context.TODO(), streamPos, output.UserID, output.DeviceID, output.SendToDeviceEvent, ) if err != nil { log.WithError(err).Errorf("failed to store send-to-device message") @@ -104,7 +106,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) s.notifier.OnNewSendToDevice( output.UserID, []string{output.DeviceID}, // TODO: support wildcard here as per spec - types.NewStreamToken(0, 1), + types.NewStreamToken(0, streamPos), ) return nil diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index ec0f4de39..d40cbe847 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -104,12 +104,14 @@ type Database interface { StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) + // AddSendToDevice increases the EDU position in the cache and returns the stream position. + AddSendToDevice() types.StreamPosition // SendToDeviceUpdatesForSync returns a list of send-to-device updates, after having completed // updates and deletions for previous events. The sync token should be supplied to this function so // that we can clean up old events properly. SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, token types.StreamingToken) ([]types.SendToDeviceEvent, []types.SendToDeviceNID, []types.SendToDeviceNID, error) // StoreNewSendForDeviceMessage stores a new send-to-device event for a user's device. - StoreNewSendForDeviceMessage(ctx context.Context, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent) (types.StreamPosition, error) + StoreNewSendForDeviceMessage(ctx context.Context, streamPos types.StreamPosition, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent) (types.StreamPosition, error) // CleanSendToDeviceUpdates will update or remove any send-to-device updates based on the given sync. CleanSendToDeviceUpdates(ctx context.Context, toUpdate, toDelete []types.SendToDeviceNID, token types.StreamingToken) (err error) } diff --git a/syncapi/storage/postgres/send_to_device_table.go b/syncapi/storage/postgres/send_to_device_table.go index 64591d796..a995a5275 100644 --- a/syncapi/storage/postgres/send_to_device_table.go +++ b/syncapi/storage/postgres/send_to_device_table.go @@ -53,6 +53,7 @@ const selectSendToDeviceMessagesSQL = ` SELECT id, user_id, device_id, content, sent_by_token FROM syncapi_send_to_device WHERE user_id = $1 AND device_id = $2 + ORDER BY id DESC ` const updateSentSendToDeviceMessagesSQL = ` diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 9448489af..10967b76e 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -91,6 +91,10 @@ func (d *Database) RemoveTypingUser( return types.StreamPosition(d.EDUCache.RemoveUser(userID, roomID)) } +func (d *Database) AddSendToDevice() types.StreamPosition { + return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage()) +} + func (d *Database) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { d.EDUCache.SetTimeoutCallback(fn) } @@ -1041,11 +1045,11 @@ func (d *Database) AddSendToDeviceEvent( } func (d *Database) StoreNewSendForDeviceMessage( - ctx context.Context, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent, + ctx context.Context, streamPos types.StreamPosition, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent, ) (types.StreamPosition, error) { j, err := json.Marshal(event) if err != nil { - return 0, err + return streamPos, err } // Delegate the database write task to the SendToDeviceWriter. It'll guarantee // that we don't lock the table for writes in more than one place. @@ -1055,9 +1059,9 @@ func (d *Database) StoreNewSendForDeviceMessage( ) }) if err != nil { - return 0, err + return streamPos, err } - return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage()), nil + return streamPos, nil } func (d *Database) SendToDeviceUpdatesForSync( diff --git a/syncapi/storage/sqlite3/send_to_device_table.go b/syncapi/storage/sqlite3/send_to_device_table.go index 9ba8fe055..80f120222 100644 --- a/syncapi/storage/sqlite3/send_to_device_table.go +++ b/syncapi/storage/sqlite3/send_to_device_table.go @@ -51,6 +51,7 @@ const selectSendToDeviceMessagesSQL = ` SELECT id, user_id, device_id, content, sent_by_token FROM syncapi_send_to_device WHERE user_id = $1 AND device_id = $2 + ORDER BY id DESC ` const updateSentSendToDeviceMessagesSQL = ` diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 792ba1c9d..4661ede4d 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -537,7 +537,7 @@ func TestSendToDeviceBehaviour(t *testing.T) { } // Try sending a message. - streamPos, err := db.StoreNewSendForDeviceMessage(ctx, "alice", "one", gomatrixserverlib.SendToDeviceEvent{ + streamPos, err := db.StoreNewSendForDeviceMessage(ctx, types.StreamPosition(0), "alice", "one", gomatrixserverlib.SendToDeviceEvent{ Sender: "bob", Type: "m.type", Content: json.RawMessage("{}"),