mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-17 02:53:11 -06:00
Closes HNT-244. The following PR implements Space,Channel soft deletion using on-chain `disabled` flag scope to space, channel respectively. On message sync, dendrite will now gate disabled rooms by performing a leave on the user attempting to sync unless the user is the owner (more on this later). To re-join, given rooms (spaces,channels) are created by default using `invite` membership state, the owner will need to undo the on-chain `disabled` flag, setting it false then re-invite users that left the room as a side effect of it becoming disabled previously. The owner does not leave the space, channel because if they did then there would be no one left to invite users let alone themselves back in if the action is ever undone. What is not implemented in this PR: 1. **Transitive leaves on channels in a space** - If a space is disabled, users will leave the space but not the channels within the space. To allow for fully disabling a space and all its' channels, the client can offer a view to the owner that iterates over the channels and space to disable all on-chain. Furthermore, we could implement a batch on-chain method that fully disables all channels within a space (plus the space) in one on-chain call to save the owner gas. 2. **Data deletion** - No data is remove from the DAGs or on-chain. Therefore deletion is soft and reversible. 3. **New hook to check if a room is disabled** - the client can leverage existing on-chain public read only methods `getSpaceInfoBySpaceId`, `getChannelInfoByChannelId` to read the state of each in order to remove spaces, channels from a member's view that are disabled.
720 lines
24 KiB
Go
720 lines
24 KiB
Go
package syncapi
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/tidwall/gjson"
|
|
|
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
|
"github.com/matrix-org/dendrite/roomserver"
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
|
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/setup/base"
|
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
|
"github.com/matrix-org/dendrite/test"
|
|
"github.com/matrix-org/dendrite/test/testrig"
|
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
)
|
|
|
|
type syncRoomserverAPI struct {
|
|
rsapi.SyncRoomserverAPI
|
|
rooms []*test.Room
|
|
}
|
|
|
|
type clientRoomserverAPI struct {
|
|
rsapi.ClientRoomserverAPI
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error {
|
|
var room *test.Room
|
|
for _, r := range s.rooms {
|
|
if r.ID == req.RoomID {
|
|
room = r
|
|
break
|
|
}
|
|
}
|
|
if room == nil {
|
|
res.RoomExists = false
|
|
return nil
|
|
}
|
|
res.RoomVersion = room.Version
|
|
return nil // TODO: return state
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QuerySharedUsers(ctx context.Context, req *rsapi.QuerySharedUsersRequest, res *rsapi.QuerySharedUsersResponse) error {
|
|
res.UserIDsToCount = make(map[string]int)
|
|
return nil
|
|
}
|
|
func (s *syncRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *rsapi.QueryBulkStateContentRequest, res *rsapi.QueryBulkStateContentResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryMembershipForUser(ctx context.Context, req *rsapi.QueryMembershipForUserRequest, res *rsapi.QueryMembershipForUserResponse) error {
|
|
res.IsRoomForgotten = false
|
|
res.RoomExists = true
|
|
return nil
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryMembershipAtEvent(ctx context.Context, req *rsapi.QueryMembershipAtEventRequest, res *rsapi.QueryMembershipAtEventResponse) error {
|
|
return nil
|
|
}
|
|
|
|
type syncUserAPI struct {
|
|
userapi.SyncUserAPI
|
|
accounts []userapi.Device
|
|
}
|
|
|
|
func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
|
|
for _, acc := range s.accounts {
|
|
if acc.AccessToken == req.AccessToken {
|
|
res.Device = &acc
|
|
return nil
|
|
}
|
|
}
|
|
res.Err = "unknown user"
|
|
return nil
|
|
}
|
|
|
|
func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error {
|
|
return nil
|
|
}
|
|
|
|
type syncKeyAPI struct {
|
|
keyapi.SyncKeyAPI
|
|
}
|
|
|
|
func (s *syncKeyAPI) QueryKeyChanges(ctx context.Context, req *keyapi.QueryKeyChangesRequest, res *keyapi.QueryKeyChangesResponse) error {
|
|
return nil
|
|
}
|
|
func (s *syncKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneTimeKeysRequest, res *keyapi.QueryOneTimeKeysResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func TestSyncAPIAccessTokens(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncAccessTokens(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
room := test.NewRoom(t, user)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
base, close := testrig.CreateBaseDendrite(t, dbType)
|
|
defer close()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
msgs := toNATSMsgs(t, base, room.Events()...)
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &clientRoomserverAPI{}, &syncKeyAPI{})
|
|
testrig.MustPublishMsgs(t, jsctx, msgs...)
|
|
|
|
testCases := []struct {
|
|
name string
|
|
req *http.Request
|
|
wantCode int
|
|
wantJoinedRooms []string
|
|
}{
|
|
{
|
|
name: "missing access token",
|
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"timeout": "0",
|
|
})),
|
|
wantCode: 401,
|
|
},
|
|
{
|
|
name: "unknown access token",
|
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": "foo",
|
|
"timeout": "0",
|
|
})),
|
|
wantCode: 401,
|
|
},
|
|
{
|
|
name: "valid access token",
|
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
})),
|
|
wantCode: 200,
|
|
wantJoinedRooms: []string{room.ID},
|
|
},
|
|
}
|
|
|
|
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
|
|
for _, tc := range testCases {
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, tc.req)
|
|
if w.Code != tc.wantCode {
|
|
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
|
|
}
|
|
if tc.wantJoinedRooms != nil {
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Fatalf("%s: failed to decode response body: %s", tc.name, err)
|
|
}
|
|
if len(res.Rooms.Join) != len(tc.wantJoinedRooms) {
|
|
t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res)
|
|
}
|
|
t.Logf("res: %+v", res.Rooms.Join[room.ID])
|
|
|
|
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
|
|
for i, ev := range res.Rooms.Join[room.ID].Timeline.Events {
|
|
gotEventIDs[i] = ev.EventID
|
|
}
|
|
test.AssertEventIDsEqual(t, gotEventIDs, room.Events())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Tests what happens when we create a room and then /sync before all events from /createRoom have
|
|
// been sent to the syncapi
|
|
func TestSyncAPICreateRoomSyncEarly(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncAPICreateRoomSyncEarly(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
|
|
t.Skip("Skipped, possibly fixed")
|
|
user := test.NewUser(t)
|
|
room := test.NewRoom(t, user)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
base, close := testrig.CreateBaseDendrite(t, dbType)
|
|
defer close()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
// order is:
|
|
// m.room.create
|
|
// m.room.member
|
|
// m.room.power_levels
|
|
// m.room.join_rules
|
|
// m.room.history_visibility
|
|
msgs := toNATSMsgs(t, base, room.Events()...)
|
|
sinceTokens := make([]string, len(msgs))
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &clientRoomserverAPI{}, &syncKeyAPI{})
|
|
for i, msg := range msgs {
|
|
testrig.MustPublishMsgs(t, jsctx, msg)
|
|
time.Sleep(100 * time.Millisecond)
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Errorf("got HTTP %d want 200", w.Code)
|
|
continue
|
|
}
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
sinceTokens[i] = res.NextBatch.String()
|
|
if i == 0 { // create event does not produce a room section
|
|
if len(res.Rooms.Join) != 0 {
|
|
t.Fatalf("i=%v got %d joined rooms, want 0", i, len(res.Rooms.Join))
|
|
}
|
|
} else { // we should have that room somewhere
|
|
if len(res.Rooms.Join) != 1 {
|
|
t.Fatalf("i=%v got %d joined rooms, want 1", i, len(res.Rooms.Join))
|
|
}
|
|
}
|
|
}
|
|
|
|
// sync with no token "" and with the penultimate token and this should neatly return room events in the timeline block
|
|
sinceTokens = append([]string{""}, sinceTokens[:len(sinceTokens)-1]...)
|
|
|
|
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
|
|
for i, since := range sinceTokens {
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
"since": since,
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Errorf("since=%s got HTTP %d want 200", since, w.Code)
|
|
}
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
if len(res.Rooms.Join) != 1 {
|
|
t.Fatalf("since=%s got %d joined rooms, want 1", since, len(res.Rooms.Join))
|
|
}
|
|
t.Logf("since=%s res state:%+v res timeline:%+v", since, res.Rooms.Join[room.ID].State.Events, res.Rooms.Join[room.ID].Timeline.Events)
|
|
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
|
|
for j, ev := range res.Rooms.Join[room.ID].Timeline.Events {
|
|
gotEventIDs[j] = ev.EventID
|
|
}
|
|
test.AssertEventIDsEqual(t, gotEventIDs, room.Events()[i:])
|
|
}
|
|
}
|
|
|
|
// Test that if we hit /sync we get back presence: online, regardless of whether messages get delivered
|
|
// via NATS. Regression test for a flakey test "User sees their own presence in a sync"
|
|
func TestSyncAPIUpdatePresenceImmediately(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncAPIUpdatePresenceImmediately(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
base, close := testrig.CreateBaseDendrite(t, dbType)
|
|
base.Cfg.Global.Presence.EnableOutbound = true
|
|
base.Cfg.Global.Presence.EnableInbound = true
|
|
defer close()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, &clientRoomserverAPI{}, &syncKeyAPI{})
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
"set_presence": "online",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
if len(res.Presence.Events) != 1 {
|
|
t.Fatalf("expected 1 presence events, got: %+v", res.Presence.Events)
|
|
}
|
|
if res.Presence.Events[0].Sender != alice.UserID {
|
|
t.Errorf("sender: got %v want %v", res.Presence.Events[0].Sender, alice.UserID)
|
|
}
|
|
if res.Presence.Events[0].Type != "m.presence" {
|
|
t.Errorf("type: got %v want %v", res.Presence.Events[0].Type, "m.presence")
|
|
}
|
|
if gjson.ParseBytes(res.Presence.Events[0].Content).Get("presence").Str != "online" {
|
|
t.Errorf("content: not online, got %v", res.Presence.Events[0].Content)
|
|
}
|
|
|
|
}
|
|
|
|
// This is mainly what Sytest is doing in "test_history_visibility"
|
|
func TestMessageHistoryVisibility(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testHistoryVisibility(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testHistoryVisibility(t *testing.T, dbType test.DBType) {
|
|
type result struct {
|
|
seeWithoutJoin bool
|
|
seeBeforeJoin bool
|
|
seeAfterInvite bool
|
|
}
|
|
|
|
// create the users
|
|
alice := test.NewUser(t)
|
|
aliceDev := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: alice.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "ALICE",
|
|
}
|
|
|
|
bob := test.NewUser(t)
|
|
|
|
bobDev := userapi.Device{
|
|
ID: "BOBID",
|
|
UserID: bob.ID,
|
|
AccessToken: "BOD_BEARER_TOKEN",
|
|
DisplayName: "BOB",
|
|
}
|
|
|
|
ctx := context.Background()
|
|
// check guest and normal user accounts
|
|
for _, accType := range []userapi.AccountType{userapi.AccountTypeGuest, userapi.AccountTypeUser} {
|
|
testCases := []struct {
|
|
historyVisibility gomatrixserverlib.HistoryVisibility
|
|
wantResult result
|
|
}{
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityWorldReadable,
|
|
wantResult: result{
|
|
seeWithoutJoin: true,
|
|
seeBeforeJoin: true,
|
|
seeAfterInvite: true,
|
|
},
|
|
},
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityShared,
|
|
wantResult: result{
|
|
seeWithoutJoin: false,
|
|
seeBeforeJoin: true,
|
|
seeAfterInvite: true,
|
|
},
|
|
},
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityInvited,
|
|
wantResult: result{
|
|
seeWithoutJoin: false,
|
|
seeBeforeJoin: false,
|
|
seeAfterInvite: true,
|
|
},
|
|
},
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityJoined,
|
|
wantResult: result{
|
|
seeWithoutJoin: false,
|
|
seeBeforeJoin: false,
|
|
seeAfterInvite: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
bobDev.AccountType = accType
|
|
userType := "guest"
|
|
if accType == userapi.AccountTypeUser {
|
|
userType = "real user"
|
|
}
|
|
|
|
base, close := testrig.CreateBaseDendrite(t, dbType)
|
|
defer close()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
|
|
// Use the actual internal roomserver API
|
|
rsAPI := roomserver.NewInternalAPI(base)
|
|
rsAPI.SetFederationAPI(nil, nil)
|
|
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, rsAPI, &syncKeyAPI{})
|
|
|
|
for _, tc := range testCases {
|
|
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
|
|
t.Run(testname, func(t *testing.T) {
|
|
// create a room with the given visibility
|
|
room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility))
|
|
|
|
// send the events/messages to NATS to create the rooms
|
|
beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)
|
|
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody})
|
|
eventsToSend := append(room.Events(), beforeJoinEv)
|
|
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
syncUntil(t, base, aliceDev.AccessToken, false,
|
|
func(syncBody string) bool {
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
|
|
return gjson.Get(syncBody, path).Exists()
|
|
},
|
|
)
|
|
|
|
// There is only one event, we expect only to be able to see this, if the room is world_readable
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": bobDev.AccessToken,
|
|
"dir": "b",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Logf("%s", w.Body.String())
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
// We only care about the returned events at this point
|
|
var res struct {
|
|
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
|
|
}
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
|
|
verifyEventVisible(t, tc.wantResult.seeWithoutJoin, beforeJoinEv, res.Chunk)
|
|
|
|
// Create invite, a message, join the room and create another message.
|
|
inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID))
|
|
afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)})
|
|
joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID))
|
|
afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
|
|
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
|
|
|
|
eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
|
|
|
|
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
syncUntil(t, base, aliceDev.AccessToken, false,
|
|
func(syncBody string) bool {
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
|
|
return gjson.Get(syncBody, path).Exists()
|
|
},
|
|
)
|
|
|
|
// Verify the messages after/before invite are visible or not
|
|
w = httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": bobDev.AccessToken,
|
|
"dir": "b",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Logf("%s", w.Body.String())
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
// verify results
|
|
verifyEventVisible(t, tc.wantResult.seeBeforeJoin, beforeJoinEv, res.Chunk)
|
|
verifyEventVisible(t, tc.wantResult.seeAfterInvite, afterInviteEv, res.Chunk)
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *gomatrixserverlib.HeaderedEvent, chunk []gomatrixserverlib.ClientEvent) {
|
|
t.Helper()
|
|
if wantVisible {
|
|
for _, ev := range chunk {
|
|
if ev.EventID == wantVisibleEvent.EventID() {
|
|
return
|
|
}
|
|
}
|
|
t.Fatalf("expected to see event %s but didn't: %+v", wantVisibleEvent.EventID(), chunk)
|
|
} else {
|
|
for _, ev := range chunk {
|
|
if ev.EventID == wantVisibleEvent.EventID() {
|
|
t.Fatalf("expected not to see event %s: %+v", wantVisibleEvent.EventID(), string(ev.Content))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSendToDevice(t *testing.T) {
|
|
test.WithAllDatabases(t, testSendToDevice)
|
|
}
|
|
|
|
func testSendToDevice(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
|
|
defer baseClose()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, &clientRoomserverAPI{}, &syncKeyAPI{})
|
|
|
|
producer := producers.SyncAPIProducer{
|
|
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
|
JetStream: jsctx,
|
|
}
|
|
|
|
msgCounter := 0
|
|
|
|
testCases := []struct {
|
|
name string
|
|
since string
|
|
want []string
|
|
sendMessagesCount int
|
|
}{
|
|
{
|
|
name: "initial sync, no messages",
|
|
want: []string{},
|
|
},
|
|
{
|
|
name: "initial sync, one new message",
|
|
sendMessagesCount: 1,
|
|
want: []string{
|
|
"message 1",
|
|
},
|
|
},
|
|
{
|
|
name: "initial sync, two new messages", // we didn't advance the since token, so we'll receive two messages
|
|
sendMessagesCount: 1,
|
|
want: []string{
|
|
"message 1",
|
|
"message 2",
|
|
},
|
|
},
|
|
{
|
|
name: "incremental sync, one message", // this deletes message 1, as we advanced the since token
|
|
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
|
|
want: []string{
|
|
"message 2",
|
|
},
|
|
},
|
|
{
|
|
name: "failed incremental sync, one message", // didn't advance since, so still the same message
|
|
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
|
|
want: []string{
|
|
"message 2",
|
|
},
|
|
},
|
|
{
|
|
name: "incremental sync, no message", // this should delete message 2
|
|
since: types.StreamingToken{SendToDevicePosition: 2}.String(), // next_batch from previous sync
|
|
want: []string{},
|
|
},
|
|
{
|
|
name: "incremental sync, three new messages",
|
|
since: types.StreamingToken{SendToDevicePosition: 2}.String(),
|
|
sendMessagesCount: 3,
|
|
want: []string{
|
|
"message 3", // message 2 was deleted in the previous test
|
|
"message 4",
|
|
"message 5",
|
|
},
|
|
},
|
|
{
|
|
name: "initial sync, three messages", // we expect three messages, as we didn't go beyond "2"
|
|
want: []string{
|
|
"message 3",
|
|
"message 4",
|
|
"message 5",
|
|
},
|
|
},
|
|
{
|
|
name: "incremental sync, no messages", // advance the sync token, no new messages
|
|
since: types.StreamingToken{SendToDevicePosition: 5}.String(),
|
|
want: []string{},
|
|
},
|
|
}
|
|
|
|
ctx := context.Background()
|
|
for _, tc := range testCases {
|
|
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
|
for i := 0; i < tc.sendMessagesCount; i++ {
|
|
msgCounter++
|
|
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
|
|
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
|
t.Fatalf("unable to send to device message: %v", err)
|
|
}
|
|
}
|
|
|
|
syncUntil(t, base, alice.AccessToken,
|
|
len(tc.want) == 0,
|
|
func(body string) bool {
|
|
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
|
|
},
|
|
)
|
|
|
|
// Execute a /sync request, recording the response
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"since": tc.since,
|
|
})))
|
|
|
|
// Extract the to_device.events, # gets all values of an array, in this case a string slice with "message $counter" entries
|
|
events := gjson.Get(w.Body.String(), "to_device.events.#.content.dummy").Array()
|
|
got := make([]string, len(events))
|
|
for i := range events {
|
|
got[i] = events[i].String()
|
|
}
|
|
|
|
// Ensure the messages we received are as we expect them to be
|
|
if !reflect.DeepEqual(got, tc.want) {
|
|
t.Logf("[%s|since=%s]: Sync: %s", tc.name, tc.since, w.Body.String())
|
|
t.Fatalf("[%s|since=%s]: got: %+v, want: %+v", tc.name, tc.since, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func syncUntil(t *testing.T,
|
|
base *base.BaseDendrite, accessToken string,
|
|
skip bool,
|
|
checkFunc func(syncBody string) bool,
|
|
) {
|
|
if checkFunc == nil {
|
|
t.Fatalf("No checkFunc defined")
|
|
}
|
|
if skip {
|
|
return
|
|
}
|
|
// loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
|
|
// to the syncAPI when hitting /sync
|
|
done := make(chan bool)
|
|
defer close(done)
|
|
go func() {
|
|
for {
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": accessToken,
|
|
"timeout": "1000",
|
|
})))
|
|
if checkFunc(w.Body.String()) {
|
|
done <- true
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
case <-time.After(time.Second * 5):
|
|
t.Fatalf("Timed out waiting for messages")
|
|
}
|
|
}
|
|
|
|
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
|
|
result := make([]*nats.Msg, len(input))
|
|
for i, ev := range input {
|
|
var addsStateIDs []string
|
|
if ev.StateKey() != nil {
|
|
addsStateIDs = append(addsStateIDs, ev.EventID())
|
|
}
|
|
result[i] = testrig.NewOutputEventMsg(t, base, ev.RoomID(), api.OutputEvent{
|
|
Type: rsapi.OutputTypeNewRoomEvent,
|
|
NewRoomEvent: &rsapi.OutputNewRoomEvent{
|
|
Event: ev,
|
|
AddsStateEventIDs: addsStateIDs,
|
|
HistoryVisibility: ev.Visibility,
|
|
},
|
|
})
|
|
}
|
|
return result
|
|
}
|