diff --git a/CHANGES.md b/CHANGES.md index 6278bcba4..b13908f73 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,41 @@ # Changelog +## Dendrite 0.8.3 (2022-05-09) + +### Features + +* Open registration is now harder to enable, which should reduce the chance that Dendrite servers will be used to conduct spam or abuse attacks + * Dendrite will only enable open registration if you pass the `--really-enable-open-registration` command line flag at startup + * If open registration is enabled but this command line flag is not passed, Dendrite will fail to start up +* Dendrite now supports phone-home statistic reporting + * These statistics include things like the number of registered and active users, some configuration options and platform/environment details, to help us to understand how Dendrite is used + * This is not enabled by default — it must be enabled in the `global.report_stats` section of the config file +* Monolith installations can now be configured with a single global database connection pool (in `global.database` in the config) rather than having to configure each component separately + * This also means that you no longer need to balance connection counts between different components, as they will share the same larger pool + * Specific components can override the global database settings by specifying their own `database` block + * To use only the global pool, you must configure `global.database` and then remove the `database` block from all of the component sections of the config file +* A new admin API endpoint `/_dendrite/admin/evacuateRoom/{roomID}` has been added, allowing server admins to forcefully part all local users from a given room +* The sync notifier now only loads members for the relevant rooms, which should reduce CPU usage and load on the database +* A number of component interfaces have been refactored for cleanliness and developer ease +* Event auth errors in the log should now be much more useful, including the reason for the event failures +* The forward extremity calculation in the roomserver has been simplified +* A new index has been added to the one-time keys table in the keyserver which should speed up key count lookups + +### Fixes + +* Dendrite will no longer process events for rooms where there are no local users joined, which should help to reduce CPU and RAM usage +* A bug has been fixed in event auth when changing the user levels in `m.room.power_levels` events +* Usernames should no longer be duplicated when no room name is set +* Device display names should now be correctly propagated over federation +* A panic when uploading cross-signing signatures has been fixed +* Presence is now correctly limited in `/sync` based on the filters +* The presence stream position returned by `/sync` will now be correct if no presence events were returned +* The media `/config` endpoint will no longer return a maximum upload size field if it is configured to be unlimited in the Dendrite config +* The server notices room will no longer produce "User is already joined to the room" errors +* Consumer errors will no longer flood the logs during a graceful shutdown +* Sync API and federation API consumers will no longer unnecessarily query added state events matching the one in the output event +* The Sync API will no longer unnecessarily track invites for remote users + ## Dendrite 0.8.2 (2022-04-27) ### Features diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index ff2c8e5d4..80317ee69 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -146,28 +146,25 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - eventsRes := &api.QueryEventsByIDResponse{} - if len(ore.AddsStateEventIDs) > 0 { + addsStateEvents, missingEventIDs := ore.NeededStateEventIDs() + + // Ask the roomserver and add in the rest of the results into the set. + // Finally, work out if there are any more events missing. + if len(missingEventIDs) > 0 { eventsReq := &api.QueryEventsByIDRequest{ - EventIDs: ore.AddsStateEventIDs, + EventIDs: missingEventIDs, } + eventsRes := &api.QueryEventsByIDResponse{} if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil { return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err) } - - found := false - for _, event := range eventsRes.Events { - if event.EventID() == ore.Event.EventID() { - found = true - break - } - } - if !found { - eventsRes.Events = append(eventsRes.Events, ore.Event) + if len(eventsRes.Events) != len(missingEventIDs) { + return fmt.Errorf("missing state events") } + addsStateEvents = append(addsStateEvents, eventsRes.Events...) } - addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events)) + addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addsStateEvents)) if err != nil { return err } diff --git a/federationapi/federationapi_keys_test.go b/federationapi/federationapi_keys_test.go index 4774c8820..31e9a4c73 100644 --- a/federationapi/federationapi_keys_test.go +++ b/federationapi/federationapi_keys_test.go @@ -102,7 +102,7 @@ func TestMain(m *testing.M) { ) // Finally, build the server key APIs. - sbase := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics) + sbase := base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics) s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, nil, true) } diff --git a/internal/version.go b/internal/version.go index e74548831..08c02cfcd 100644 --- a/internal/version.go +++ b/internal/version.go @@ -18,7 +18,7 @@ const ( VersionMajor = 0 VersionMinor = 8 VersionPatch = 3 - VersionTag = "rc1" // example: "rc1" + VersionTag = "" // example: "rc1" ) func VersionString() string { diff --git a/mediaapi/thumbnailer/thumbnailer_bimg.go b/mediaapi/thumbnailer/thumbnailer_bimg.go index 6ca533176..fa1acbf08 100644 --- a/mediaapi/thumbnailer/thumbnailer_bimg.go +++ b/mediaapi/thumbnailer/thumbnailer_bimg.go @@ -37,7 +37,7 @@ func GenerateThumbnails( mediaMetadata *types.MediaMetadata, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, - db *storage.Database, + db storage.Database, logger *log.Entry, ) (busy bool, errorReturn error) { buffer, err := bimg.Read(string(src)) @@ -49,7 +49,7 @@ func GenerateThumbnails( for _, config := range configs { // Note: createThumbnail does locking based on activeThumbnailGeneration busy, err = createThumbnail( - ctx, src, img, config, mediaMetadata, activeThumbnailGeneration, + ctx, src, img, types.ThumbnailSize(config), mediaMetadata, activeThumbnailGeneration, maxThumbnailGenerators, db, logger, ) if err != nil { @@ -71,7 +71,7 @@ func GenerateThumbnail( mediaMetadata *types.MediaMetadata, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, - db *storage.Database, + db storage.Database, logger *log.Entry, ) (busy bool, errorReturn error) { buffer, err := bimg.Read(string(src)) @@ -109,7 +109,7 @@ func createThumbnail( mediaMetadata *types.MediaMetadata, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, - db *storage.Database, + db storage.Database, logger *log.Entry, ) (busy bool, errorReturn error) { logger = logger.WithFields(log.Fields{ diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 767611ec4..a82bf8701 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -163,6 +163,19 @@ type OutputNewRoomEvent struct { TransactionID *TransactionID `json:"transaction_id,omitempty"` } +func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) { + addsStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, 1) + missingEventIDs := make([]string, 0, len(o.AddsStateEventIDs)) + for _, eventID := range o.AddsStateEventIDs { + if eventID != o.Event.EventID() { + missingEventIDs = append(missingEventIDs, eventID) + } else { + addsStateEvents = append(addsStateEvents, o.Event) + } + } + return addsStateEvents, missingEventIDs +} + // An OutputOldRoomEvent is written when the roomserver receives an old event. // This will typically happen as a result of getting either missing events // or backfilling. Downstream components may wish to send these events to diff --git a/setup/base/base.go b/setup/base/base.go index 0e7528a03..5cbd7da9c 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -86,6 +86,7 @@ type BaseDendrite struct { DNSCache *gomatrixserverlib.DNSCache Database *sql.DB DatabaseWriter sqlutil.Writer + EnableMetrics bool } const NoListener = "" @@ -96,7 +97,7 @@ const HTTPClientTimeout = time.Second * 30 type BaseDendriteOptions int const ( - NoCacheMetrics BaseDendriteOptions = iota + DisableMetrics BaseDendriteOptions = iota UseHTTPAPIs PolylithMode ) @@ -107,12 +108,12 @@ const ( func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...BaseDendriteOptions) *BaseDendrite { platformSanityChecks() useHTTPAPIs := false - cacheMetrics := true + enableMetrics := true isMonolith := true for _, opt := range options { switch opt { - case NoCacheMetrics: - cacheMetrics = false + case DisableMetrics: + enableMetrics = false case UseHTTPAPIs: useHTTPAPIs = true case PolylithMode: @@ -160,7 +161,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base } } - cache, err := caching.NewInMemoryLRUCache(cacheMetrics) + cache, err := caching.NewInMemoryLRUCache(enableMetrics) if err != nil { logrus.WithError(err).Warnf("Failed to create cache") } @@ -246,6 +247,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base apiHttpClient: &apiClient, Database: db, // set if monolith with global connection pool only DatabaseWriter: writer, // set if monolith with global connection pool only + EnableMetrics: enableMetrics, } } diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 426f02bb6..248b0e656 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -13,6 +13,7 @@ import ( "github.com/sirupsen/logrus" natsserver "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go" ) @@ -21,6 +22,13 @@ type NATSInstance struct { sync.Mutex } +func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) { + for _, stream := range streams { // streams are defined in streams.go + name := cfg.Prefixed(stream.Name) + _ = js.DeleteStream(name) + } +} + func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 7712c8403..f0ca2106f 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -154,41 +154,61 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( ctx context.Context, msg api.OutputNewRoomEvent, ) error { ev := msg.Event + addsStateEvents, missingEventIDs := msg.NeededStateEventIDs() - addsStateEvents := []*gomatrixserverlib.HeaderedEvent{} - foundEventIDs := map[string]bool{} - if len(msg.AddsStateEventIDs) > 0 { - for _, eventID := range msg.AddsStateEventIDs { - foundEventIDs[eventID] = false - } - foundEvents, err := s.db.Events(ctx, msg.AddsStateEventIDs) + // Work out the list of events we need to find out about. Either + // they will be the event supplied in the request, we will find it + // in the sync API database or we'll need to ask the roomserver. + knownEventIDs := make(map[string]bool, len(msg.AddsStateEventIDs)) + for _, eventID := range missingEventIDs { + knownEventIDs[eventID] = false + } + + // Look the events up in the database. If we know them, add them into + // the set of adds state events. + if len(missingEventIDs) > 0 { + alreadyKnown, err := s.db.Events(ctx, missingEventIDs) if err != nil { return fmt.Errorf("s.db.Events: %w", err) } - for _, event := range foundEvents { - foundEventIDs[event.EventID()] = true + for _, knownEvent := range alreadyKnown { + knownEventIDs[knownEvent.EventID()] = true + addsStateEvents = append(addsStateEvents, knownEvent) + } + } + + // Now work out if there are any remaining events we don't know. For + // these we will need to ask the roomserver for help. + missingEventIDs = missingEventIDs[:0] + for eventID, known := range knownEventIDs { + if !known { + missingEventIDs = append(missingEventIDs, eventID) + } + } + + // Ask the roomserver and add in the rest of the results into the set. + // Finally, work out if there are any more events missing. + if len(missingEventIDs) > 0 { + eventsReq := &api.QueryEventsByIDRequest{ + EventIDs: missingEventIDs, } - eventsReq := &api.QueryEventsByIDRequest{} eventsRes := &api.QueryEventsByIDResponse{} - for eventID, found := range foundEventIDs { - if !found { - eventsReq.EventIDs = append(eventsReq.EventIDs, eventID) - } - } - if err = s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil { + if err := s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil { return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err) } for _, event := range eventsRes.Events { - eventID := event.EventID() - foundEvents = append(foundEvents, event) - foundEventIDs[eventID] = true + addsStateEvents = append(addsStateEvents, event) + knownEventIDs[event.EventID()] = true } - for eventID, found := range foundEventIDs { + + // This should never happen because this would imply that the + // roomserver has sent us adds_state_event_ids for events that it + // also doesn't know about, but let's just be sure. + for eventID, found := range knownEventIDs { if !found { return fmt.Errorf("event %s is missing", eventID) } } - addsStateEvents = foundEvents } ev, err := s.updateStateEvent(ev) @@ -327,9 +347,11 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( ctx context.Context, msg api.OutputNewInviteEvent, ) { if msg.Event.StateKey() == nil { - log.WithFields(log.Fields{ - "event": string(msg.Event.JSON()), - }).Panicf("roomserver output log: invite has no state key") + return + } + if _, serverName, err := gomatrixserverlib.SplitID('@', *msg.Event.StateKey()); err != nil { + return + } else if serverName != s.cfg.Matrix.ServerName { return } pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 99d1e40c3..8ab130911 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -65,11 +65,13 @@ func NewRequestPool( userAPI userapi.SyncUserAPI, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI, streams *streams.Streams, notifier *notifier.Notifier, - producer PresencePublisher, + producer PresencePublisher, enableMetrics bool, ) *RequestPool { - prometheus.MustRegister( - activeSyncRequests, waitingSyncRequests, - ) + if enableMetrics { + prometheus.MustRegister( + activeSyncRequests, waitingSyncRequests, + ) + } rp := &RequestPool{ db: db, cfg: cfg, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index dbc6e240c..d8bacb2da 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -65,7 +65,7 @@ func AddPublicRoutes( JetStream: js, } - requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) + requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer, base.EnableMetrics) userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go new file mode 100644 index 000000000..12b5178d8 --- /dev/null +++ b/syncapi/syncapi_test.go @@ -0,0 +1,162 @@ +package syncapi + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver/api" + rsapi "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/test" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/nats-io/nats.go" +) + +type syncRoomserverAPI struct { + rsapi.SyncRoomserverAPI + rooms []*test.Room +} + +func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error { + var room *test.Room + for _, r := range s.rooms { + if r.ID == req.RoomID { + room = r + break + } + } + if room == nil { + res.RoomExists = false + return nil + } + res.RoomVersion = room.Version + return nil // TODO: return state +} + +type syncUserAPI struct { + userapi.SyncUserAPI + accounts []userapi.Device +} + +func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error { + for _, acc := range s.accounts { + if acc.AccessToken == req.AccessToken { + res.Device = &acc + return nil + } + } + res.Err = "unknown user" + return nil +} + +func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error { + return nil +} + +type syncKeyAPI struct { + keyapi.KeyInternalAPI +} + +func TestSyncAPI(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + testSync(t, dbType) + }) +} + +func testSync(t *testing.T, dbType test.DBType) { + user := test.NewUser() + room := test.NewRoom(t, user) + alice := userapi.Device{ + ID: "ALICEID", + UserID: user.ID, + AccessToken: "ALICE_BEARER_TOKEN", + DisplayName: "Alice", + AccountType: userapi.AccountTypeUser, + } + + base, close := test.CreateBaseDendrite(t, dbType) + defer close() + + jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) + var msgs []*nats.Msg + for _, ev := range room.Events() { + var addsStateIDs []string + if ev.StateKey() != nil { + addsStateIDs = append(addsStateIDs, ev.EventID()) + } + msgs = append(msgs, test.NewOutputEventMsg(t, base, room.ID, api.OutputEvent{ + Type: rsapi.OutputTypeNewRoomEvent, + NewRoomEvent: &rsapi.OutputNewRoomEvent{ + Event: ev, + AddsStateEventIDs: addsStateIDs, + }, + })) + } + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{}) + test.MustPublishMsgs(t, jsctx, msgs...) + + testCases := []struct { + name string + req *http.Request + wantCode int + wantJoinedRooms []string + }{ + { + name: "missing access token", + req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "timeout": "0", + })), + wantCode: 401, + }, + { + name: "unknown access token", + req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": "foo", + "timeout": "0", + })), + wantCode: 401, + }, + { + name: "valid access token", + req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": alice.AccessToken, + "timeout": "0", + })), + wantCode: 200, + wantJoinedRooms: []string{room.ID}, + }, + } + // TODO: find a better way + time.Sleep(500 * time.Millisecond) + + for _, tc := range testCases { + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, tc.req) + if w.Code != tc.wantCode { + t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) + } + if tc.wantJoinedRooms != nil { + var res types.Response + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Fatalf("%s: failed to decode response body: %s", tc.name, err) + } + if len(res.Rooms.Join) != len(tc.wantJoinedRooms) { + t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res) + } + t.Logf("res: %+v", res.Rooms.Join[room.ID]) + + gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events)) + for i, ev := range res.Rooms.Join[room.ID].Timeline.Events { + gotEventIDs[i] = ev.EventID + } + test.AssertEventIDsEqual(t, gotEventIDs, room.Events()) + } + } +} diff --git a/test/base.go b/test/base.go index 32fc8dc53..664442c03 100644 --- a/test/base.go +++ b/test/base.go @@ -1,11 +1,83 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package test import ( + "errors" + "fmt" + "io/fs" + "os" + "strings" + "testing" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/nats-io/nats.go" ) +func CreateBaseDendrite(t *testing.T, dbType DBType) (*base.BaseDendrite, func()) { + var cfg config.Dendrite + cfg.Defaults(false) + cfg.Global.JetStream.InMemory = true + + switch dbType { + case DBTypePostgres: + cfg.Global.Defaults(true) // autogen a signing key + cfg.MediaAPI.Defaults(true) // autogen a media path + // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use + // the file system event with InMemory=true :( + cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType) + connStr, close := PrepareDBConnectionString(t, dbType) + cfg.Global.DatabaseOptions = config.DatabaseOptions{ + ConnectionString: config.DataSource(connStr), + MaxOpenConnections: 10, + MaxIdleConnections: 2, + ConnMaxLifetimeSeconds: 60, + } + return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), close + case DBTypeSQLite: + cfg.Defaults(true) // sets a sqlite db per component + // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use + // the file system event with InMemory=true :( + cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType) + return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), func() { + // cleanup db files. This risks getting out of sync as we add more database strings :( + dbFiles := []config.DataSource{ + cfg.AppServiceAPI.Database.ConnectionString, + cfg.FederationAPI.Database.ConnectionString, + cfg.KeyServer.Database.ConnectionString, + cfg.MSCs.Database.ConnectionString, + cfg.MediaAPI.Database.ConnectionString, + cfg.RoomServer.Database.ConnectionString, + cfg.SyncAPI.Database.ConnectionString, + cfg.UserAPI.AccountDatabase.ConnectionString, + } + for _, fileURI := range dbFiles { + path := strings.TrimPrefix(string(fileURI), "file:") + err := os.Remove(path) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + t.Fatalf("failed to cleanup sqlite db '%s': %s", fileURI, err) + } + } + } + default: + t.Fatalf("unknown db type: %v", dbType) + } + return nil, nil +} + func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) { if cfg == nil { cfg = &config.Dendrite{} diff --git a/test/http.go b/test/http.go new file mode 100644 index 000000000..a458a3385 --- /dev/null +++ b/test/http.go @@ -0,0 +1,45 @@ +package test + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/url" + "testing" +) + +type HTTPRequestOpt func(req *http.Request) + +func WithJSONBody(t *testing.T, body interface{}) HTTPRequestOpt { + t.Helper() + b, err := json.Marshal(body) + if err != nil { + t.Fatalf("WithJSONBody: %s", err) + } + return func(req *http.Request) { + req.Body = io.NopCloser(bytes.NewBuffer(b)) + } +} + +func WithQueryParams(qps map[string]string) HTTPRequestOpt { + var vals url.Values = map[string][]string{} + for k, v := range qps { + vals.Set(k, v) + } + return func(req *http.Request) { + req.URL.RawQuery = vals.Encode() + } +} + +func NewRequest(t *testing.T, method, path string, opts ...HTTPRequestOpt) *http.Request { + t.Helper() + req, err := http.NewRequest(method, "http://localhost"+path, nil) + if err != nil { + t.Fatalf("failed to make new HTTP request %v %v : %v", method, path, err) + } + for _, o := range opts { + o(req) + } + return req +} diff --git a/test/jetstream.go b/test/jetstream.go new file mode 100644 index 000000000..488c22beb --- /dev/null +++ b/test/jetstream.go @@ -0,0 +1,35 @@ +package test + +import ( + "encoding/json" + "testing" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/nats-io/nats.go" +) + +func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) { + t.Helper() + for _, msg := range msgs { + if _, err := jsctx.PublishMsg(msg); err != nil { + t.Fatalf("MustPublishMsgs: failed to publish message: %s", err) + } + } +} + +func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg { + t.Helper() + msg := &nats.Msg{ + Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), + Header: nats.Header{}, + } + msg.Header.Set(jetstream.RoomID, roomID) + var err error + msg.Data, err = json.Marshal(update) + if err != nil { + t.Fatalf("failed to marshal update: %s", err) + } + return msg +}