diff --git a/README.md b/README.md index f27cb4029..ea61dac13 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ Then point your favourite Matrix client at `http://localhost:8008`. We use a script called Are We Synapse Yet which checks Sytest compliance rates. Sytest is a black-box homeserver test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it -updates with CI. As of October 2020 we're at around 56% CS API coverage and 77% Federation coverage, though check +updates with CI. As of October 2020 we're at around 57% CS API coverage and 81% Federation coverage, though check CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse servers such as matrix.org reasonably well. There's a long list of features that are not implemented, notably: - Receipts diff --git a/cmd/resolve-state/main.go b/cmd/resolve-state/main.go new file mode 100644 index 000000000..9fb14f056 --- /dev/null +++ b/cmd/resolve-state/main.go @@ -0,0 +1,132 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strconv" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/roomserver/state" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" +) + +// This is a utility for inspecting state snapshots and running state resolution +// against real snapshots in an actual database. +// It takes one or more state snapshot NIDs as arguments, along with a room version +// to use for unmarshalling events, and will produce resolved output. +// +// Usage: ./resolve-state --roomversion=version snapshot [snapshot ...] +// e.g. ./resolve-state --roomversion=5 1254 1235 1282 + +var roomVersion = flag.String("roomversion", "5", "the room version to parse events as") + +// nolint:gocyclo +func main() { + ctx := context.Background() + cfg := setup.ParseFlags(true) + args := os.Args[1:] + + fmt.Println("Room version", *roomVersion) + + snapshotNIDs := []types.StateSnapshotNID{} + for _, arg := range args { + if i, err := strconv.Atoi(arg); err == nil { + snapshotNIDs = append(snapshotNIDs, types.StateSnapshotNID(i)) + } + } + + fmt.Println("Fetching", len(snapshotNIDs), "snapshot NIDs") + + cache, err := caching.NewInMemoryLRUCache(true) + if err != nil { + panic(err) + } + + roomserverDB, err := storage.Open(&cfg.RoomServer.Database, cache) + if err != nil { + panic(err) + } + + blockNIDs, err := roomserverDB.StateBlockNIDs(ctx, snapshotNIDs) + if err != nil { + panic(err) + } + + var stateEntries []types.StateEntryList + for _, list := range blockNIDs { + entries, err2 := roomserverDB.StateEntries(ctx, list.StateBlockNIDs) + if err2 != nil { + panic(err2) + } + stateEntries = append(stateEntries, entries...) + } + + var eventNIDs []types.EventNID + for _, entry := range stateEntries { + for _, e := range entry.StateEntries { + eventNIDs = append(eventNIDs, e.EventNID) + } + } + + fmt.Println("Fetching", len(eventNIDs), "state events") + eventEntries, err := roomserverDB.Events(ctx, eventNIDs) + if err != nil { + panic(err) + } + + authEventIDMap := make(map[string]struct{}) + eventPtrs := make([]*gomatrixserverlib.Event, len(eventEntries)) + for i := range eventEntries { + eventPtrs[i] = &eventEntries[i].Event + for _, authEventID := range eventEntries[i].AuthEventIDs() { + authEventIDMap[authEventID] = struct{}{} + } + } + + authEventIDs := make([]string, 0, len(authEventIDMap)) + for authEventID := range authEventIDMap { + authEventIDs = append(authEventIDs, authEventID) + } + + fmt.Println("Fetching", len(authEventIDs), "auth events") + authEventEntries, err := roomserverDB.EventsFromIDs(ctx, authEventIDs) + if err != nil { + panic(err) + } + + authEventPtrs := make([]*gomatrixserverlib.Event, len(authEventEntries)) + for i := range authEventEntries { + authEventPtrs[i] = &authEventEntries[i].Event + } + + events := make([]gomatrixserverlib.Event, len(eventEntries)) + authEvents := make([]gomatrixserverlib.Event, len(authEventEntries)) + for i, ptr := range eventPtrs { + events[i] = *ptr + } + for i, ptr := range authEventPtrs { + authEvents[i] = *ptr + } + + fmt.Println("Resolving state") + resolved, err := state.ResolveConflictsAdhoc( + gomatrixserverlib.RoomVersion(*roomVersion), + events, + authEvents, + ) + if err != nil { + panic(err) + } + + fmt.Println("Resolved state contains", len(resolved), "events") + for _, event := range resolved { + fmt.Println() + fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey()) + fmt.Printf(" %s\n", string(event.Content())) + } +} diff --git a/go.mod b/go.mod index d3060fa0f..f785dd391 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd - github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8 + github.com/matrix-org/gomatrixserverlib v0.0.0-20201020162226-22169fe9cda7 github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.2 @@ -40,6 +40,7 @@ require ( github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20201006093556-760d9a7fd5ee go.uber.org/atomic v1.6.0 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a + golang.org/x/net v0.0.0-20200528225125-3c3fba18258b gopkg.in/h2non/bimg.v1 v1.1.4 gopkg.in/yaml.v2 v2.3.0 ) diff --git a/go.sum b/go.sum index 377a2e093..7c24516d2 100644 --- a/go.sum +++ b/go.sum @@ -569,8 +569,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg= github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8 h1:GF1PxbvImWDoz1DQZNMoaYtIqQXtyLAtmQOzwwmw1OI= -github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20201020162226-22169fe9cda7 h1:YPuewGCKaJh08NslYAhyGiLw2tg6ew9LtkW7Xr+4uTU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20201020162226-22169fe9cda7/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4= github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/internal/setup/base.go b/internal/setup/base.go index 4118c76f5..e4e9b5c2e 100644 --- a/internal/setup/base.go +++ b/internal/setup/base.go @@ -15,8 +15,10 @@ package setup import ( + "crypto/tls" "fmt" "io" + "net" "net/http" "net/url" "time" @@ -25,6 +27,8 @@ import ( "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/userapi/storage/accounts" @@ -107,7 +111,22 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo logrus.WithError(err).Warnf("Failed to create cache") } - apiClient := http.Client{Timeout: time.Minute * 10} + apiClient := http.Client{ + Timeout: time.Minute * 10, + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) { + // Ordinarily HTTP/2 would expect TLS, but the remote listener is + // H2C-enabled (HTTP/2 without encryption). Overriding the DialTLS + // function with a plain Dial allows us to trick the HTTP client + // into establishing a HTTP/2 connection without TLS. + // TODO: Eventually we will want to look at authenticating and + // encrypting these internal HTTP APIs, at which point we will have + // to reconsider H2C and change all this anyway. + return net.Dial(network, addr) + }, + }, + } client := http.Client{Timeout: HTTPClientTimeout} if cfg.FederationSender.Proxy.Enabled { client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{ @@ -269,10 +288,17 @@ func (b *BaseDendrite) SetupAndServeHTTP( internalServ := externalServ if internalAddr != NoListener && externalAddr != internalAddr { + // H2C allows us to accept HTTP/2 connections without TLS + // encryption. Since we don't currently require any form of + // authentication or encryption on these internal HTTP APIs, + // H2C gives us all of the advantages of HTTP/2 (such as + // stream multiplexing and avoiding head-of-line blocking) + // without enabling TLS. + internalH2S := &http2.Server{} internalRouter = mux.NewRouter().SkipClean(true).UseEncodedPath() internalServ = &http.Server{ Addr: string(internalAddr), - Handler: internalRouter, + Handler: h2c.NewHandler(internalRouter, internalH2S), } } diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 5631959b7..dc758e9b0 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -17,7 +17,6 @@ package input import ( - "bytes" "context" "fmt" @@ -28,7 +27,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "github.com/sirupsen/logrus" ) // updateLatestEvents updates the list of latest events for this room in the database and writes the @@ -141,27 +139,32 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // Work out what the latest events are. This will include the new // event if it is not already referenced. - if err := u.calculateLatest( - oldLatest, + extremitiesChanged, err := u.calculateLatest( + oldLatest, &u.event, types.StateAtEventAndReference{ EventReference: u.event.EventReference(), StateAtEvent: u.stateAtEvent, }, - ); err != nil { + ) + if err != nil { return fmt.Errorf("u.calculateLatest: %w", err) } // Now that we know what the latest events are, it's time to get the // latest state. - if err := u.latestState(); err != nil { - return fmt.Errorf("u.latestState: %w", err) - } + var updates []api.OutputEvent + if extremitiesChanged { + if err = u.latestState(); err != nil { + return fmt.Errorf("u.latestState: %w", err) + } - // If we need to generate any output events then here's where we do it. - // TODO: Move this! - updates, err := u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added) - if err != nil { - return fmt.Errorf("u.api.updateMemberships: %w", err) + // If we need to generate any output events then here's where we do it. + // TODO: Move this! + if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil { + return fmt.Errorf("u.api.updateMemberships: %w", err) + } + } else { + u.newStateNID = u.oldStateNID } update, err := u.makeOutputNewRoomEvent() @@ -250,50 +253,74 @@ func (u *latestEventsUpdater) latestState() error { // true if the new event is included in those extremites, false otherwise. func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, - newEvent types.StateAtEventAndReference, -) error { - var newLatest []types.StateAtEventAndReference - - // First of all, let's see if any of the existing forward extremities - // now have entries in the previous events table. If they do then we - // will no longer include them as forward extremities. - for _, l := range oldLatest { - referenced, err := u.updater.IsReferenced(l.EventReference) - if err != nil { - logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID) - return fmt.Errorf("u.updater.IsReferenced (old): %w", err) - } else if !referenced { - newLatest = append(newLatest, l) - } + newEvent *gomatrixserverlib.Event, + newStateAndRef types.StateAtEventAndReference, +) (bool, error) { + // First of all, get a list of all of the events in our current + // set of forward extremities. + existingRefs := make(map[string]*types.StateAtEventAndReference) + existingNIDs := make([]types.EventNID, len(oldLatest)) + for i, old := range oldLatest { + existingRefs[old.EventID] = &oldLatest[i] + existingNIDs[i] = old.EventNID } - // Then check and see if our new event is already included in that set. - // This ordinarily won't happen but it covers the edge-case that we've - // already seen this event before and it's a forward extremity, so rather - // than adding a duplicate, we'll just return the set as complete. - for _, l := range newLatest { - if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) { - // We've already referenced this new event so we can just return - // the newly completed extremities at this point. - u.latest = newLatest - return nil - } - } - - // At this point we've processed the old extremities, and we've checked - // that our new event isn't already in that set. Therefore now we can - // check if our *new* event is a forward extremity, and if it is, add - // it in. - referenced, err := u.updater.IsReferenced(newEvent.EventReference) + // Look up the old extremity events. This allows us to find their + // prev events. + events, err := u.api.DB.Events(u.ctx, existingNIDs) if err != nil { - logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID) - return fmt.Errorf("u.updater.IsReferenced (new): %w", err) - } else if !referenced || len(newLatest) == 0 { - newLatest = append(newLatest, newEvent) + return false, fmt.Errorf("u.api.DB.Events: %w", err) + } + + // Make a list of all of the prev events as referenced by all of + // the current forward extremities. + existingPrevs := make(map[string]struct{}) + for _, old := range events { + for _, prevEventID := range old.PrevEventIDs() { + existingPrevs[prevEventID] = struct{}{} + } + } + + // If the "new" event is already referenced by a forward extremity + // then do nothing - it's not a candidate to be a new extremity if + // it has been referenced. + if _, ok := existingPrevs[newEvent.EventID()]; ok { + return false, nil + } + + // If the "new" event is already a forward extremity then stop, as + // nothing changes. + for _, event := range events { + if event.EventID() == newEvent.EventID() { + return false, nil + } + } + + // Include our new event in the extremities. + newLatest := []types.StateAtEventAndReference{newStateAndRef} + + // Then run through and see if the other extremities are still valid. + // If our new event references them then they are no longer good + // candidates. + for _, prevEventID := range newEvent.PrevEventIDs() { + delete(existingRefs, prevEventID) + } + + // Ensure that we don't add any candidate forward extremities from + // the old set that are, themselves, referenced by the old set of + // forward extremities. This shouldn't happen but guards against + // the possibility anyway. + for prevEventID := range existingPrevs { + delete(existingRefs, prevEventID) + } + + // Then re-add any old extremities that are still valid after all. + for _, old := range existingRefs { + newLatest = append(newLatest, *old) } u.latest = newLatest - return nil + return true, nil } func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { diff --git a/roomserver/state/state.go b/roomserver/state/state.go index 2944f71c1..d23f14c84 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -526,13 +526,7 @@ func (v StateResolution) CalculateAndStoreStateBeforeEvent( isRejected bool, ) (types.StateSnapshotNID, error) { // Load the state at the prev events. - prevEventRefs := event.PrevEvents() - prevEventIDs := make([]string, len(prevEventRefs)) - for i := range prevEventRefs { - prevEventIDs[i] = prevEventRefs[i].EventID - } - - prevStates, err := v.db.StateAtEventIDs(ctx, prevEventIDs) + prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs()) if err != nil { return 0, err } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 51dcb8887..aec15ab22 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -27,23 +27,24 @@ import ( const redactionsArePermanent = true type Database struct { - DB *sql.DB - Cache caching.RoomServerCaches - Writer sqlutil.Writer - EventsTable tables.Events - EventJSONTable tables.EventJSON - EventTypesTable tables.EventTypes - EventStateKeysTable tables.EventStateKeys - RoomsTable tables.Rooms - TransactionsTable tables.Transactions - StateSnapshotTable tables.StateSnapshot - StateBlockTable tables.StateBlock - RoomAliasesTable tables.RoomAliases - PrevEventsTable tables.PreviousEvents - InvitesTable tables.Invites - MembershipTable tables.Membership - PublishedTable tables.Published - RedactionsTable tables.Redactions + DB *sql.DB + Cache caching.RoomServerCaches + Writer sqlutil.Writer + EventsTable tables.Events + EventJSONTable tables.EventJSON + EventTypesTable tables.EventTypes + EventStateKeysTable tables.EventStateKeys + RoomsTable tables.Rooms + TransactionsTable tables.Transactions + StateSnapshotTable tables.StateSnapshot + StateBlockTable tables.StateBlock + RoomAliasesTable tables.RoomAliases + PrevEventsTable tables.PreviousEvents + InvitesTable tables.Invites + MembershipTable tables.Membership + PublishedTable tables.Published + RedactionsTable tables.Redactions + GetLatestEventsForUpdateFn func(ctx context.Context, roomInfo types.RoomInfo) (*LatestEventsUpdater, error) } func (d *Database) SupportsConcurrentRoomInputs() bool { @@ -372,6 +373,9 @@ func (d *Database) MembershipUpdater( func (d *Database) GetLatestEventsForUpdate( ctx context.Context, roomInfo types.RoomInfo, ) (*LatestEventsUpdater, error) { + if d.GetLatestEventsForUpdateFn != nil { + return d.GetLatestEventsForUpdateFn(ctx, roomInfo) + } txn, err := d.DB.Begin() if err != nil { return nil, err diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 4a74bf736..6d9b860f5 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -120,23 +120,24 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) return nil, err } d.Database = shared.Database{ - DB: d.db, - Cache: cache, - Writer: d.writer, - EventsTable: d.events, - EventTypesTable: d.eventTypes, - EventStateKeysTable: d.eventStateKeys, - EventJSONTable: d.eventJSON, - RoomsTable: d.rooms, - TransactionsTable: d.transactions, - StateBlockTable: stateBlock, - StateSnapshotTable: stateSnapshot, - PrevEventsTable: d.prevEvents, - RoomAliasesTable: roomAliases, - InvitesTable: d.invites, - MembershipTable: d.membership, - PublishedTable: published, - RedactionsTable: redactions, + DB: d.db, + Cache: cache, + Writer: d.writer, + EventsTable: d.events, + EventTypesTable: d.eventTypes, + EventStateKeysTable: d.eventStateKeys, + EventJSONTable: d.eventJSON, + RoomsTable: d.rooms, + TransactionsTable: d.transactions, + StateBlockTable: stateBlock, + StateSnapshotTable: stateSnapshot, + PrevEventsTable: d.prevEvents, + RoomAliasesTable: roomAliases, + InvitesTable: d.invites, + MembershipTable: d.membership, + PublishedTable: published, + RedactionsTable: redactions, + GetLatestEventsForUpdateFn: d.GetLatestEventsForUpdate, } return &d, nil } diff --git a/sytest-whitelist b/sytest-whitelist index cecf24f75..1a12b591b 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -485,3 +485,4 @@ Event with an invalid signature in the send_join response should not cause room Inbound federation rejects typing notifications from wrong remote Should not be able to take over the room by pretending there is no PL event Can get rooms/{roomId}/state for a departed room (SPEC-216) +Users cannot set notifications powerlevel higher than their own