mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-03 12:13:09 -06:00
Add a way to inject jetstream messages
This commit is contained in:
parent
850bfb24ee
commit
6fdc3b0e4c
|
|
@ -29,6 +29,13 @@ func PrepareForTests() (*process.ProcessContext, nats.JetStreamContext, *nats.Co
|
||||||
return pc, js, jc
|
return pc, js, jc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
|
||||||
|
for _, stream := range streams { // streams are defined in streams.go
|
||||||
|
name := cfg.Prefixed(stream.Name)
|
||||||
|
js.DeleteStream(name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
|
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
|
||||||
// check if we need an in-process NATS Server
|
// check if we need an in-process NATS Server
|
||||||
if len(cfg.Addresses) != 0 {
|
if len(cfg.Addresses) != 0 {
|
||||||
|
|
|
||||||
|
|
@ -5,16 +5,15 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/test"
|
"github.com/matrix-org/dendrite/test"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
)
|
"github.com/nats-io/nats.go"
|
||||||
|
|
||||||
var (
|
|
||||||
alice = "@alice:localhost"
|
|
||||||
aliceAccessToken = "ALICE_BEARER_TOKEN"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type syncRoomserverAPI struct {
|
type syncRoomserverAPI struct {
|
||||||
|
|
@ -22,19 +21,16 @@ type syncRoomserverAPI struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type syncUserAPI struct {
|
type syncUserAPI struct {
|
||||||
userapi.UserInternalAPI
|
userapi.SyncUserAPI
|
||||||
|
accounts []userapi.Device
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
|
func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
|
||||||
if req.AccessToken == aliceAccessToken {
|
for _, acc := range s.accounts {
|
||||||
res.Device = &userapi.Device{
|
if acc.AccessToken == req.AccessToken {
|
||||||
ID: "ID",
|
res.Device = &acc
|
||||||
UserID: alice,
|
return nil
|
||||||
AccessToken: aliceAccessToken,
|
|
||||||
AccountType: userapi.AccountTypeUser,
|
|
||||||
DisplayName: "Alice",
|
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
res.Err = "unknown user"
|
res.Err = "unknown user"
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -50,46 +46,88 @@ type syncKeyAPI struct {
|
||||||
|
|
||||||
func TestSyncAPI(t *testing.T) {
|
func TestSyncAPI(t *testing.T) {
|
||||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||||
base, close := test.CreateBaseDendrite(t, dbType)
|
testSync(t, dbType)
|
||||||
defer close()
|
|
||||||
AddPublicRoutes(base, &syncUserAPI{}, &syncRoomserverAPI{}, &syncKeyAPI{})
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
name string
|
|
||||||
req *http.Request
|
|
||||||
wantCode int
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "missing access token",
|
|
||||||
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
||||||
"timeout": "0",
|
|
||||||
})),
|
|
||||||
wantCode: 401,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "unknown access token",
|
|
||||||
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
||||||
"access_token": "foo",
|
|
||||||
"timeout": "0",
|
|
||||||
})),
|
|
||||||
wantCode: 401,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "valid access token",
|
|
||||||
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
||||||
"access_token": aliceAccessToken,
|
|
||||||
"timeout": "0",
|
|
||||||
})),
|
|
||||||
wantCode: 200,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
w := httptest.NewRecorder()
|
|
||||||
base.PublicClientAPIMux.ServeHTTP(w, tc.req)
|
|
||||||
if w.Code != tc.wantCode {
|
|
||||||
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testSync(t *testing.T, dbType test.DBType) {
|
||||||
|
user := test.NewUser()
|
||||||
|
room := test.NewRoom(t, user)
|
||||||
|
alice := userapi.Device{
|
||||||
|
ID: "ALICEID",
|
||||||
|
UserID: user.ID,
|
||||||
|
AccessToken: "ALICE_BEARER_TOKEN",
|
||||||
|
DisplayName: "Alice",
|
||||||
|
AccountType: userapi.AccountTypeUser,
|
||||||
|
}
|
||||||
|
|
||||||
|
base, close := test.CreateBaseDendrite(t, dbType)
|
||||||
|
defer close()
|
||||||
|
|
||||||
|
jsctx, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
||||||
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
||||||
|
var msgs []*nats.Msg
|
||||||
|
for _, ev := range room.Events() {
|
||||||
|
msgs = append(msgs, test.NewOutputEventMsg(t, base, room.ID, api.OutputEvent{
|
||||||
|
Type: rsapi.OutputTypeNewRoomEvent,
|
||||||
|
NewRoomEvent: &rsapi.OutputNewRoomEvent{
|
||||||
|
Event: ev,
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
test.MustPublishMsgs(t, jsctx, msgs...)
|
||||||
|
|
||||||
|
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, &syncKeyAPI{})
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
req *http.Request
|
||||||
|
wantCode int
|
||||||
|
wantJoinedRooms []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "missing access token",
|
||||||
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
||||||
|
"timeout": "0",
|
||||||
|
})),
|
||||||
|
wantCode: 401,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unknown access token",
|
||||||
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
||||||
|
"access_token": "foo",
|
||||||
|
"timeout": "0",
|
||||||
|
})),
|
||||||
|
wantCode: 401,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid access token",
|
||||||
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
||||||
|
"access_token": alice.AccessToken,
|
||||||
|
"timeout": "0",
|
||||||
|
})),
|
||||||
|
wantCode: 200,
|
||||||
|
wantJoinedRooms: []string{room.ID},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
// TODO: find a better way
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
base.PublicClientAPIMux.ServeHTTP(w, tc.req)
|
||||||
|
if w.Code != tc.wantCode {
|
||||||
|
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
if tc.wantJoinedRooms != nil {
|
||||||
|
var res types.Response
|
||||||
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
||||||
|
t.Fatalf("%s: failed to decode response body: %s", tc.name, err)
|
||||||
|
}
|
||||||
|
if len(res.Rooms.Join) != len(tc.wantJoinedRooms) {
|
||||||
|
t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res)
|
||||||
|
}
|
||||||
|
} */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
@ -28,6 +29,10 @@ import (
|
||||||
func CreateBaseDendrite(t *testing.T, dbType DBType) (*base.BaseDendrite, func()) {
|
func CreateBaseDendrite(t *testing.T, dbType DBType) (*base.BaseDendrite, func()) {
|
||||||
var cfg config.Dendrite
|
var cfg config.Dendrite
|
||||||
cfg.Defaults(false)
|
cfg.Defaults(false)
|
||||||
|
cfg.Global.JetStream.InMemory = true
|
||||||
|
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
|
||||||
|
// the file system event with InMemory=true :(
|
||||||
|
cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)
|
||||||
|
|
||||||
switch dbType {
|
switch dbType {
|
||||||
case DBTypePostgres:
|
case DBTypePostgres:
|
||||||
|
|
|
||||||
35
test/jetstream.go
Normal file
35
test/jetstream.go
Normal file
|
|
@ -0,0 +1,35 @@
|
||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/base"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) {
|
||||||
|
t.Helper()
|
||||||
|
for _, msg := range msgs {
|
||||||
|
if _, err := jsctx.PublishMsg(msg); err != nil {
|
||||||
|
t.Fatalf("MustPublishMsgs: failed to publish message: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg {
|
||||||
|
t.Helper()
|
||||||
|
msg := &nats.Msg{
|
||||||
|
Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
|
||||||
|
Header: nats.Header{},
|
||||||
|
}
|
||||||
|
msg.Header.Set(jetstream.RoomID, roomID)
|
||||||
|
var err error
|
||||||
|
msg.Data, err = json.Marshal(update)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to marshal update: %s", err)
|
||||||
|
}
|
||||||
|
return msg
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue