Delete correct Send-to-Device messages (#2608)

* Add send-to-device tests

* Update tests, fix message deletion

* PR comments
This commit is contained in:
Till 2022-08-02 17:00:16 +02:00 committed by GitHub
parent e384eb683f
commit df5d4dc7a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 145 additions and 21 deletions

View file

@ -58,7 +58,7 @@ const selectSendToDeviceMessagesSQL = `
const deleteSendToDeviceMessagesSQL = ` const deleteSendToDeviceMessagesSQL = `
DELETE FROM syncapi_send_to_device DELETE FROM syncapi_send_to_device
WHERE user_id = $1 AND device_id = $2 AND id < $3 WHERE user_id = $1 AND device_id = $2 AND id <= $3
` `
const selectMaxSendToDeviceIDSQL = "" + const selectMaxSendToDeviceIDSQL = "" +

View file

@ -55,7 +55,7 @@ const selectSendToDeviceMessagesSQL = `
const deleteSendToDeviceMessagesSQL = ` const deleteSendToDeviceMessagesSQL = `
DELETE FROM syncapi_send_to_device DELETE FROM syncapi_send_to_device
WHERE user_id = $1 AND device_id = $2 AND id < $3 WHERE user_id = $1 AND device_id = $2 AND id <= $3
` `
const selectMaxSendToDeviceIDSQL = "" + const selectMaxSendToDeviceIDSQL = "" +

View file

@ -416,11 +416,6 @@ func TestSendToDeviceBehaviour(t *testing.T) {
t.Fatal("first call should have no updates") t.Fatal("first call should have no updates")
} }
err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, 100)
if err != nil {
return
}
// Try sending a message. // Try sending a message.
streamPos, err := db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{ streamPos, err := db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{
Sender: bob.ID, Sender: bob.ID,
@ -441,43 +436,35 @@ func TestSendToDeviceBehaviour(t *testing.T) {
if count := len(events); count != 1 { if count := len(events); count != 1 {
t.Fatalf("second call should have one update, got %d", count) t.Fatalf("second call should have one update, got %d", count)
} }
err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos)
if err != nil {
return
}
// At this point we should still have one message because we haven't progressed the // At this point we should still have one message because we haven't progressed the
// sync position yet. This is equivalent to the client failing to /sync and retrying // sync position yet. This is equivalent to the client failing to /sync and retrying
// with the same position. // with the same position.
streamPos, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100) streamPos, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, streamPos)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(events) != 1 { if len(events) != 1 {
t.Fatal("third call should have one update still") t.Fatal("third call should have one update still")
} }
err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos+1) err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos)
if err != nil { if err != nil {
return return
} }
// At this point we should now have no updates, because we've progressed the sync // At this point we should now have no updates, because we've progressed the sync
// position. Therefore the update from before will not be sent again. // position. Therefore the update from before will not be sent again.
_, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos+1, streamPos+2) _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos, streamPos+10)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(events) != 0 { if len(events) != 0 {
t.Fatal("fourth call should have no updates") t.Fatal("fourth call should have no updates")
} }
err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos+1)
if err != nil {
return
}
// At this point we should still have no updates, because no new updates have been // At this point we should still have no updates, because no new updates have been
// sent. // sent.
_, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos, streamPos+2) _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos, streamPos+10)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -491,7 +478,7 @@ func TestSendToDeviceBehaviour(t *testing.T) {
streamPos, err = db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{ streamPos, err = db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{
Sender: bob.ID, Sender: bob.ID,
Type: "m.type", Type: "m.type",
Content: json.RawMessage(fmt.Sprintf(`{ "count": %d }`, i)), Content: json.RawMessage(fmt.Sprintf(`{"count":%d}`, i)),
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View file

@ -3,11 +3,14 @@ package syncapi
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"reflect"
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/producers"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
@ -311,6 +314,139 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
} }
func TestSendToDevice(t *testing.T) {
test.WithAllDatabases(t, testSendToDevice)
}
func testSendToDevice(t *testing.T, dbType test.DBType) {
user := test.NewUser(t)
alice := userapi.Device{
ID: "ALICEID",
UserID: user.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "Alice",
AccountType: userapi.AccountTypeUser,
}
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, &syncKeyAPI{})
producer := producers.SyncAPIProducer{
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
JetStream: jsctx,
}
msgCounter := 0
testCases := []struct {
name string
since string
want []string
sendMessagesCount int
}{
{
name: "initial sync, no messages",
want: []string{},
},
{
name: "initial sync, one new message",
sendMessagesCount: 1,
want: []string{
"message 1",
},
},
{
name: "initial sync, two new messages", // we didn't advance the since token, so we'll receive two messages
sendMessagesCount: 1,
want: []string{
"message 1",
"message 2",
},
},
{
name: "incremental sync, one message", // this deletes message 1, as we advanced the since token
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
want: []string{
"message 2",
},
},
{
name: "failed incremental sync, one message", // didn't advance since, so still the same message
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
want: []string{
"message 2",
},
},
{
name: "incremental sync, no message", // this should delete message 2
since: types.StreamingToken{SendToDevicePosition: 2}.String(), // next_batch from previous sync
want: []string{},
},
{
name: "incremental sync, three new messages",
since: types.StreamingToken{SendToDevicePosition: 2}.String(),
sendMessagesCount: 3,
want: []string{
"message 3", // message 2 was deleted in the previous test
"message 4",
"message 5",
},
},
{
name: "initial sync, three messages", // we expect three messages, as we didn't go beyond "2"
want: []string{
"message 3",
"message 4",
"message 5",
},
},
{
name: "incremental sync, no messages", // advance the sync token, no new messages
since: types.StreamingToken{SendToDevicePosition: 5}.String(),
want: []string{},
},
}
ctx := context.Background()
for _, tc := range testCases {
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
for i := 0; i < tc.sendMessagesCount; i++ {
msgCounter++
msg := map[string]string{
"dummy": fmt.Sprintf("message %d", msgCounter),
}
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
t.Fatalf("unable to send to device message: %v", err)
}
}
time.Sleep((time.Millisecond * 15) * time.Duration(tc.sendMessagesCount)) // wait a bit, so the messages can be processed
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"since": tc.since,
})))
// Extract the to_device.events, # gets all values of an array, in this case a string slice with "message $counter" entries
events := gjson.Get(w.Body.String(), "to_device.events.#.content.dummy").Array()
got := make([]string, len(events))
for i := range events {
got[i] = events[i].String()
}
// Ensure the messages we received are as we expect them to be
if !reflect.DeepEqual(got, tc.want) {
t.Logf("[%s|since=%s]: Sync: %s", tc.name, tc.since, w.Body.String())
t.Fatalf("[%s|since=%s]: got: %+v, want: %+v", tc.name, tc.since, got, tc.want)
}
}
}
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverlib.HeaderedEvent) []*nats.Msg { func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input)) result := make([]*nats.Msg, len(input))
for i, ev := range input { for i, ev := range input {

View file

@ -32,11 +32,11 @@ func CreateBaseDendrite(t *testing.T, dbType test.DBType) (*base.BaseDendrite, f
var cfg config.Dendrite var cfg config.Dendrite
cfg.Defaults(false) cfg.Defaults(false)
cfg.Global.JetStream.InMemory = true cfg.Global.JetStream.InMemory = true
switch dbType { switch dbType {
case test.DBTypePostgres: case test.DBTypePostgres:
cfg.Global.Defaults(true) // autogen a signing key cfg.Global.Defaults(true) // autogen a signing key
cfg.MediaAPI.Defaults(true) // autogen a media path cfg.MediaAPI.Defaults(true) // autogen a media path
cfg.Global.ServerName = "test"
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
// the file system event with InMemory=true :( // the file system event with InMemory=true :(
cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType) cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)
@ -50,6 +50,7 @@ func CreateBaseDendrite(t *testing.T, dbType test.DBType) (*base.BaseDendrite, f
return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), close return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), close
case test.DBTypeSQLite: case test.DBTypeSQLite:
cfg.Defaults(true) // sets a sqlite db per component cfg.Defaults(true) // sets a sqlite db per component
cfg.Global.ServerName = "test"
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
// the file system event with InMemory=true :( // the file system event with InMemory=true :(
cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType) cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)