dendrite/syncapi/syncapi_test.go
John Terzis 40830b8a37 Space,Channel soft deletion with dendrite gating, tests (#889)
Closes HNT-244.

The following PR implements Space,Channel soft deletion using on-chain
`disabled` flag scope to space, channel respectively. On message sync,
dendrite will now gate disabled rooms by performing a leave on the user
attempting to sync unless the user is the owner (more on this later). To
re-join, given rooms (spaces,channels) are created by default using
`invite` membership state, the owner will need to undo the on-chain
`disabled` flag, setting it false then re-invite users that left the
room as a side effect of it becoming disabled previously.

The owner does not leave the space, channel because if they did then
there would be no one left to invite users let alone themselves back in
if the action is ever undone.

What is not implemented in this PR:
1. **Transitive leaves on channels in a space** - If a space is
disabled, users will leave the space but not the channels within the
space. To allow for fully disabling a space and all its' channels, the
client can offer a view to the owner that iterates over the channels and
space to disable all on-chain. Furthermore, we could implement a batch
on-chain method that fully disables all channels within a space (plus
the space) in one on-chain call to save the owner gas.
2. **Data deletion** - No data is remove from the DAGs or on-chain.
Therefore deletion is soft and reversible.
3. **New hook to check if a room is disabled** - the client can leverage
existing on-chain public read only methods `getSpaceInfoBySpaceId`,
`getChannelInfoByChannelId` to read the state of each in order to remove
spaces, channels from a member's view that are disabled.
2022-11-09 17:07:51 -07:00

720 lines
24 KiB
Go

package syncapi
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/clientapi/producers"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
type syncRoomserverAPI struct {
rsapi.SyncRoomserverAPI
rooms []*test.Room
}
type clientRoomserverAPI struct {
rsapi.ClientRoomserverAPI
}
func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error {
var room *test.Room
for _, r := range s.rooms {
if r.ID == req.RoomID {
room = r
break
}
}
if room == nil {
res.RoomExists = false
return nil
}
res.RoomVersion = room.Version
return nil // TODO: return state
}
func (s *syncRoomserverAPI) QuerySharedUsers(ctx context.Context, req *rsapi.QuerySharedUsersRequest, res *rsapi.QuerySharedUsersResponse) error {
res.UserIDsToCount = make(map[string]int)
return nil
}
func (s *syncRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *rsapi.QueryBulkStateContentRequest, res *rsapi.QueryBulkStateContentResponse) error {
return nil
}
func (s *syncRoomserverAPI) QueryMembershipForUser(ctx context.Context, req *rsapi.QueryMembershipForUserRequest, res *rsapi.QueryMembershipForUserResponse) error {
res.IsRoomForgotten = false
res.RoomExists = true
return nil
}
func (s *syncRoomserverAPI) QueryMembershipAtEvent(ctx context.Context, req *rsapi.QueryMembershipAtEventRequest, res *rsapi.QueryMembershipAtEventResponse) error {
return nil
}
type syncUserAPI struct {
userapi.SyncUserAPI
accounts []userapi.Device
}
func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
for _, acc := range s.accounts {
if acc.AccessToken == req.AccessToken {
res.Device = &acc
return nil
}
}
res.Err = "unknown user"
return nil
}
func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error {
return nil
}
type syncKeyAPI struct {
keyapi.SyncKeyAPI
}
func (s *syncKeyAPI) QueryKeyChanges(ctx context.Context, req *keyapi.QueryKeyChangesRequest, res *keyapi.QueryKeyChangesResponse) error {
return nil
}
func (s *syncKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneTimeKeysRequest, res *keyapi.QueryOneTimeKeysResponse) error {
return nil
}
func TestSyncAPIAccessTokens(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
testSyncAccessTokens(t, dbType)
})
}
func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
user := test.NewUser(t)
room := test.NewRoom(t, user)
alice := userapi.Device{
ID: "ALICEID",
UserID: user.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "Alice",
AccountType: userapi.AccountTypeUser,
}
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
msgs := toNATSMsgs(t, base, room.Events()...)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &clientRoomserverAPI{}, &syncKeyAPI{})
testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct {
name string
req *http.Request
wantCode int
wantJoinedRooms []string
}{
{
name: "missing access token",
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"timeout": "0",
})),
wantCode: 401,
},
{
name: "unknown access token",
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": "foo",
"timeout": "0",
})),
wantCode: 401,
},
{
name: "valid access token",
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
})),
wantCode: 200,
wantJoinedRooms: []string{room.ID},
},
}
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
})
for _, tc := range testCases {
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, tc.req)
if w.Code != tc.wantCode {
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
}
if tc.wantJoinedRooms != nil {
var res types.Response
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
t.Fatalf("%s: failed to decode response body: %s", tc.name, err)
}
if len(res.Rooms.Join) != len(tc.wantJoinedRooms) {
t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res)
}
t.Logf("res: %+v", res.Rooms.Join[room.ID])
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
for i, ev := range res.Rooms.Join[room.ID].Timeline.Events {
gotEventIDs[i] = ev.EventID
}
test.AssertEventIDsEqual(t, gotEventIDs, room.Events())
}
}
}
// Tests what happens when we create a room and then /sync before all events from /createRoom have
// been sent to the syncapi
func TestSyncAPICreateRoomSyncEarly(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
testSyncAPICreateRoomSyncEarly(t, dbType)
})
}
func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
t.Skip("Skipped, possibly fixed")
user := test.NewUser(t)
room := test.NewRoom(t, user)
alice := userapi.Device{
ID: "ALICEID",
UserID: user.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "Alice",
AccountType: userapi.AccountTypeUser,
}
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// order is:
// m.room.create
// m.room.member
// m.room.power_levels
// m.room.join_rules
// m.room.history_visibility
msgs := toNATSMsgs(t, base, room.Events()...)
sinceTokens := make([]string, len(msgs))
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &clientRoomserverAPI{}, &syncKeyAPI{})
for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond)
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
})))
if w.Code != 200 {
t.Errorf("got HTTP %d want 200", w.Code)
continue
}
var res types.Response
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
t.Errorf("failed to decode response body: %s", err)
}
sinceTokens[i] = res.NextBatch.String()
if i == 0 { // create event does not produce a room section
if len(res.Rooms.Join) != 0 {
t.Fatalf("i=%v got %d joined rooms, want 0", i, len(res.Rooms.Join))
}
} else { // we should have that room somewhere
if len(res.Rooms.Join) != 1 {
t.Fatalf("i=%v got %d joined rooms, want 1", i, len(res.Rooms.Join))
}
}
}
// sync with no token "" and with the penultimate token and this should neatly return room events in the timeline block
sinceTokens = append([]string{""}, sinceTokens[:len(sinceTokens)-1]...)
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
for i, since := range sinceTokens {
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"since": since,
})))
if w.Code != 200 {
t.Errorf("since=%s got HTTP %d want 200", since, w.Code)
}
var res types.Response
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
t.Errorf("failed to decode response body: %s", err)
}
if len(res.Rooms.Join) != 1 {
t.Fatalf("since=%s got %d joined rooms, want 1", since, len(res.Rooms.Join))
}
t.Logf("since=%s res state:%+v res timeline:%+v", since, res.Rooms.Join[room.ID].State.Events, res.Rooms.Join[room.ID].Timeline.Events)
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
for j, ev := range res.Rooms.Join[room.ID].Timeline.Events {
gotEventIDs[j] = ev.EventID
}
test.AssertEventIDsEqual(t, gotEventIDs, room.Events()[i:])
}
}
// Test that if we hit /sync we get back presence: online, regardless of whether messages get delivered
// via NATS. Regression test for a flakey test "User sees their own presence in a sync"
func TestSyncAPIUpdatePresenceImmediately(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
testSyncAPIUpdatePresenceImmediately(t, dbType)
})
}
func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
user := test.NewUser(t)
alice := userapi.Device{
ID: "ALICEID",
UserID: user.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "Alice",
AccountType: userapi.AccountTypeUser,
}
base, close := testrig.CreateBaseDendrite(t, dbType)
base.Cfg.Global.Presence.EnableOutbound = true
base.Cfg.Global.Presence.EnableInbound = true
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, &clientRoomserverAPI{}, &syncKeyAPI{})
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"set_presence": "online",
})))
if w.Code != 200 {
t.Fatalf("got HTTP %d want %d", w.Code, 200)
}
var res types.Response
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
t.Errorf("failed to decode response body: %s", err)
}
if len(res.Presence.Events) != 1 {
t.Fatalf("expected 1 presence events, got: %+v", res.Presence.Events)
}
if res.Presence.Events[0].Sender != alice.UserID {
t.Errorf("sender: got %v want %v", res.Presence.Events[0].Sender, alice.UserID)
}
if res.Presence.Events[0].Type != "m.presence" {
t.Errorf("type: got %v want %v", res.Presence.Events[0].Type, "m.presence")
}
if gjson.ParseBytes(res.Presence.Events[0].Content).Get("presence").Str != "online" {
t.Errorf("content: not online, got %v", res.Presence.Events[0].Content)
}
}
// This is mainly what Sytest is doing in "test_history_visibility"
func TestMessageHistoryVisibility(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
testHistoryVisibility(t, dbType)
})
}
func testHistoryVisibility(t *testing.T, dbType test.DBType) {
type result struct {
seeWithoutJoin bool
seeBeforeJoin bool
seeAfterInvite bool
}
// create the users
alice := test.NewUser(t)
aliceDev := userapi.Device{
ID: "ALICEID",
UserID: alice.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "ALICE",
}
bob := test.NewUser(t)
bobDev := userapi.Device{
ID: "BOBID",
UserID: bob.ID,
AccessToken: "BOD_BEARER_TOKEN",
DisplayName: "BOB",
}
ctx := context.Background()
// check guest and normal user accounts
for _, accType := range []userapi.AccountType{userapi.AccountTypeGuest, userapi.AccountTypeUser} {
testCases := []struct {
historyVisibility gomatrixserverlib.HistoryVisibility
wantResult result
}{
{
historyVisibility: gomatrixserverlib.HistoryVisibilityWorldReadable,
wantResult: result{
seeWithoutJoin: true,
seeBeforeJoin: true,
seeAfterInvite: true,
},
},
{
historyVisibility: gomatrixserverlib.HistoryVisibilityShared,
wantResult: result{
seeWithoutJoin: false,
seeBeforeJoin: true,
seeAfterInvite: true,
},
},
{
historyVisibility: gomatrixserverlib.HistoryVisibilityInvited,
wantResult: result{
seeWithoutJoin: false,
seeBeforeJoin: false,
seeAfterInvite: true,
},
},
{
historyVisibility: gomatrixserverlib.HistoryVisibilityJoined,
wantResult: result{
seeWithoutJoin: false,
seeBeforeJoin: false,
seeAfterInvite: false,
},
},
}
bobDev.AccountType = accType
userType := "guest"
if accType == userapi.AccountTypeUser {
userType = "real user"
}
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use the actual internal roomserver API
rsAPI := roomserver.NewInternalAPI(base)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, rsAPI, &syncKeyAPI{})
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
t.Run(testname, func(t *testing.T) {
// create a room with the given visibility
room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility))
// send the events/messages to NATS to create the rooms
beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody})
eventsToSend := append(room.Events(), beforeJoinEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
syncUntil(t, base, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
return gjson.Get(syncBody, path).Exists()
},
)
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
})))
if w.Code != 200 {
t.Logf("%s", w.Body.String())
t.Fatalf("got HTTP %d want %d", w.Code, 200)
}
// We only care about the returned events at this point
var res struct {
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
}
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
t.Errorf("failed to decode response body: %s", err)
}
verifyEventVisible(t, tc.wantResult.seeWithoutJoin, beforeJoinEv, res.Chunk)
// Create invite, a message, join the room and create another message.
inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID))
afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)})
joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID))
afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
syncUntil(t, base, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
return gjson.Get(syncBody, path).Exists()
},
)
// Verify the messages after/before invite are visible or not
w = httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
})))
if w.Code != 200 {
t.Logf("%s", w.Body.String())
t.Fatalf("got HTTP %d want %d", w.Code, 200)
}
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
t.Errorf("failed to decode response body: %s", err)
}
// verify results
verifyEventVisible(t, tc.wantResult.seeBeforeJoin, beforeJoinEv, res.Chunk)
verifyEventVisible(t, tc.wantResult.seeAfterInvite, afterInviteEv, res.Chunk)
})
}
}
}
func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *gomatrixserverlib.HeaderedEvent, chunk []gomatrixserverlib.ClientEvent) {
t.Helper()
if wantVisible {
for _, ev := range chunk {
if ev.EventID == wantVisibleEvent.EventID() {
return
}
}
t.Fatalf("expected to see event %s but didn't: %+v", wantVisibleEvent.EventID(), chunk)
} else {
for _, ev := range chunk {
if ev.EventID == wantVisibleEvent.EventID() {
t.Fatalf("expected not to see event %s: %+v", wantVisibleEvent.EventID(), string(ev.Content))
}
}
}
}
func TestSendToDevice(t *testing.T) {
test.WithAllDatabases(t, testSendToDevice)
}
func testSendToDevice(t *testing.T, dbType test.DBType) {
user := test.NewUser(t)
alice := userapi.Device{
ID: "ALICEID",
UserID: user.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "Alice",
AccountType: userapi.AccountTypeUser,
}
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, &clientRoomserverAPI{}, &syncKeyAPI{})
producer := producers.SyncAPIProducer{
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
JetStream: jsctx,
}
msgCounter := 0
testCases := []struct {
name string
since string
want []string
sendMessagesCount int
}{
{
name: "initial sync, no messages",
want: []string{},
},
{
name: "initial sync, one new message",
sendMessagesCount: 1,
want: []string{
"message 1",
},
},
{
name: "initial sync, two new messages", // we didn't advance the since token, so we'll receive two messages
sendMessagesCount: 1,
want: []string{
"message 1",
"message 2",
},
},
{
name: "incremental sync, one message", // this deletes message 1, as we advanced the since token
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
want: []string{
"message 2",
},
},
{
name: "failed incremental sync, one message", // didn't advance since, so still the same message
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
want: []string{
"message 2",
},
},
{
name: "incremental sync, no message", // this should delete message 2
since: types.StreamingToken{SendToDevicePosition: 2}.String(), // next_batch from previous sync
want: []string{},
},
{
name: "incremental sync, three new messages",
since: types.StreamingToken{SendToDevicePosition: 2}.String(),
sendMessagesCount: 3,
want: []string{
"message 3", // message 2 was deleted in the previous test
"message 4",
"message 5",
},
},
{
name: "initial sync, three messages", // we expect three messages, as we didn't go beyond "2"
want: []string{
"message 3",
"message 4",
"message 5",
},
},
{
name: "incremental sync, no messages", // advance the sync token, no new messages
since: types.StreamingToken{SendToDevicePosition: 5}.String(),
want: []string{},
},
}
ctx := context.Background()
for _, tc := range testCases {
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
for i := 0; i < tc.sendMessagesCount; i++ {
msgCounter++
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
t.Fatalf("unable to send to device message: %v", err)
}
}
syncUntil(t, base, alice.AccessToken,
len(tc.want) == 0,
func(body string) bool {
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
},
)
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"since": tc.since,
})))
// Extract the to_device.events, # gets all values of an array, in this case a string slice with "message $counter" entries
events := gjson.Get(w.Body.String(), "to_device.events.#.content.dummy").Array()
got := make([]string, len(events))
for i := range events {
got[i] = events[i].String()
}
// Ensure the messages we received are as we expect them to be
if !reflect.DeepEqual(got, tc.want) {
t.Logf("[%s|since=%s]: Sync: %s", tc.name, tc.since, w.Body.String())
t.Fatalf("[%s|since=%s]: got: %+v, want: %+v", tc.name, tc.since, got, tc.want)
}
}
}
func syncUntil(t *testing.T,
base *base.BaseDendrite, accessToken string,
skip bool,
checkFunc func(syncBody string) bool,
) {
if checkFunc == nil {
t.Fatalf("No checkFunc defined")
}
if skip {
return
}
// loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
// to the syncAPI when hitting /sync
done := make(chan bool)
defer close(done)
go func() {
for {
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken,
"timeout": "1000",
})))
if checkFunc(w.Body.String()) {
done <- true
return
}
}
}()
select {
case <-done:
case <-time.After(time.Second * 5):
t.Fatalf("Timed out waiting for messages")
}
}
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {
var addsStateIDs []string
if ev.StateKey() != nil {
addsStateIDs = append(addsStateIDs, ev.EventID())
}
result[i] = testrig.NewOutputEventMsg(t, base, ev.RoomID(), api.OutputEvent{
Type: rsapi.OutputTypeNewRoomEvent,
NewRoomEvent: &rsapi.OutputNewRoomEvent{
Event: ev,
AddsStateEventIDs: addsStateIDs,
HistoryVisibility: ev.Visibility,
},
})
}
return result
}