mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 11:13:12 -06:00
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/roomservertests2
This commit is contained in:
commit
62cf38441e
36
CHANGES.md
36
CHANGES.md
|
|
@ -1,5 +1,41 @@
|
|||
# Changelog
|
||||
|
||||
## Dendrite 0.8.3 (2022-05-09)
|
||||
|
||||
### Features
|
||||
|
||||
* Open registration is now harder to enable, which should reduce the chance that Dendrite servers will be used to conduct spam or abuse attacks
|
||||
* Dendrite will only enable open registration if you pass the `--really-enable-open-registration` command line flag at startup
|
||||
* If open registration is enabled but this command line flag is not passed, Dendrite will fail to start up
|
||||
* Dendrite now supports phone-home statistic reporting
|
||||
* These statistics include things like the number of registered and active users, some configuration options and platform/environment details, to help us to understand how Dendrite is used
|
||||
* This is not enabled by default — it must be enabled in the `global.report_stats` section of the config file
|
||||
* Monolith installations can now be configured with a single global database connection pool (in `global.database` in the config) rather than having to configure each component separately
|
||||
* This also means that you no longer need to balance connection counts between different components, as they will share the same larger pool
|
||||
* Specific components can override the global database settings by specifying their own `database` block
|
||||
* To use only the global pool, you must configure `global.database` and then remove the `database` block from all of the component sections of the config file
|
||||
* A new admin API endpoint `/_dendrite/admin/evacuateRoom/{roomID}` has been added, allowing server admins to forcefully part all local users from a given room
|
||||
* The sync notifier now only loads members for the relevant rooms, which should reduce CPU usage and load on the database
|
||||
* A number of component interfaces have been refactored for cleanliness and developer ease
|
||||
* Event auth errors in the log should now be much more useful, including the reason for the event failures
|
||||
* The forward extremity calculation in the roomserver has been simplified
|
||||
* A new index has been added to the one-time keys table in the keyserver which should speed up key count lookups
|
||||
|
||||
### Fixes
|
||||
|
||||
* Dendrite will no longer process events for rooms where there are no local users joined, which should help to reduce CPU and RAM usage
|
||||
* A bug has been fixed in event auth when changing the user levels in `m.room.power_levels` events
|
||||
* Usernames should no longer be duplicated when no room name is set
|
||||
* Device display names should now be correctly propagated over federation
|
||||
* A panic when uploading cross-signing signatures has been fixed
|
||||
* Presence is now correctly limited in `/sync` based on the filters
|
||||
* The presence stream position returned by `/sync` will now be correct if no presence events were returned
|
||||
* The media `/config` endpoint will no longer return a maximum upload size field if it is configured to be unlimited in the Dendrite config
|
||||
* The server notices room will no longer produce "User is already joined to the room" errors
|
||||
* Consumer errors will no longer flood the logs during a graceful shutdown
|
||||
* Sync API and federation API consumers will no longer unnecessarily query added state events matching the one in the output event
|
||||
* The Sync API will no longer unnecessarily track invites for remote users
|
||||
|
||||
## Dendrite 0.8.2 (2022-04-27)
|
||||
|
||||
### Features
|
||||
|
|
|
|||
|
|
@ -146,28 +146,25 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
|
|||
// processMessage updates the list of currently joined hosts in the room
|
||||
// and then sends the event to the hosts that were joined before the event.
|
||||
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
||||
eventsRes := &api.QueryEventsByIDResponse{}
|
||||
if len(ore.AddsStateEventIDs) > 0 {
|
||||
addsStateEvents, missingEventIDs := ore.NeededStateEventIDs()
|
||||
|
||||
// Ask the roomserver and add in the rest of the results into the set.
|
||||
// Finally, work out if there are any more events missing.
|
||||
if len(missingEventIDs) > 0 {
|
||||
eventsReq := &api.QueryEventsByIDRequest{
|
||||
EventIDs: ore.AddsStateEventIDs,
|
||||
EventIDs: missingEventIDs,
|
||||
}
|
||||
eventsRes := &api.QueryEventsByIDResponse{}
|
||||
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
||||
}
|
||||
|
||||
found := false
|
||||
for _, event := range eventsRes.Events {
|
||||
if event.EventID() == ore.Event.EventID() {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
eventsRes.Events = append(eventsRes.Events, ore.Event)
|
||||
if len(eventsRes.Events) != len(missingEventIDs) {
|
||||
return fmt.Errorf("missing state events")
|
||||
}
|
||||
addsStateEvents = append(addsStateEvents, eventsRes.Events...)
|
||||
}
|
||||
|
||||
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events))
|
||||
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addsStateEvents))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ func TestMain(m *testing.M) {
|
|||
)
|
||||
|
||||
// Finally, build the server key APIs.
|
||||
sbase := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics)
|
||||
sbase := base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics)
|
||||
s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, nil, true)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ const (
|
|||
VersionMajor = 0
|
||||
VersionMinor = 8
|
||||
VersionPatch = 3
|
||||
VersionTag = "rc1" // example: "rc1"
|
||||
VersionTag = "" // example: "rc1"
|
||||
)
|
||||
|
||||
func VersionString() string {
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ func GenerateThumbnails(
|
|||
mediaMetadata *types.MediaMetadata,
|
||||
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
|
||||
maxThumbnailGenerators int,
|
||||
db *storage.Database,
|
||||
db storage.Database,
|
||||
logger *log.Entry,
|
||||
) (busy bool, errorReturn error) {
|
||||
buffer, err := bimg.Read(string(src))
|
||||
|
|
@ -49,7 +49,7 @@ func GenerateThumbnails(
|
|||
for _, config := range configs {
|
||||
// Note: createThumbnail does locking based on activeThumbnailGeneration
|
||||
busy, err = createThumbnail(
|
||||
ctx, src, img, config, mediaMetadata, activeThumbnailGeneration,
|
||||
ctx, src, img, types.ThumbnailSize(config), mediaMetadata, activeThumbnailGeneration,
|
||||
maxThumbnailGenerators, db, logger,
|
||||
)
|
||||
if err != nil {
|
||||
|
|
@ -71,7 +71,7 @@ func GenerateThumbnail(
|
|||
mediaMetadata *types.MediaMetadata,
|
||||
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
|
||||
maxThumbnailGenerators int,
|
||||
db *storage.Database,
|
||||
db storage.Database,
|
||||
logger *log.Entry,
|
||||
) (busy bool, errorReturn error) {
|
||||
buffer, err := bimg.Read(string(src))
|
||||
|
|
@ -109,7 +109,7 @@ func createThumbnail(
|
|||
mediaMetadata *types.MediaMetadata,
|
||||
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
|
||||
maxThumbnailGenerators int,
|
||||
db *storage.Database,
|
||||
db storage.Database,
|
||||
logger *log.Entry,
|
||||
) (busy bool, errorReturn error) {
|
||||
logger = logger.WithFields(log.Fields{
|
||||
|
|
|
|||
|
|
@ -163,6 +163,19 @@ type OutputNewRoomEvent struct {
|
|||
TransactionID *TransactionID `json:"transaction_id,omitempty"`
|
||||
}
|
||||
|
||||
func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) {
|
||||
addsStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, 1)
|
||||
missingEventIDs := make([]string, 0, len(o.AddsStateEventIDs))
|
||||
for _, eventID := range o.AddsStateEventIDs {
|
||||
if eventID != o.Event.EventID() {
|
||||
missingEventIDs = append(missingEventIDs, eventID)
|
||||
} else {
|
||||
addsStateEvents = append(addsStateEvents, o.Event)
|
||||
}
|
||||
}
|
||||
return addsStateEvents, missingEventIDs
|
||||
}
|
||||
|
||||
// An OutputOldRoomEvent is written when the roomserver receives an old event.
|
||||
// This will typically happen as a result of getting either missing events
|
||||
// or backfilling. Downstream components may wish to send these events to
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@ type BaseDendrite struct {
|
|||
DNSCache *gomatrixserverlib.DNSCache
|
||||
Database *sql.DB
|
||||
DatabaseWriter sqlutil.Writer
|
||||
EnableMetrics bool
|
||||
}
|
||||
|
||||
const NoListener = ""
|
||||
|
|
@ -96,7 +97,7 @@ const HTTPClientTimeout = time.Second * 30
|
|||
type BaseDendriteOptions int
|
||||
|
||||
const (
|
||||
NoCacheMetrics BaseDendriteOptions = iota
|
||||
DisableMetrics BaseDendriteOptions = iota
|
||||
UseHTTPAPIs
|
||||
PolylithMode
|
||||
)
|
||||
|
|
@ -107,12 +108,12 @@ const (
|
|||
func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...BaseDendriteOptions) *BaseDendrite {
|
||||
platformSanityChecks()
|
||||
useHTTPAPIs := false
|
||||
cacheMetrics := true
|
||||
enableMetrics := true
|
||||
isMonolith := true
|
||||
for _, opt := range options {
|
||||
switch opt {
|
||||
case NoCacheMetrics:
|
||||
cacheMetrics = false
|
||||
case DisableMetrics:
|
||||
enableMetrics = false
|
||||
case UseHTTPAPIs:
|
||||
useHTTPAPIs = true
|
||||
case PolylithMode:
|
||||
|
|
@ -160,7 +161,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
|
|||
}
|
||||
}
|
||||
|
||||
cache, err := caching.NewInMemoryLRUCache(cacheMetrics)
|
||||
cache, err := caching.NewInMemoryLRUCache(enableMetrics)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Warnf("Failed to create cache")
|
||||
}
|
||||
|
|
@ -246,6 +247,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
|
|||
apiHttpClient: &apiClient,
|
||||
Database: db, // set if monolith with global connection pool only
|
||||
DatabaseWriter: writer, // set if monolith with global connection pool only
|
||||
EnableMetrics: enableMetrics,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
|
||||
natsserver "github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/nats-io/nats.go"
|
||||
natsclient "github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
|
|
@ -21,6 +22,13 @@ type NATSInstance struct {
|
|||
sync.Mutex
|
||||
}
|
||||
|
||||
func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
|
||||
for _, stream := range streams { // streams are defined in streams.go
|
||||
name := cfg.Prefixed(stream.Name)
|
||||
_ = js.DeleteStream(name)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
|
||||
// check if we need an in-process NATS Server
|
||||
if len(cfg.Addresses) != 0 {
|
||||
|
|
|
|||
|
|
@ -154,41 +154,61 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
|||
ctx context.Context, msg api.OutputNewRoomEvent,
|
||||
) error {
|
||||
ev := msg.Event
|
||||
addsStateEvents, missingEventIDs := msg.NeededStateEventIDs()
|
||||
|
||||
addsStateEvents := []*gomatrixserverlib.HeaderedEvent{}
|
||||
foundEventIDs := map[string]bool{}
|
||||
if len(msg.AddsStateEventIDs) > 0 {
|
||||
for _, eventID := range msg.AddsStateEventIDs {
|
||||
foundEventIDs[eventID] = false
|
||||
}
|
||||
foundEvents, err := s.db.Events(ctx, msg.AddsStateEventIDs)
|
||||
// Work out the list of events we need to find out about. Either
|
||||
// they will be the event supplied in the request, we will find it
|
||||
// in the sync API database or we'll need to ask the roomserver.
|
||||
knownEventIDs := make(map[string]bool, len(msg.AddsStateEventIDs))
|
||||
for _, eventID := range missingEventIDs {
|
||||
knownEventIDs[eventID] = false
|
||||
}
|
||||
|
||||
// Look the events up in the database. If we know them, add them into
|
||||
// the set of adds state events.
|
||||
if len(missingEventIDs) > 0 {
|
||||
alreadyKnown, err := s.db.Events(ctx, missingEventIDs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("s.db.Events: %w", err)
|
||||
}
|
||||
for _, event := range foundEvents {
|
||||
foundEventIDs[event.EventID()] = true
|
||||
for _, knownEvent := range alreadyKnown {
|
||||
knownEventIDs[knownEvent.EventID()] = true
|
||||
addsStateEvents = append(addsStateEvents, knownEvent)
|
||||
}
|
||||
}
|
||||
|
||||
// Now work out if there are any remaining events we don't know. For
|
||||
// these we will need to ask the roomserver for help.
|
||||
missingEventIDs = missingEventIDs[:0]
|
||||
for eventID, known := range knownEventIDs {
|
||||
if !known {
|
||||
missingEventIDs = append(missingEventIDs, eventID)
|
||||
}
|
||||
}
|
||||
|
||||
// Ask the roomserver and add in the rest of the results into the set.
|
||||
// Finally, work out if there are any more events missing.
|
||||
if len(missingEventIDs) > 0 {
|
||||
eventsReq := &api.QueryEventsByIDRequest{
|
||||
EventIDs: missingEventIDs,
|
||||
}
|
||||
eventsReq := &api.QueryEventsByIDRequest{}
|
||||
eventsRes := &api.QueryEventsByIDResponse{}
|
||||
for eventID, found := range foundEventIDs {
|
||||
if !found {
|
||||
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
|
||||
}
|
||||
}
|
||||
if err = s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
|
||||
if err := s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
|
||||
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
||||
}
|
||||
for _, event := range eventsRes.Events {
|
||||
eventID := event.EventID()
|
||||
foundEvents = append(foundEvents, event)
|
||||
foundEventIDs[eventID] = true
|
||||
addsStateEvents = append(addsStateEvents, event)
|
||||
knownEventIDs[event.EventID()] = true
|
||||
}
|
||||
for eventID, found := range foundEventIDs {
|
||||
|
||||
// This should never happen because this would imply that the
|
||||
// roomserver has sent us adds_state_event_ids for events that it
|
||||
// also doesn't know about, but let's just be sure.
|
||||
for eventID, found := range knownEventIDs {
|
||||
if !found {
|
||||
return fmt.Errorf("event %s is missing", eventID)
|
||||
}
|
||||
}
|
||||
addsStateEvents = foundEvents
|
||||
}
|
||||
|
||||
ev, err := s.updateStateEvent(ev)
|
||||
|
|
@ -327,9 +347,11 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
|||
ctx context.Context, msg api.OutputNewInviteEvent,
|
||||
) {
|
||||
if msg.Event.StateKey() == nil {
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(msg.Event.JSON()),
|
||||
}).Panicf("roomserver output log: invite has no state key")
|
||||
return
|
||||
}
|
||||
if _, serverName, err := gomatrixserverlib.SplitID('@', *msg.Event.StateKey()); err != nil {
|
||||
return
|
||||
} else if serverName != s.cfg.Matrix.ServerName {
|
||||
return
|
||||
}
|
||||
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
|
||||
|
|
|
|||
|
|
@ -65,11 +65,13 @@ func NewRequestPool(
|
|||
userAPI userapi.SyncUserAPI, keyAPI keyapi.SyncKeyAPI,
|
||||
rsAPI roomserverAPI.SyncRoomserverAPI,
|
||||
streams *streams.Streams, notifier *notifier.Notifier,
|
||||
producer PresencePublisher,
|
||||
producer PresencePublisher, enableMetrics bool,
|
||||
) *RequestPool {
|
||||
prometheus.MustRegister(
|
||||
activeSyncRequests, waitingSyncRequests,
|
||||
)
|
||||
if enableMetrics {
|
||||
prometheus.MustRegister(
|
||||
activeSyncRequests, waitingSyncRequests,
|
||||
)
|
||||
}
|
||||
rp := &RequestPool{
|
||||
db: db,
|
||||
cfg: cfg,
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ func AddPublicRoutes(
|
|||
JetStream: js,
|
||||
}
|
||||
|
||||
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
|
||||
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer, base.EnableMetrics)
|
||||
|
||||
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
|
||||
JetStream: js,
|
||||
|
|
|
|||
162
syncapi/syncapi_test.go
Normal file
162
syncapi/syncapi_test.go
Normal file
|
|
@ -0,0 +1,162 @@
|
|||
package syncapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type syncRoomserverAPI struct {
|
||||
rsapi.SyncRoomserverAPI
|
||||
rooms []*test.Room
|
||||
}
|
||||
|
||||
func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error {
|
||||
var room *test.Room
|
||||
for _, r := range s.rooms {
|
||||
if r.ID == req.RoomID {
|
||||
room = r
|
||||
break
|
||||
}
|
||||
}
|
||||
if room == nil {
|
||||
res.RoomExists = false
|
||||
return nil
|
||||
}
|
||||
res.RoomVersion = room.Version
|
||||
return nil // TODO: return state
|
||||
}
|
||||
|
||||
type syncUserAPI struct {
|
||||
userapi.SyncUserAPI
|
||||
accounts []userapi.Device
|
||||
}
|
||||
|
||||
func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
|
||||
for _, acc := range s.accounts {
|
||||
if acc.AccessToken == req.AccessToken {
|
||||
res.Device = &acc
|
||||
return nil
|
||||
}
|
||||
}
|
||||
res.Err = "unknown user"
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type syncKeyAPI struct {
|
||||
keyapi.KeyInternalAPI
|
||||
}
|
||||
|
||||
func TestSyncAPI(t *testing.T) {
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
testSync(t, dbType)
|
||||
})
|
||||
}
|
||||
|
||||
func testSync(t *testing.T, dbType test.DBType) {
|
||||
user := test.NewUser()
|
||||
room := test.NewRoom(t, user)
|
||||
alice := userapi.Device{
|
||||
ID: "ALICEID",
|
||||
UserID: user.ID,
|
||||
AccessToken: "ALICE_BEARER_TOKEN",
|
||||
DisplayName: "Alice",
|
||||
AccountType: userapi.AccountTypeUser,
|
||||
}
|
||||
|
||||
base, close := test.CreateBaseDendrite(t, dbType)
|
||||
defer close()
|
||||
|
||||
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
||||
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
||||
var msgs []*nats.Msg
|
||||
for _, ev := range room.Events() {
|
||||
var addsStateIDs []string
|
||||
if ev.StateKey() != nil {
|
||||
addsStateIDs = append(addsStateIDs, ev.EventID())
|
||||
}
|
||||
msgs = append(msgs, test.NewOutputEventMsg(t, base, room.ID, api.OutputEvent{
|
||||
Type: rsapi.OutputTypeNewRoomEvent,
|
||||
NewRoomEvent: &rsapi.OutputNewRoomEvent{
|
||||
Event: ev,
|
||||
AddsStateEventIDs: addsStateIDs,
|
||||
},
|
||||
}))
|
||||
}
|
||||
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{})
|
||||
test.MustPublishMsgs(t, jsctx, msgs...)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
req *http.Request
|
||||
wantCode int
|
||||
wantJoinedRooms []string
|
||||
}{
|
||||
{
|
||||
name: "missing access token",
|
||||
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
||||
"timeout": "0",
|
||||
})),
|
||||
wantCode: 401,
|
||||
},
|
||||
{
|
||||
name: "unknown access token",
|
||||
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
||||
"access_token": "foo",
|
||||
"timeout": "0",
|
||||
})),
|
||||
wantCode: 401,
|
||||
},
|
||||
{
|
||||
name: "valid access token",
|
||||
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
||||
"access_token": alice.AccessToken,
|
||||
"timeout": "0",
|
||||
})),
|
||||
wantCode: 200,
|
||||
wantJoinedRooms: []string{room.ID},
|
||||
},
|
||||
}
|
||||
// TODO: find a better way
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
for _, tc := range testCases {
|
||||
w := httptest.NewRecorder()
|
||||
base.PublicClientAPIMux.ServeHTTP(w, tc.req)
|
||||
if w.Code != tc.wantCode {
|
||||
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
|
||||
}
|
||||
if tc.wantJoinedRooms != nil {
|
||||
var res types.Response
|
||||
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
||||
t.Fatalf("%s: failed to decode response body: %s", tc.name, err)
|
||||
}
|
||||
if len(res.Rooms.Join) != len(tc.wantJoinedRooms) {
|
||||
t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res)
|
||||
}
|
||||
t.Logf("res: %+v", res.Rooms.Join[room.ID])
|
||||
|
||||
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
|
||||
for i, ev := range res.Rooms.Join[room.ID].Timeline.Events {
|
||||
gotEventIDs[i] = ev.EventID
|
||||
}
|
||||
test.AssertEventIDsEqual(t, gotEventIDs, room.Events())
|
||||
}
|
||||
}
|
||||
}
|
||||
72
test/base.go
72
test/base.go
|
|
@ -1,11 +1,83 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func CreateBaseDendrite(t *testing.T, dbType DBType) (*base.BaseDendrite, func()) {
|
||||
var cfg config.Dendrite
|
||||
cfg.Defaults(false)
|
||||
cfg.Global.JetStream.InMemory = true
|
||||
|
||||
switch dbType {
|
||||
case DBTypePostgres:
|
||||
cfg.Global.Defaults(true) // autogen a signing key
|
||||
cfg.MediaAPI.Defaults(true) // autogen a media path
|
||||
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
|
||||
// the file system event with InMemory=true :(
|
||||
cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)
|
||||
connStr, close := PrepareDBConnectionString(t, dbType)
|
||||
cfg.Global.DatabaseOptions = config.DatabaseOptions{
|
||||
ConnectionString: config.DataSource(connStr),
|
||||
MaxOpenConnections: 10,
|
||||
MaxIdleConnections: 2,
|
||||
ConnMaxLifetimeSeconds: 60,
|
||||
}
|
||||
return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), close
|
||||
case DBTypeSQLite:
|
||||
cfg.Defaults(true) // sets a sqlite db per component
|
||||
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
|
||||
// the file system event with InMemory=true :(
|
||||
cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)
|
||||
return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), func() {
|
||||
// cleanup db files. This risks getting out of sync as we add more database strings :(
|
||||
dbFiles := []config.DataSource{
|
||||
cfg.AppServiceAPI.Database.ConnectionString,
|
||||
cfg.FederationAPI.Database.ConnectionString,
|
||||
cfg.KeyServer.Database.ConnectionString,
|
||||
cfg.MSCs.Database.ConnectionString,
|
||||
cfg.MediaAPI.Database.ConnectionString,
|
||||
cfg.RoomServer.Database.ConnectionString,
|
||||
cfg.SyncAPI.Database.ConnectionString,
|
||||
cfg.UserAPI.AccountDatabase.ConnectionString,
|
||||
}
|
||||
for _, fileURI := range dbFiles {
|
||||
path := strings.TrimPrefix(string(fileURI), "file:")
|
||||
err := os.Remove(path)
|
||||
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
t.Fatalf("failed to cleanup sqlite db '%s': %s", fileURI, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
t.Fatalf("unknown db type: %v", dbType)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) {
|
||||
if cfg == nil {
|
||||
cfg = &config.Dendrite{}
|
||||
|
|
|
|||
45
test/http.go
Normal file
45
test/http.go
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type HTTPRequestOpt func(req *http.Request)
|
||||
|
||||
func WithJSONBody(t *testing.T, body interface{}) HTTPRequestOpt {
|
||||
t.Helper()
|
||||
b, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
t.Fatalf("WithJSONBody: %s", err)
|
||||
}
|
||||
return func(req *http.Request) {
|
||||
req.Body = io.NopCloser(bytes.NewBuffer(b))
|
||||
}
|
||||
}
|
||||
|
||||
func WithQueryParams(qps map[string]string) HTTPRequestOpt {
|
||||
var vals url.Values = map[string][]string{}
|
||||
for k, v := range qps {
|
||||
vals.Set(k, v)
|
||||
}
|
||||
return func(req *http.Request) {
|
||||
req.URL.RawQuery = vals.Encode()
|
||||
}
|
||||
}
|
||||
|
||||
func NewRequest(t *testing.T, method, path string, opts ...HTTPRequestOpt) *http.Request {
|
||||
t.Helper()
|
||||
req, err := http.NewRequest(method, "http://localhost"+path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to make new HTTP request %v %v : %v", method, path, err)
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(req)
|
||||
}
|
||||
return req
|
||||
}
|
||||
35
test/jetstream.go
Normal file
35
test/jetstream.go
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
package test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) {
|
||||
t.Helper()
|
||||
for _, msg := range msgs {
|
||||
if _, err := jsctx.PublishMsg(msg); err != nil {
|
||||
t.Fatalf("MustPublishMsgs: failed to publish message: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg {
|
||||
t.Helper()
|
||||
msg := &nats.Msg{
|
||||
Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
|
||||
Header: nats.Header{},
|
||||
}
|
||||
msg.Header.Set(jetstream.RoomID, roomID)
|
||||
var err error
|
||||
msg.Data, err = json.Marshal(update)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to marshal update: %s", err)
|
||||
}
|
||||
return msg
|
||||
}
|
||||
Loading…
Reference in a new issue