mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-01-18 18:04:27 -06:00
7d83f8b633
This also fixes an issue regarding updates to relations for invalid events, which could result in us retrying said event over and over again, if we fail to unmarshal the event to `gomatrixserverlib.RelationContent`, this was discovered by `@sleroq:virto.community`
1217 lines
39 KiB
Go
1217 lines
39 KiB
Go
package syncapi
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/tidwall/gjson"
|
|
|
|
"github.com/matrix-org/dendrite/syncapi/routing"
|
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
|
|
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
|
"github.com/matrix-org/dendrite/roomserver"
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
|
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/setup/base"
|
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
|
"github.com/matrix-org/dendrite/test"
|
|
"github.com/matrix-org/dendrite/test/testrig"
|
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
)
|
|
|
|
type syncRoomserverAPI struct {
|
|
rsapi.SyncRoomserverAPI
|
|
rooms []*test.Room
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error {
|
|
var room *test.Room
|
|
for _, r := range s.rooms {
|
|
if r.ID == req.RoomID {
|
|
room = r
|
|
break
|
|
}
|
|
}
|
|
if room == nil {
|
|
res.RoomExists = false
|
|
return nil
|
|
}
|
|
res.RoomVersion = room.Version
|
|
return nil // TODO: return state
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QuerySharedUsers(ctx context.Context, req *rsapi.QuerySharedUsersRequest, res *rsapi.QuerySharedUsersResponse) error {
|
|
res.UserIDsToCount = make(map[string]int)
|
|
return nil
|
|
}
|
|
func (s *syncRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *rsapi.QueryBulkStateContentRequest, res *rsapi.QueryBulkStateContentResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryMembershipForUser(ctx context.Context, req *rsapi.QueryMembershipForUserRequest, res *rsapi.QueryMembershipForUserResponse) error {
|
|
res.IsRoomForgotten = false
|
|
res.RoomExists = true
|
|
return nil
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryMembershipAtEvent(ctx context.Context, req *rsapi.QueryMembershipAtEventRequest, res *rsapi.QueryMembershipAtEventResponse) error {
|
|
return nil
|
|
}
|
|
|
|
type syncUserAPI struct {
|
|
userapi.SyncUserAPI
|
|
accounts []userapi.Device
|
|
}
|
|
|
|
func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
|
|
for _, acc := range s.accounts {
|
|
if acc.AccessToken == req.AccessToken {
|
|
res.Device = &acc
|
|
return nil
|
|
}
|
|
}
|
|
res.Err = "unknown user"
|
|
return nil
|
|
}
|
|
|
|
func (s *syncUserAPI) QueryKeyChanges(ctx context.Context, req *userapi.QueryKeyChangesRequest, res *userapi.QueryKeyChangesResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *syncUserAPI) QueryOneTimeKeys(ctx context.Context, req *userapi.QueryOneTimeKeysRequest, res *userapi.QueryOneTimeKeysResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func TestSyncAPIAccessTokens(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncAccessTokens(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
room := test.NewRoom(t, user)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
base, close := testrig.CreateBaseDendrite(t, dbType)
|
|
defer close()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
msgs := toNATSMsgs(t, base, room.Events()...)
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}})
|
|
testrig.MustPublishMsgs(t, jsctx, msgs...)
|
|
|
|
testCases := []struct {
|
|
name string
|
|
req *http.Request
|
|
wantCode int
|
|
wantJoinedRooms []string
|
|
}{
|
|
{
|
|
name: "missing access token",
|
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"timeout": "0",
|
|
})),
|
|
wantCode: 401,
|
|
},
|
|
{
|
|
name: "unknown access token",
|
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": "foo",
|
|
"timeout": "0",
|
|
})),
|
|
wantCode: 401,
|
|
},
|
|
{
|
|
name: "valid access token",
|
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
})),
|
|
wantCode: 200,
|
|
wantJoinedRooms: []string{room.ID},
|
|
},
|
|
}
|
|
|
|
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
|
|
for _, tc := range testCases {
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, tc.req)
|
|
if w.Code != tc.wantCode {
|
|
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
|
|
}
|
|
if tc.wantJoinedRooms != nil {
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Fatalf("%s: failed to decode response body: %s", tc.name, err)
|
|
}
|
|
if len(res.Rooms.Join) != len(tc.wantJoinedRooms) {
|
|
t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res)
|
|
}
|
|
t.Logf("res: %+v", res.Rooms.Join[room.ID])
|
|
|
|
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
|
|
for i, ev := range res.Rooms.Join[room.ID].Timeline.Events {
|
|
gotEventIDs[i] = ev.EventID
|
|
}
|
|
test.AssertEventIDsEqual(t, gotEventIDs, room.Events())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Tests what happens when we create a room and then /sync before all events from /createRoom have
|
|
// been sent to the syncapi
|
|
func TestSyncAPICreateRoomSyncEarly(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncAPICreateRoomSyncEarly(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
|
|
t.Skip("Skipped, possibly fixed")
|
|
user := test.NewUser(t)
|
|
room := test.NewRoom(t, user)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
base, close := testrig.CreateBaseDendrite(t, dbType)
|
|
defer close()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
// order is:
|
|
// m.room.create
|
|
// m.room.member
|
|
// m.room.power_levels
|
|
// m.room.join_rules
|
|
// m.room.history_visibility
|
|
msgs := toNATSMsgs(t, base, room.Events()...)
|
|
sinceTokens := make([]string, len(msgs))
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}})
|
|
for i, msg := range msgs {
|
|
testrig.MustPublishMsgs(t, jsctx, msg)
|
|
time.Sleep(100 * time.Millisecond)
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Errorf("got HTTP %d want 200", w.Code)
|
|
continue
|
|
}
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
sinceTokens[i] = res.NextBatch.String()
|
|
if i == 0 { // create event does not produce a room section
|
|
if len(res.Rooms.Join) != 0 {
|
|
t.Fatalf("i=%v got %d joined rooms, want 0", i, len(res.Rooms.Join))
|
|
}
|
|
} else { // we should have that room somewhere
|
|
if len(res.Rooms.Join) != 1 {
|
|
t.Fatalf("i=%v got %d joined rooms, want 1", i, len(res.Rooms.Join))
|
|
}
|
|
}
|
|
}
|
|
|
|
// sync with no token "" and with the penultimate token and this should neatly return room events in the timeline block
|
|
sinceTokens = append([]string{""}, sinceTokens[:len(sinceTokens)-1]...)
|
|
|
|
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
|
|
for i, since := range sinceTokens {
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
"since": since,
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Errorf("since=%s got HTTP %d want 200", since, w.Code)
|
|
}
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
if len(res.Rooms.Join) != 1 {
|
|
t.Fatalf("since=%s got %d joined rooms, want 1", since, len(res.Rooms.Join))
|
|
}
|
|
t.Logf("since=%s res state:%+v res timeline:%+v", since, res.Rooms.Join[room.ID].State.Events, res.Rooms.Join[room.ID].Timeline.Events)
|
|
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
|
|
for j, ev := range res.Rooms.Join[room.ID].Timeline.Events {
|
|
gotEventIDs[j] = ev.EventID
|
|
}
|
|
test.AssertEventIDsEqual(t, gotEventIDs, room.Events()[i:])
|
|
}
|
|
}
|
|
|
|
// Test that if we hit /sync we get back presence: online, regardless of whether messages get delivered
|
|
// via NATS. Regression test for a flakey test "User sees their own presence in a sync"
|
|
func TestSyncAPIUpdatePresenceImmediately(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncAPIUpdatePresenceImmediately(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
base, close := testrig.CreateBaseDendrite(t, dbType)
|
|
base.Cfg.Global.Presence.EnableOutbound = true
|
|
base.Cfg.Global.Presence.EnableInbound = true
|
|
defer close()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{})
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
"set_presence": "online",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
if len(res.Presence.Events) != 1 {
|
|
t.Fatalf("expected 1 presence events, got: %+v", res.Presence.Events)
|
|
}
|
|
if res.Presence.Events[0].Sender != alice.UserID {
|
|
t.Errorf("sender: got %v want %v", res.Presence.Events[0].Sender, alice.UserID)
|
|
}
|
|
if res.Presence.Events[0].Type != "m.presence" {
|
|
t.Errorf("type: got %v want %v", res.Presence.Events[0].Type, "m.presence")
|
|
}
|
|
if gjson.ParseBytes(res.Presence.Events[0].Content).Get("presence").Str != "online" {
|
|
t.Errorf("content: not online, got %v", res.Presence.Events[0].Content)
|
|
}
|
|
|
|
}
|
|
|
|
// This is mainly what Sytest is doing in "test_history_visibility"
|
|
func TestMessageHistoryVisibility(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testHistoryVisibility(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testHistoryVisibility(t *testing.T, dbType test.DBType) {
|
|
type result struct {
|
|
seeWithoutJoin bool
|
|
seeBeforeJoin bool
|
|
seeAfterInvite bool
|
|
}
|
|
|
|
// create the users
|
|
alice := test.NewUser(t)
|
|
aliceDev := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: alice.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "ALICE",
|
|
}
|
|
|
|
bob := test.NewUser(t)
|
|
|
|
bobDev := userapi.Device{
|
|
ID: "BOBID",
|
|
UserID: bob.ID,
|
|
AccessToken: "BOD_BEARER_TOKEN",
|
|
DisplayName: "BOB",
|
|
}
|
|
|
|
ctx := context.Background()
|
|
// check guest and normal user accounts
|
|
for _, accType := range []userapi.AccountType{userapi.AccountTypeGuest, userapi.AccountTypeUser} {
|
|
testCases := []struct {
|
|
historyVisibility gomatrixserverlib.HistoryVisibility
|
|
wantResult result
|
|
}{
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityWorldReadable,
|
|
wantResult: result{
|
|
seeWithoutJoin: true,
|
|
seeBeforeJoin: true,
|
|
seeAfterInvite: true,
|
|
},
|
|
},
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityShared,
|
|
wantResult: result{
|
|
seeWithoutJoin: false,
|
|
seeBeforeJoin: true,
|
|
seeAfterInvite: true,
|
|
},
|
|
},
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityInvited,
|
|
wantResult: result{
|
|
seeWithoutJoin: false,
|
|
seeBeforeJoin: false,
|
|
seeAfterInvite: true,
|
|
},
|
|
},
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityJoined,
|
|
wantResult: result{
|
|
seeWithoutJoin: false,
|
|
seeBeforeJoin: false,
|
|
seeAfterInvite: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
bobDev.AccountType = accType
|
|
userType := "guest"
|
|
if accType == userapi.AccountTypeUser {
|
|
userType = "real user"
|
|
}
|
|
|
|
base, close := testrig.CreateBaseDendrite(t, dbType)
|
|
defer close()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
|
|
// Use the actual internal roomserver API
|
|
rsAPI := roomserver.NewInternalAPI(base)
|
|
rsAPI.SetFederationAPI(nil, nil)
|
|
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI)
|
|
|
|
for _, tc := range testCases {
|
|
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
|
|
t.Run(testname, func(t *testing.T) {
|
|
// create a room with the given visibility
|
|
room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility))
|
|
|
|
// send the events/messages to NATS to create the rooms
|
|
beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)
|
|
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody})
|
|
eventsToSend := append(room.Events(), beforeJoinEv)
|
|
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
syncUntil(t, base, aliceDev.AccessToken, false,
|
|
func(syncBody string) bool {
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
|
|
return gjson.Get(syncBody, path).Exists()
|
|
},
|
|
)
|
|
|
|
// There is only one event, we expect only to be able to see this, if the room is world_readable
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": bobDev.AccessToken,
|
|
"dir": "b",
|
|
"filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Logf("%s", w.Body.String())
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
// We only care about the returned events at this point
|
|
var res struct {
|
|
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
|
|
}
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
|
|
verifyEventVisible(t, tc.wantResult.seeWithoutJoin, beforeJoinEv, res.Chunk)
|
|
|
|
// Create invite, a message, join the room and create another message.
|
|
inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID))
|
|
afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)})
|
|
joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID))
|
|
afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
|
|
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
|
|
|
|
eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
|
|
|
|
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
syncUntil(t, base, aliceDev.AccessToken, false,
|
|
func(syncBody string) bool {
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
|
|
return gjson.Get(syncBody, path).Exists()
|
|
},
|
|
)
|
|
|
|
// Verify the messages after/before invite are visible or not
|
|
w = httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": bobDev.AccessToken,
|
|
"dir": "b",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Logf("%s", w.Body.String())
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
// verify results
|
|
verifyEventVisible(t, tc.wantResult.seeBeforeJoin, beforeJoinEv, res.Chunk)
|
|
verifyEventVisible(t, tc.wantResult.seeAfterInvite, afterInviteEv, res.Chunk)
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *gomatrixserverlib.HeaderedEvent, chunk []gomatrixserverlib.ClientEvent) {
|
|
t.Helper()
|
|
if wantVisible {
|
|
for _, ev := range chunk {
|
|
if ev.EventID == wantVisibleEvent.EventID() {
|
|
return
|
|
}
|
|
}
|
|
t.Fatalf("expected to see event %s but didn't: %+v", wantVisibleEvent.EventID(), chunk)
|
|
} else {
|
|
for _, ev := range chunk {
|
|
if ev.EventID == wantVisibleEvent.EventID() {
|
|
t.Fatalf("expected not to see event %s: %+v", wantVisibleEvent.EventID(), string(ev.Content))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestGetMembership(t *testing.T) {
|
|
alice := test.NewUser(t)
|
|
|
|
aliceDev := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: alice.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
bob := test.NewUser(t)
|
|
bobDev := userapi.Device{
|
|
ID: "BOBID",
|
|
UserID: bob.ID,
|
|
AccessToken: "notjoinedtoanyrooms",
|
|
}
|
|
|
|
testCases := []struct {
|
|
name string
|
|
roomID string
|
|
additionalEvents func(t *testing.T, room *test.Room)
|
|
request func(t *testing.T, room *test.Room) *http.Request
|
|
wantOK bool
|
|
wantMemberCount int
|
|
useSleep bool // :/
|
|
}{
|
|
{
|
|
name: "/members - Alice joined",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
wantOK: true,
|
|
wantMemberCount: 1,
|
|
},
|
|
{
|
|
name: "/members - Bob never joined",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": bobDev.AccessToken,
|
|
}))
|
|
},
|
|
wantOK: false,
|
|
},
|
|
{
|
|
name: "/joined_members - Bob never joined",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/joined_members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": bobDev.AccessToken,
|
|
}))
|
|
},
|
|
wantOK: false,
|
|
},
|
|
{
|
|
name: "/joined_members - Alice joined",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/joined_members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
wantOK: true,
|
|
},
|
|
{
|
|
name: "Alice leaves before Bob joins, should not be able to see Bob",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
additionalEvents: func(t *testing.T, room *test.Room) {
|
|
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
|
|
"membership": "leave",
|
|
}, test.WithStateKey(alice.ID))
|
|
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
|
|
"membership": "join",
|
|
}, test.WithStateKey(bob.ID))
|
|
},
|
|
useSleep: true,
|
|
wantOK: true,
|
|
wantMemberCount: 1,
|
|
},
|
|
{
|
|
name: "Alice leaves after Bob joins, should be able to see Bob",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
additionalEvents: func(t *testing.T, room *test.Room) {
|
|
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
|
|
"membership": "join",
|
|
}, test.WithStateKey(bob.ID))
|
|
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
|
|
"membership": "leave",
|
|
}, test.WithStateKey(alice.ID))
|
|
},
|
|
useSleep: true,
|
|
wantOK: true,
|
|
wantMemberCount: 2,
|
|
},
|
|
{
|
|
name: "/joined_members - Alice leaves, shouldn't be able to see members ",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/joined_members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
additionalEvents: func(t *testing.T, room *test.Room) {
|
|
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
|
|
"membership": "leave",
|
|
}, test.WithStateKey(alice.ID))
|
|
},
|
|
useSleep: true,
|
|
wantOK: false,
|
|
},
|
|
{
|
|
name: "'at' specified, returns memberships before Bob joins",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
"at": "t2_5",
|
|
}))
|
|
},
|
|
additionalEvents: func(t *testing.T, room *test.Room) {
|
|
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
|
|
"membership": "join",
|
|
}, test.WithStateKey(bob.ID))
|
|
},
|
|
useSleep: true,
|
|
wantOK: true,
|
|
wantMemberCount: 1,
|
|
},
|
|
{
|
|
name: "'membership=leave' specified, returns no memberships",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
"membership": "leave",
|
|
}))
|
|
},
|
|
wantOK: true,
|
|
wantMemberCount: 0,
|
|
},
|
|
{
|
|
name: "'not_membership=join' specified, returns no memberships",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
"not_membership": "join",
|
|
}))
|
|
},
|
|
wantOK: true,
|
|
wantMemberCount: 0,
|
|
},
|
|
{
|
|
name: "'not_membership=leave' & 'membership=join' specified, returns correct memberships",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
"not_membership": "leave",
|
|
"membership": "join",
|
|
}))
|
|
},
|
|
additionalEvents: func(t *testing.T, room *test.Room) {
|
|
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
|
|
"membership": "join",
|
|
}, test.WithStateKey(bob.ID))
|
|
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
|
|
"membership": "leave",
|
|
}, test.WithStateKey(bob.ID))
|
|
},
|
|
wantOK: true,
|
|
wantMemberCount: 1,
|
|
},
|
|
{
|
|
name: "non-existent room ID",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", "!notavalidroom:test"), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
wantOK: false,
|
|
},
|
|
}
|
|
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
|
|
base, close := testrig.CreateBaseDendrite(t, dbType)
|
|
defer close()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
|
|
// Use an actual roomserver for this
|
|
rsAPI := roomserver.NewInternalAPI(base)
|
|
rsAPI.SetFederationAPI(nil, nil)
|
|
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI)
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
room := test.NewRoom(t, alice)
|
|
t.Cleanup(func() {
|
|
t.Logf("running cleanup for %s", tc.name)
|
|
})
|
|
// inject additional events
|
|
if tc.additionalEvents != nil {
|
|
tc.additionalEvents(t, room)
|
|
}
|
|
if err := api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
|
|
// wait for the events to come down sync
|
|
if tc.useSleep {
|
|
time.Sleep(time.Millisecond * 100)
|
|
} else {
|
|
syncUntil(t, base, aliceDev.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, tc.request(t, room))
|
|
if w.Code != 200 && tc.wantOK {
|
|
t.Logf("%s", w.Body.String())
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
t.Logf("[%s] Resp: %s", tc.name, w.Body.String())
|
|
|
|
// check we got the expected events
|
|
if tc.wantOK {
|
|
memberCount := len(gjson.GetBytes(w.Body.Bytes(), "chunk").Array())
|
|
if memberCount != tc.wantMemberCount {
|
|
t.Fatalf("expected %d members, got %d", tc.wantMemberCount, memberCount)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestSendToDevice(t *testing.T) {
|
|
test.WithAllDatabases(t, testSendToDevice)
|
|
}
|
|
|
|
func testSendToDevice(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
|
|
defer baseClose()
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{})
|
|
|
|
producer := producers.SyncAPIProducer{
|
|
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
|
JetStream: jsctx,
|
|
}
|
|
|
|
msgCounter := 0
|
|
|
|
testCases := []struct {
|
|
name string
|
|
since string
|
|
want []string
|
|
sendMessagesCount int
|
|
}{
|
|
{
|
|
name: "initial sync, no messages",
|
|
want: []string{},
|
|
},
|
|
{
|
|
name: "initial sync, one new message",
|
|
sendMessagesCount: 1,
|
|
want: []string{
|
|
"message 1",
|
|
},
|
|
},
|
|
{
|
|
name: "initial sync, two new messages", // we didn't advance the since token, so we'll receive two messages
|
|
sendMessagesCount: 1,
|
|
want: []string{
|
|
"message 1",
|
|
"message 2",
|
|
},
|
|
},
|
|
{
|
|
name: "incremental sync, one message", // this deletes message 1, as we advanced the since token
|
|
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
|
|
want: []string{
|
|
"message 2",
|
|
},
|
|
},
|
|
{
|
|
name: "failed incremental sync, one message", // didn't advance since, so still the same message
|
|
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
|
|
want: []string{
|
|
"message 2",
|
|
},
|
|
},
|
|
{
|
|
name: "incremental sync, no message", // this should delete message 2
|
|
since: types.StreamingToken{SendToDevicePosition: 2}.String(), // next_batch from previous sync
|
|
want: []string{},
|
|
},
|
|
{
|
|
name: "incremental sync, three new messages",
|
|
since: types.StreamingToken{SendToDevicePosition: 2}.String(),
|
|
sendMessagesCount: 3,
|
|
want: []string{
|
|
"message 3", // message 2 was deleted in the previous test
|
|
"message 4",
|
|
"message 5",
|
|
},
|
|
},
|
|
{
|
|
name: "initial sync, three messages", // we expect three messages, as we didn't go beyond "2"
|
|
want: []string{
|
|
"message 3",
|
|
"message 4",
|
|
"message 5",
|
|
},
|
|
},
|
|
{
|
|
name: "incremental sync, no messages", // advance the sync token, no new messages
|
|
since: types.StreamingToken{SendToDevicePosition: 5}.String(),
|
|
want: []string{},
|
|
},
|
|
}
|
|
|
|
ctx := context.Background()
|
|
for _, tc := range testCases {
|
|
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
|
for i := 0; i < tc.sendMessagesCount; i++ {
|
|
msgCounter++
|
|
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
|
|
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
|
t.Fatalf("unable to send to device message: %v", err)
|
|
}
|
|
}
|
|
|
|
syncUntil(t, base, alice.AccessToken,
|
|
len(tc.want) == 0,
|
|
func(body string) bool {
|
|
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
|
|
},
|
|
)
|
|
|
|
// Execute a /sync request, recording the response
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"since": tc.since,
|
|
})))
|
|
|
|
// Extract the to_device.events, # gets all values of an array, in this case a string slice with "message $counter" entries
|
|
events := gjson.Get(w.Body.String(), "to_device.events.#.content.dummy").Array()
|
|
got := make([]string, len(events))
|
|
for i := range events {
|
|
got[i] = events[i].String()
|
|
}
|
|
|
|
// Ensure the messages we received are as we expect them to be
|
|
if !reflect.DeepEqual(got, tc.want) {
|
|
t.Logf("[%s|since=%s]: Sync: %s", tc.name, tc.since, w.Body.String())
|
|
t.Fatalf("[%s|since=%s]: got: %+v, want: %+v", tc.name, tc.since, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestContext(t *testing.T) {
|
|
test.WithAllDatabases(t, testContext)
|
|
}
|
|
|
|
func testContext(t *testing.T, dbType test.DBType) {
|
|
|
|
tests := []struct {
|
|
name string
|
|
roomID string
|
|
eventID string
|
|
params map[string]string
|
|
wantError bool
|
|
wantStateLength int
|
|
wantBeforeLength int
|
|
wantAfterLength int
|
|
}{
|
|
{
|
|
name: "invalid filter",
|
|
params: map[string]string{
|
|
"filter": "{",
|
|
},
|
|
wantError: true,
|
|
},
|
|
{
|
|
name: "invalid limit",
|
|
params: map[string]string{
|
|
"limit": "abc",
|
|
},
|
|
wantError: true,
|
|
},
|
|
{
|
|
name: "high limit",
|
|
params: map[string]string{
|
|
"limit": "100000",
|
|
},
|
|
},
|
|
{
|
|
name: "fine limit",
|
|
params: map[string]string{
|
|
"limit": "10",
|
|
},
|
|
},
|
|
{
|
|
name: "last event without lazy loading",
|
|
wantStateLength: 5,
|
|
},
|
|
{
|
|
name: "last event with lazy loading",
|
|
params: map[string]string{
|
|
"filter": `{"lazy_load_members":true}`,
|
|
},
|
|
wantStateLength: 1,
|
|
},
|
|
{
|
|
name: "invalid room",
|
|
roomID: "!doesnotexist",
|
|
wantError: true,
|
|
},
|
|
{
|
|
name: "invalid eventID",
|
|
eventID: "$doesnotexist",
|
|
wantError: true,
|
|
},
|
|
{
|
|
name: "state is limited",
|
|
params: map[string]string{
|
|
"limit": "1",
|
|
},
|
|
wantStateLength: 1,
|
|
},
|
|
{
|
|
name: "events are not limited",
|
|
wantBeforeLength: 7,
|
|
},
|
|
{
|
|
name: "all events are limited",
|
|
params: map[string]string{
|
|
"limit": "1",
|
|
},
|
|
wantStateLength: 1,
|
|
wantBeforeLength: 1,
|
|
wantAfterLength: 1,
|
|
},
|
|
}
|
|
|
|
user := test.NewUser(t)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
|
|
defer baseClose()
|
|
|
|
// Use an actual roomserver for this
|
|
rsAPI := roomserver.NewInternalAPI(base)
|
|
rsAPI.SetFederationAPI(nil, nil)
|
|
|
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI)
|
|
|
|
room := test.NewRoom(t, user)
|
|
|
|
room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world 1!"})
|
|
room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world 2!"})
|
|
thirdMsg := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world3!"})
|
|
room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world4!"})
|
|
|
|
if err := api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
|
|
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
|
|
|
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, thirdMsg.EventID())
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
params := map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
}
|
|
w := httptest.NewRecorder()
|
|
// test overrides
|
|
roomID := room.ID
|
|
if tc.roomID != "" {
|
|
roomID = tc.roomID
|
|
}
|
|
eventID := thirdMsg.EventID()
|
|
if tc.eventID != "" {
|
|
eventID = tc.eventID
|
|
}
|
|
requestPath := fmt.Sprintf("/_matrix/client/v3/rooms/%s/context/%s", roomID, eventID)
|
|
if tc.params != nil {
|
|
for k, v := range tc.params {
|
|
params[k] = v
|
|
}
|
|
}
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
|
|
|
|
if tc.wantError && w.Code == 200 {
|
|
t.Fatalf("Expected an error, but got none")
|
|
}
|
|
t.Log(w.Body.String())
|
|
resp := routing.ContextRespsonse{}
|
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if tc.wantStateLength > 0 && tc.wantStateLength != len(resp.State) {
|
|
t.Fatalf("expected %d state events, got %d", tc.wantStateLength, len(resp.State))
|
|
}
|
|
if tc.wantBeforeLength > 0 && tc.wantBeforeLength != len(resp.EventsBefore) {
|
|
t.Fatalf("expected %d before events, got %d", tc.wantBeforeLength, len(resp.EventsBefore))
|
|
}
|
|
if tc.wantAfterLength > 0 && tc.wantAfterLength != len(resp.EventsAfter) {
|
|
t.Fatalf("expected %d after events, got %d", tc.wantAfterLength, len(resp.EventsAfter))
|
|
}
|
|
|
|
if !tc.wantError && resp.Event.EventID != eventID {
|
|
t.Fatalf("unexpected eventID %s, expected %s", resp.Event.EventID, eventID)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestUpdateRelations(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
eventContent map[string]interface{}
|
|
eventType string
|
|
}{
|
|
{
|
|
name: "empty event content should not error",
|
|
},
|
|
{
|
|
name: "unable to unmarshal event should not error",
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": map[string]interface{}{}, // this should be a string and not struct
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "empty event ID is ignored",
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": "",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "empty rel_type is ignored",
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": "$randomEventID",
|
|
"rel_type": "",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "redactions are ignored",
|
|
eventType: gomatrixserverlib.MRoomRedaction,
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": "$randomEventID",
|
|
"rel_type": "m.replace",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "valid event is correctly written",
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": "$randomEventID",
|
|
"rel_type": "m.replace",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
alice := test.NewUser(t)
|
|
room := test.NewRoom(t, alice)
|
|
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
base, shutdownBase := testrig.CreateBaseDendrite(t, dbType)
|
|
t.Cleanup(shutdownBase)
|
|
db, err := storage.NewSyncServerDatasource(base, &base.Cfg.SyncAPI.Database)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
evType := "m.room.message"
|
|
if tc.eventType != "" {
|
|
evType = tc.eventType
|
|
}
|
|
ev := room.CreateEvent(t, alice, evType, tc.eventContent)
|
|
err = db.UpdateRelations(ctx, ev)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
})
|
|
}
|
|
})
|
|
}
|
|
|
|
func syncUntil(t *testing.T,
|
|
base *base.BaseDendrite, accessToken string,
|
|
skip bool,
|
|
checkFunc func(syncBody string) bool,
|
|
) {
|
|
if checkFunc == nil {
|
|
t.Fatalf("No checkFunc defined")
|
|
}
|
|
if skip {
|
|
return
|
|
}
|
|
// loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
|
|
// to the syncAPI when hitting /sync
|
|
done := make(chan bool)
|
|
defer close(done)
|
|
go func() {
|
|
for {
|
|
w := httptest.NewRecorder()
|
|
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": accessToken,
|
|
"timeout": "1000",
|
|
})))
|
|
if checkFunc(w.Body.String()) {
|
|
done <- true
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
case <-time.After(time.Second * 5):
|
|
t.Fatalf("Timed out waiting for messages")
|
|
}
|
|
}
|
|
|
|
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
|
|
result := make([]*nats.Msg, len(input))
|
|
for i, ev := range input {
|
|
var addsStateIDs []string
|
|
if ev.StateKey() != nil {
|
|
addsStateIDs = append(addsStateIDs, ev.EventID())
|
|
}
|
|
result[i] = testrig.NewOutputEventMsg(t, base, ev.RoomID(), api.OutputEvent{
|
|
Type: rsapi.OutputTypeNewRoomEvent,
|
|
NewRoomEvent: &rsapi.OutputNewRoomEvent{
|
|
Event: ev,
|
|
AddsStateEventIDs: addsStateIDs,
|
|
HistoryVisibility: ev.Visibility,
|
|
},
|
|
})
|
|
}
|
|
return result
|
|
}
|