mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-25 07:43:10 -06:00
Merge branch 'master' of https://github.com/matrix-org/dendrite into add-health-endpoint
This commit is contained in:
commit
77ef8eb368
|
|
@ -75,7 +75,7 @@ Then point your favourite Matrix client at `http://localhost:8008`.
|
||||||
|
|
||||||
We use a script called Are We Synapse Yet which checks Sytest compliance rates. Sytest is a black-box homeserver
|
We use a script called Are We Synapse Yet which checks Sytest compliance rates. Sytest is a black-box homeserver
|
||||||
test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it
|
test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it
|
||||||
updates with CI. As of October 2020 we're at around 56% CS API coverage and 77% Federation coverage, though check
|
updates with CI. As of October 2020 we're at around 57% CS API coverage and 81% Federation coverage, though check
|
||||||
CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse
|
CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse
|
||||||
servers such as matrix.org reasonably well. There's a long list of features that are not implemented, notably:
|
servers such as matrix.org reasonably well. There's a long list of features that are not implemented, notably:
|
||||||
- Receipts
|
- Receipts
|
||||||
|
|
|
||||||
132
cmd/resolve-state/main.go
Normal file
132
cmd/resolve-state/main.go
Normal file
|
|
@ -0,0 +1,132 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
|
"github.com/matrix-org/dendrite/internal/setup"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/state"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This is a utility for inspecting state snapshots and running state resolution
|
||||||
|
// against real snapshots in an actual database.
|
||||||
|
// It takes one or more state snapshot NIDs as arguments, along with a room version
|
||||||
|
// to use for unmarshalling events, and will produce resolved output.
|
||||||
|
//
|
||||||
|
// Usage: ./resolve-state --roomversion=version snapshot [snapshot ...]
|
||||||
|
// e.g. ./resolve-state --roomversion=5 1254 1235 1282
|
||||||
|
|
||||||
|
var roomVersion = flag.String("roomversion", "5", "the room version to parse events as")
|
||||||
|
|
||||||
|
// nolint:gocyclo
|
||||||
|
func main() {
|
||||||
|
ctx := context.Background()
|
||||||
|
cfg := setup.ParseFlags(true)
|
||||||
|
args := os.Args[1:]
|
||||||
|
|
||||||
|
fmt.Println("Room version", *roomVersion)
|
||||||
|
|
||||||
|
snapshotNIDs := []types.StateSnapshotNID{}
|
||||||
|
for _, arg := range args {
|
||||||
|
if i, err := strconv.Atoi(arg); err == nil {
|
||||||
|
snapshotNIDs = append(snapshotNIDs, types.StateSnapshotNID(i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Fetching", len(snapshotNIDs), "snapshot NIDs")
|
||||||
|
|
||||||
|
cache, err := caching.NewInMemoryLRUCache(true)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
roomserverDB, err := storage.Open(&cfg.RoomServer.Database, cache)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blockNIDs, err := roomserverDB.StateBlockNIDs(ctx, snapshotNIDs)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var stateEntries []types.StateEntryList
|
||||||
|
for _, list := range blockNIDs {
|
||||||
|
entries, err2 := roomserverDB.StateEntries(ctx, list.StateBlockNIDs)
|
||||||
|
if err2 != nil {
|
||||||
|
panic(err2)
|
||||||
|
}
|
||||||
|
stateEntries = append(stateEntries, entries...)
|
||||||
|
}
|
||||||
|
|
||||||
|
var eventNIDs []types.EventNID
|
||||||
|
for _, entry := range stateEntries {
|
||||||
|
for _, e := range entry.StateEntries {
|
||||||
|
eventNIDs = append(eventNIDs, e.EventNID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Fetching", len(eventNIDs), "state events")
|
||||||
|
eventEntries, err := roomserverDB.Events(ctx, eventNIDs)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
authEventIDMap := make(map[string]struct{})
|
||||||
|
eventPtrs := make([]*gomatrixserverlib.Event, len(eventEntries))
|
||||||
|
for i := range eventEntries {
|
||||||
|
eventPtrs[i] = &eventEntries[i].Event
|
||||||
|
for _, authEventID := range eventEntries[i].AuthEventIDs() {
|
||||||
|
authEventIDMap[authEventID] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
authEventIDs := make([]string, 0, len(authEventIDMap))
|
||||||
|
for authEventID := range authEventIDMap {
|
||||||
|
authEventIDs = append(authEventIDs, authEventID)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Fetching", len(authEventIDs), "auth events")
|
||||||
|
authEventEntries, err := roomserverDB.EventsFromIDs(ctx, authEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
authEventPtrs := make([]*gomatrixserverlib.Event, len(authEventEntries))
|
||||||
|
for i := range authEventEntries {
|
||||||
|
authEventPtrs[i] = &authEventEntries[i].Event
|
||||||
|
}
|
||||||
|
|
||||||
|
events := make([]gomatrixserverlib.Event, len(eventEntries))
|
||||||
|
authEvents := make([]gomatrixserverlib.Event, len(authEventEntries))
|
||||||
|
for i, ptr := range eventPtrs {
|
||||||
|
events[i] = *ptr
|
||||||
|
}
|
||||||
|
for i, ptr := range authEventPtrs {
|
||||||
|
authEvents[i] = *ptr
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Resolving state")
|
||||||
|
resolved, err := state.ResolveConflictsAdhoc(
|
||||||
|
gomatrixserverlib.RoomVersion(*roomVersion),
|
||||||
|
events,
|
||||||
|
authEvents,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Resolved state contains", len(resolved), "events")
|
||||||
|
for _, event := range resolved {
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
|
||||||
|
fmt.Printf(" %s\n", string(event.Content()))
|
||||||
|
}
|
||||||
|
}
|
||||||
3
go.mod
3
go.mod
|
|
@ -22,7 +22,7 @@ require (
|
||||||
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
|
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
|
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20201020162226-22169fe9cda7
|
||||||
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91
|
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.2
|
github.com/mattn/go-sqlite3 v1.14.2
|
||||||
|
|
@ -40,6 +40,7 @@ require (
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20201006093556-760d9a7fd5ee
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20201006093556-760d9a7fd5ee
|
||||||
go.uber.org/atomic v1.6.0
|
go.uber.org/atomic v1.6.0
|
||||||
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
|
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
|
||||||
|
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b
|
||||||
gopkg.in/h2non/bimg.v1 v1.1.4
|
gopkg.in/h2non/bimg.v1 v1.1.4
|
||||||
gopkg.in/yaml.v2 v2.3.0
|
gopkg.in/yaml.v2 v2.3.0
|
||||||
)
|
)
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -569,8 +569,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
|
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8 h1:GF1PxbvImWDoz1DQZNMoaYtIqQXtyLAtmQOzwwmw1OI=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20201020162226-22169fe9cda7 h1:YPuewGCKaJh08NslYAhyGiLw2tg6ew9LtkW7Xr+4uTU=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20201020162226-22169fe9cda7/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
||||||
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4=
|
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4=
|
||||||
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
|
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
|
||||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
|
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
|
||||||
|
|
|
||||||
|
|
@ -15,8 +15,10 @@
|
||||||
package setup
|
package setup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -25,6 +27,8 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
"golang.org/x/net/http2"
|
||||||
|
"golang.org/x/net/http2/h2c"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||||
|
|
@ -107,7 +111,22 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
|
||||||
logrus.WithError(err).Warnf("Failed to create cache")
|
logrus.WithError(err).Warnf("Failed to create cache")
|
||||||
}
|
}
|
||||||
|
|
||||||
apiClient := http.Client{Timeout: time.Minute * 10}
|
apiClient := http.Client{
|
||||||
|
Timeout: time.Minute * 10,
|
||||||
|
Transport: &http2.Transport{
|
||||||
|
AllowHTTP: true,
|
||||||
|
DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) {
|
||||||
|
// Ordinarily HTTP/2 would expect TLS, but the remote listener is
|
||||||
|
// H2C-enabled (HTTP/2 without encryption). Overriding the DialTLS
|
||||||
|
// function with a plain Dial allows us to trick the HTTP client
|
||||||
|
// into establishing a HTTP/2 connection without TLS.
|
||||||
|
// TODO: Eventually we will want to look at authenticating and
|
||||||
|
// encrypting these internal HTTP APIs, at which point we will have
|
||||||
|
// to reconsider H2C and change all this anyway.
|
||||||
|
return net.Dial(network, addr)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
client := http.Client{Timeout: HTTPClientTimeout}
|
client := http.Client{Timeout: HTTPClientTimeout}
|
||||||
if cfg.FederationSender.Proxy.Enabled {
|
if cfg.FederationSender.Proxy.Enabled {
|
||||||
client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{
|
client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{
|
||||||
|
|
@ -269,10 +288,17 @@ func (b *BaseDendrite) SetupAndServeHTTP(
|
||||||
internalServ := externalServ
|
internalServ := externalServ
|
||||||
|
|
||||||
if internalAddr != NoListener && externalAddr != internalAddr {
|
if internalAddr != NoListener && externalAddr != internalAddr {
|
||||||
|
// H2C allows us to accept HTTP/2 connections without TLS
|
||||||
|
// encryption. Since we don't currently require any form of
|
||||||
|
// authentication or encryption on these internal HTTP APIs,
|
||||||
|
// H2C gives us all of the advantages of HTTP/2 (such as
|
||||||
|
// stream multiplexing and avoiding head-of-line blocking)
|
||||||
|
// without enabling TLS.
|
||||||
|
internalH2S := &http2.Server{}
|
||||||
internalRouter = mux.NewRouter().SkipClean(true).UseEncodedPath()
|
internalRouter = mux.NewRouter().SkipClean(true).UseEncodedPath()
|
||||||
internalServ = &http.Server{
|
internalServ = &http.Server{
|
||||||
Addr: string(internalAddr),
|
Addr: string(internalAddr),
|
||||||
Handler: internalRouter,
|
Handler: h2c.NewHandler(internalRouter, internalH2S),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,6 @@
|
||||||
package input
|
package input
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
|
@ -28,7 +27,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// updateLatestEvents updates the list of latest events for this room in the database and writes the
|
// updateLatestEvents updates the list of latest events for this room in the database and writes the
|
||||||
|
|
@ -141,27 +139,32 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
|
|
||||||
// Work out what the latest events are. This will include the new
|
// Work out what the latest events are. This will include the new
|
||||||
// event if it is not already referenced.
|
// event if it is not already referenced.
|
||||||
if err := u.calculateLatest(
|
extremitiesChanged, err := u.calculateLatest(
|
||||||
oldLatest,
|
oldLatest, &u.event,
|
||||||
types.StateAtEventAndReference{
|
types.StateAtEventAndReference{
|
||||||
EventReference: u.event.EventReference(),
|
EventReference: u.event.EventReference(),
|
||||||
StateAtEvent: u.stateAtEvent,
|
StateAtEvent: u.stateAtEvent,
|
||||||
},
|
},
|
||||||
); err != nil {
|
)
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("u.calculateLatest: %w", err)
|
return fmt.Errorf("u.calculateLatest: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now that we know what the latest events are, it's time to get the
|
// Now that we know what the latest events are, it's time to get the
|
||||||
// latest state.
|
// latest state.
|
||||||
if err := u.latestState(); err != nil {
|
var updates []api.OutputEvent
|
||||||
return fmt.Errorf("u.latestState: %w", err)
|
if extremitiesChanged {
|
||||||
}
|
if err = u.latestState(); err != nil {
|
||||||
|
return fmt.Errorf("u.latestState: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// If we need to generate any output events then here's where we do it.
|
// If we need to generate any output events then here's where we do it.
|
||||||
// TODO: Move this!
|
// TODO: Move this!
|
||||||
updates, err := u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added)
|
if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("u.api.updateMemberships: %w", err)
|
||||||
return fmt.Errorf("u.api.updateMemberships: %w", err)
|
}
|
||||||
|
} else {
|
||||||
|
u.newStateNID = u.oldStateNID
|
||||||
}
|
}
|
||||||
|
|
||||||
update, err := u.makeOutputNewRoomEvent()
|
update, err := u.makeOutputNewRoomEvent()
|
||||||
|
|
@ -250,50 +253,74 @@ func (u *latestEventsUpdater) latestState() error {
|
||||||
// true if the new event is included in those extremites, false otherwise.
|
// true if the new event is included in those extremites, false otherwise.
|
||||||
func (u *latestEventsUpdater) calculateLatest(
|
func (u *latestEventsUpdater) calculateLatest(
|
||||||
oldLatest []types.StateAtEventAndReference,
|
oldLatest []types.StateAtEventAndReference,
|
||||||
newEvent types.StateAtEventAndReference,
|
newEvent *gomatrixserverlib.Event,
|
||||||
) error {
|
newStateAndRef types.StateAtEventAndReference,
|
||||||
var newLatest []types.StateAtEventAndReference
|
) (bool, error) {
|
||||||
|
// First of all, get a list of all of the events in our current
|
||||||
// First of all, let's see if any of the existing forward extremities
|
// set of forward extremities.
|
||||||
// now have entries in the previous events table. If they do then we
|
existingRefs := make(map[string]*types.StateAtEventAndReference)
|
||||||
// will no longer include them as forward extremities.
|
existingNIDs := make([]types.EventNID, len(oldLatest))
|
||||||
for _, l := range oldLatest {
|
for i, old := range oldLatest {
|
||||||
referenced, err := u.updater.IsReferenced(l.EventReference)
|
existingRefs[old.EventID] = &oldLatest[i]
|
||||||
if err != nil {
|
existingNIDs[i] = old.EventNID
|
||||||
logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID)
|
|
||||||
return fmt.Errorf("u.updater.IsReferenced (old): %w", err)
|
|
||||||
} else if !referenced {
|
|
||||||
newLatest = append(newLatest, l)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then check and see if our new event is already included in that set.
|
// Look up the old extremity events. This allows us to find their
|
||||||
// This ordinarily won't happen but it covers the edge-case that we've
|
// prev events.
|
||||||
// already seen this event before and it's a forward extremity, so rather
|
events, err := u.api.DB.Events(u.ctx, existingNIDs)
|
||||||
// than adding a duplicate, we'll just return the set as complete.
|
|
||||||
for _, l := range newLatest {
|
|
||||||
if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) {
|
|
||||||
// We've already referenced this new event so we can just return
|
|
||||||
// the newly completed extremities at this point.
|
|
||||||
u.latest = newLatest
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// At this point we've processed the old extremities, and we've checked
|
|
||||||
// that our new event isn't already in that set. Therefore now we can
|
|
||||||
// check if our *new* event is a forward extremity, and if it is, add
|
|
||||||
// it in.
|
|
||||||
referenced, err := u.updater.IsReferenced(newEvent.EventReference)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID)
|
return false, fmt.Errorf("u.api.DB.Events: %w", err)
|
||||||
return fmt.Errorf("u.updater.IsReferenced (new): %w", err)
|
}
|
||||||
} else if !referenced || len(newLatest) == 0 {
|
|
||||||
newLatest = append(newLatest, newEvent)
|
// Make a list of all of the prev events as referenced by all of
|
||||||
|
// the current forward extremities.
|
||||||
|
existingPrevs := make(map[string]struct{})
|
||||||
|
for _, old := range events {
|
||||||
|
for _, prevEventID := range old.PrevEventIDs() {
|
||||||
|
existingPrevs[prevEventID] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the "new" event is already referenced by a forward extremity
|
||||||
|
// then do nothing - it's not a candidate to be a new extremity if
|
||||||
|
// it has been referenced.
|
||||||
|
if _, ok := existingPrevs[newEvent.EventID()]; ok {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the "new" event is already a forward extremity then stop, as
|
||||||
|
// nothing changes.
|
||||||
|
for _, event := range events {
|
||||||
|
if event.EventID() == newEvent.EventID() {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Include our new event in the extremities.
|
||||||
|
newLatest := []types.StateAtEventAndReference{newStateAndRef}
|
||||||
|
|
||||||
|
// Then run through and see if the other extremities are still valid.
|
||||||
|
// If our new event references them then they are no longer good
|
||||||
|
// candidates.
|
||||||
|
for _, prevEventID := range newEvent.PrevEventIDs() {
|
||||||
|
delete(existingRefs, prevEventID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure that we don't add any candidate forward extremities from
|
||||||
|
// the old set that are, themselves, referenced by the old set of
|
||||||
|
// forward extremities. This shouldn't happen but guards against
|
||||||
|
// the possibility anyway.
|
||||||
|
for prevEventID := range existingPrevs {
|
||||||
|
delete(existingRefs, prevEventID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then re-add any old extremities that are still valid after all.
|
||||||
|
for _, old := range existingRefs {
|
||||||
|
newLatest = append(newLatest, *old)
|
||||||
}
|
}
|
||||||
|
|
||||||
u.latest = newLatest
|
u.latest = newLatest
|
||||||
return nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
|
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
|
||||||
|
|
|
||||||
|
|
@ -526,13 +526,7 @@ func (v StateResolution) CalculateAndStoreStateBeforeEvent(
|
||||||
isRejected bool,
|
isRejected bool,
|
||||||
) (types.StateSnapshotNID, error) {
|
) (types.StateSnapshotNID, error) {
|
||||||
// Load the state at the prev events.
|
// Load the state at the prev events.
|
||||||
prevEventRefs := event.PrevEvents()
|
prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs())
|
||||||
prevEventIDs := make([]string, len(prevEventRefs))
|
|
||||||
for i := range prevEventRefs {
|
|
||||||
prevEventIDs[i] = prevEventRefs[i].EventID
|
|
||||||
}
|
|
||||||
|
|
||||||
prevStates, err := v.db.StateAtEventIDs(ctx, prevEventIDs)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,23 +27,24 @@ import (
|
||||||
const redactionsArePermanent = true
|
const redactionsArePermanent = true
|
||||||
|
|
||||||
type Database struct {
|
type Database struct {
|
||||||
DB *sql.DB
|
DB *sql.DB
|
||||||
Cache caching.RoomServerCaches
|
Cache caching.RoomServerCaches
|
||||||
Writer sqlutil.Writer
|
Writer sqlutil.Writer
|
||||||
EventsTable tables.Events
|
EventsTable tables.Events
|
||||||
EventJSONTable tables.EventJSON
|
EventJSONTable tables.EventJSON
|
||||||
EventTypesTable tables.EventTypes
|
EventTypesTable tables.EventTypes
|
||||||
EventStateKeysTable tables.EventStateKeys
|
EventStateKeysTable tables.EventStateKeys
|
||||||
RoomsTable tables.Rooms
|
RoomsTable tables.Rooms
|
||||||
TransactionsTable tables.Transactions
|
TransactionsTable tables.Transactions
|
||||||
StateSnapshotTable tables.StateSnapshot
|
StateSnapshotTable tables.StateSnapshot
|
||||||
StateBlockTable tables.StateBlock
|
StateBlockTable tables.StateBlock
|
||||||
RoomAliasesTable tables.RoomAliases
|
RoomAliasesTable tables.RoomAliases
|
||||||
PrevEventsTable tables.PreviousEvents
|
PrevEventsTable tables.PreviousEvents
|
||||||
InvitesTable tables.Invites
|
InvitesTable tables.Invites
|
||||||
MembershipTable tables.Membership
|
MembershipTable tables.Membership
|
||||||
PublishedTable tables.Published
|
PublishedTable tables.Published
|
||||||
RedactionsTable tables.Redactions
|
RedactionsTable tables.Redactions
|
||||||
|
GetLatestEventsForUpdateFn func(ctx context.Context, roomInfo types.RoomInfo) (*LatestEventsUpdater, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) SupportsConcurrentRoomInputs() bool {
|
func (d *Database) SupportsConcurrentRoomInputs() bool {
|
||||||
|
|
@ -372,6 +373,9 @@ func (d *Database) MembershipUpdater(
|
||||||
func (d *Database) GetLatestEventsForUpdate(
|
func (d *Database) GetLatestEventsForUpdate(
|
||||||
ctx context.Context, roomInfo types.RoomInfo,
|
ctx context.Context, roomInfo types.RoomInfo,
|
||||||
) (*LatestEventsUpdater, error) {
|
) (*LatestEventsUpdater, error) {
|
||||||
|
if d.GetLatestEventsForUpdateFn != nil {
|
||||||
|
return d.GetLatestEventsForUpdateFn(ctx, roomInfo)
|
||||||
|
}
|
||||||
txn, err := d.DB.Begin()
|
txn, err := d.DB.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -120,23 +120,24 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: d.db,
|
DB: d.db,
|
||||||
Cache: cache,
|
Cache: cache,
|
||||||
Writer: d.writer,
|
Writer: d.writer,
|
||||||
EventsTable: d.events,
|
EventsTable: d.events,
|
||||||
EventTypesTable: d.eventTypes,
|
EventTypesTable: d.eventTypes,
|
||||||
EventStateKeysTable: d.eventStateKeys,
|
EventStateKeysTable: d.eventStateKeys,
|
||||||
EventJSONTable: d.eventJSON,
|
EventJSONTable: d.eventJSON,
|
||||||
RoomsTable: d.rooms,
|
RoomsTable: d.rooms,
|
||||||
TransactionsTable: d.transactions,
|
TransactionsTable: d.transactions,
|
||||||
StateBlockTable: stateBlock,
|
StateBlockTable: stateBlock,
|
||||||
StateSnapshotTable: stateSnapshot,
|
StateSnapshotTable: stateSnapshot,
|
||||||
PrevEventsTable: d.prevEvents,
|
PrevEventsTable: d.prevEvents,
|
||||||
RoomAliasesTable: roomAliases,
|
RoomAliasesTable: roomAliases,
|
||||||
InvitesTable: d.invites,
|
InvitesTable: d.invites,
|
||||||
MembershipTable: d.membership,
|
MembershipTable: d.membership,
|
||||||
PublishedTable: published,
|
PublishedTable: published,
|
||||||
RedactionsTable: redactions,
|
RedactionsTable: redactions,
|
||||||
|
GetLatestEventsForUpdateFn: d.GetLatestEventsForUpdate,
|
||||||
}
|
}
|
||||||
return &d, nil
|
return &d, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -485,3 +485,4 @@ Event with an invalid signature in the send_join response should not cause room
|
||||||
Inbound federation rejects typing notifications from wrong remote
|
Inbound federation rejects typing notifications from wrong remote
|
||||||
Should not be able to take over the room by pretending there is no PL event
|
Should not be able to take over the room by pretending there is no PL event
|
||||||
Can get rooms/{roomId}/state for a departed room (SPEC-216)
|
Can get rooms/{roomId}/state for a departed room (SPEC-216)
|
||||||
|
Users cannot set notifications powerlevel higher than their own
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue