Merge branch 'master' into fix-failing-ban-tests

This commit is contained in:
Neil Alexander 2021-06-30 12:32:28 +01:00 committed by GitHub
commit 56a2a5843a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 667 additions and 86 deletions

View file

@ -1,14 +1,15 @@
package conn
import (
"context"
"fmt"
"net"
"net/http"
"strings"
"github.com/gorilla/websocket"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/gomatrixserverlib"
"nhooyr.io/websocket"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
@ -17,11 +18,12 @@ import (
func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error {
var parent net.Conn
if strings.HasPrefix(peer, "ws://") || strings.HasPrefix(peer, "wss://") {
c, _, err := websocket.DefaultDialer.Dial(peer, nil)
ctx := context.Background()
c, _, err := websocket.Dial(ctx, peer, nil)
if err != nil {
return fmt.Errorf("websocket.DefaultDialer.Dial: %w", err)
}
parent = WrapWebSocketConn(c)
parent = websocket.NetConn(ctx, c, websocket.MessageBinary)
} else {
var err error
parent, err = net.Dial("tcp", peer)
@ -46,7 +48,13 @@ func (y *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
}
func createTransport(s *pineconeSessions.Sessions) *http.Transport {
tr := &http.Transport{}
tr := &http.Transport{
DisableKeepAlives: false,
Dial: s.Dial,
DialContext: s.DialContext,
DialTLS: s.DialTLS,
DialTLSContext: s.DialTLSContext,
}
tr.RegisterProtocol(
"matrix", &RoundTripper{
inner: &http.Transport{

View file

@ -33,7 +33,7 @@ func FederationAPI(base *setup.BaseDendrite, cfg *config.Dendrite) {
base.PublicFederationAPIMux, base.PublicKeyAPIMux,
&base.Cfg.FederationAPI, userAPI, federation, keyRing,
rsAPI, fsAPI, base.EDUServerClient(), keyAPI,
&base.Cfg.MSCs,
&base.Cfg.MSCs, nil,
)
base.SetupAndServeHTTP(

View file

@ -0,0 +1,100 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build wasm
package main
import (
"bufio"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"syscall/js"
)
// JSServer exposes an HTTP-like server interface which allows JS to 'send' requests to it.
type JSServer struct {
// The router which will service requests
Mux http.Handler
}
// OnRequestFromJS is the function that JS will invoke when there is a new request.
// The JS function signature is:
// function(reqString: string): Promise<{result: string, error: string}>
// Usage is like:
// const res = await global._go_js_server.fetch(reqString);
// if (res.error) {
// // handle error: this is a 'network' error, not a non-2xx error.
// }
// const rawHttpResponse = res.result;
func (h *JSServer) OnRequestFromJS(this js.Value, args []js.Value) interface{} {
// we HAVE to spawn a new goroutine and return immediately or else Go will deadlock
// if this request blocks at all e.g for /sync calls
httpStr := args[0].String()
promise := js.Global().Get("Promise").New(js.FuncOf(func(pthis js.Value, pargs []js.Value) interface{} {
// The initial callback code for new Promise() is also called on the critical path, which is why
// we need to put this in an immediately invoked goroutine.
go func() {
resolve := pargs[0]
resStr, err := h.handle(httpStr)
errStr := ""
if err != nil {
errStr = err.Error()
}
resolve.Invoke(map[string]interface{}{
"result": resStr,
"error": errStr,
})
}()
return nil
}))
return promise
}
// handle invokes the http.ServeMux for this request and returns the raw HTTP response.
func (h *JSServer) handle(httpStr string) (resStr string, err error) {
req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(httpStr)))
if err != nil {
return
}
w := httptest.NewRecorder()
h.Mux.ServeHTTP(w, req)
res := w.Result()
var resBuffer strings.Builder
err = res.Write(&resBuffer)
return resBuffer.String(), err
}
// ListenAndServe registers a variable in JS-land with the given namespace. This variable is
// a function which JS-land can call to 'send' HTTP requests. The function is attached to
// a global object called "_go_js_server". See OnRequestFromJS for more info.
func (h *JSServer) ListenAndServe(namespace string) {
globalName := "_go_js_server"
// register a hook in JS-land for it to invoke stuff
server := js.Global().Get(globalName)
if !server.Truthy() {
server = js.Global().Get("Object").New()
js.Global().Set(globalName, server)
}
server.Set(namespace, js.FuncOf(h.OnRequestFromJS))
fmt.Printf("Listening for requests from JS on function %s.%s\n", globalName, namespace)
// Block forever to mimic http.ListenAndServe
select {}
}

View file

@ -0,0 +1,256 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build wasm
package main
import (
"crypto/ed25519"
"encoding/hex"
"fmt"
"log"
"os"
"syscall/js"
"time"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
_ "github.com/matrix-org/go-sqlite3-js"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
)
var GitCommit string
func init() {
fmt.Printf("[%s] dendrite.js starting...\n", GitCommit)
}
const publicPeer = "wss://pinecone.matrix.org/public"
const keyNameEd25519 = "_go_ed25519_key"
func readKeyFromLocalStorage() (key ed25519.PrivateKey, err error) {
localforage := js.Global().Get("localforage")
if !localforage.Truthy() {
err = fmt.Errorf("readKeyFromLocalStorage: no localforage")
return
}
// https://localforage.github.io/localForage/
item, ok := await(localforage.Call("getItem", keyNameEd25519))
if !ok || !item.Truthy() {
err = fmt.Errorf("readKeyFromLocalStorage: no key in localforage")
return
}
fmt.Println("Found key in localforage")
// extract []byte and make an ed25519 key
seed := make([]byte, 32, 32)
js.CopyBytesToGo(seed, item)
return ed25519.NewKeyFromSeed(seed), nil
}
func writeKeyToLocalStorage(key ed25519.PrivateKey) error {
localforage := js.Global().Get("localforage")
if !localforage.Truthy() {
return fmt.Errorf("writeKeyToLocalStorage: no localforage")
}
// make a Uint8Array from the key's seed
seed := key.Seed()
jsSeed := js.Global().Get("Uint8Array").New(len(seed))
js.CopyBytesToJS(jsSeed, seed)
// write it
localforage.Call("setItem", keyNameEd25519, jsSeed)
return nil
}
// taken from https://go-review.googlesource.com/c/go/+/150917
// await waits until the promise v has been resolved or rejected and returns the promise's result value.
// The boolean value ok is true if the promise has been resolved, false if it has been rejected.
// If v is not a promise, v itself is returned as the value and ok is true.
func await(v js.Value) (result js.Value, ok bool) {
if v.Type() != js.TypeObject || v.Get("then").Type() != js.TypeFunction {
return v, true
}
done := make(chan struct{})
onResolve := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
result = args[0]
ok = true
close(done)
return nil
})
defer onResolve.Release()
onReject := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
result = args[0]
ok = false
close(done)
return nil
})
defer onReject.Release()
v.Call("then", onResolve, onReject)
<-done
return
}
func generateKey() ed25519.PrivateKey {
// attempt to look for a seed in JS-land and if it exists use it.
priv, err := readKeyFromLocalStorage()
if err == nil {
fmt.Println("Read key from localStorage")
return priv
}
// generate a new key
fmt.Println(err, " : Generating new ed25519 key")
_, priv, err = ed25519.GenerateKey(nil)
if err != nil {
logrus.Fatalf("Failed to generate ed25519 key: %s", err)
}
if err := writeKeyToLocalStorage(priv); err != nil {
fmt.Println("failed to write key to localStorage: ", err)
// non-fatal, we'll just have amnesia for a while
}
return priv
}
func main() {
sk := generateKey()
pk := sk.Public().(ed25519.PublicKey)
logger := log.New(os.Stdout, "", 0)
pRouter := pineconeRouter.NewRouter(logger, "dendrite", sk, pk, nil)
pSessions := pineconeSessions.NewSessions(logger, pRouter)
cfg := &config.Dendrite{}
cfg.Defaults()
cfg.UserAPI.AccountDatabase.ConnectionString = "file:/idb/dendritejs_account.db"
cfg.AppServiceAPI.Database.ConnectionString = "file:/idb/dendritejs_appservice.db"
cfg.UserAPI.DeviceDatabase.ConnectionString = "file:/idb/dendritejs_device.db"
cfg.FederationSender.Database.ConnectionString = "file:/idb/dendritejs_fedsender.db"
cfg.MediaAPI.Database.ConnectionString = "file:/idb/dendritejs_mediaapi.db"
cfg.RoomServer.Database.ConnectionString = "file:/idb/dendritejs_roomserver.db"
cfg.SigningKeyServer.Database.ConnectionString = "file:/idb/dendritejs_signingkeyserver.db"
cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db"
cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db"
cfg.Global.Kafka.UseNaffka = true
cfg.Global.Kafka.Database.ConnectionString = "file:/idb/dendritejs_naffka.db"
cfg.Global.TrustedIDServers = []string{}
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
cfg.Global.PrivateKey = sk
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
if err := cfg.Derive(); err != nil {
logrus.Fatalf("Failed to derive values from config: %s", err)
}
base := setup.NewBaseDendrite(cfg, "Monolith", false)
defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB()
federation := conn.CreateFederationClient(base, pSessions)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
rsAPI := roomserver.NewInternalAPI(base, keyRing)
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
asQuery := appservice.NewInternalAPI(
base, userAPI, rsAPI,
)
rsAPI.SetAppserviceAPI(asQuery)
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, keyRing, true)
rsAPI.SetFederationSenderAPI(fedSenderAPI)
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
Client: conn.CreateClient(base, pSessions),
FedClient: federation,
KeyRing: keyRing,
AppserviceAPI: asQuery,
EDUInternalAPI: eduInputAPI,
FederationSenderAPI: fedSenderAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation),
}
monolith.AddAllPublicRoutes(
base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,
base.PublicMediaAPIMux,
)
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux)
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)
httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
p2pRouter := pSessions.HTTP().Mux()
p2pRouter.Handle(httputil.PublicFederationPathPrefix, base.PublicFederationAPIMux)
p2pRouter.Handle(httputil.PublicMediaPathPrefix, base.PublicMediaAPIMux)
// Expose the matrix APIs via fetch - for local traffic
go func() {
logrus.Info("Listening for service-worker fetch traffic")
s := JSServer{
Mux: httpRouter,
}
s.ListenAndServe("fetch")
}()
// Connect to the static peer
go func() {
for {
if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
if err := conn.ConnectToPeer(pRouter, publicPeer); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
select {
case <-base.ProcessContext.Context().Done():
return
case <-time.After(time.Second * 5):
}
}
}()
// We want to block forever to let the fetch and libp2p handler serve the APIs
select {}
}

View file

@ -0,0 +1,23 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !wasm
package main
import "fmt"
func main() {
fmt.Println("dendritejs: no-op when not compiling for WebAssembly")
}

View file

@ -0,0 +1,11 @@
package api
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
)
type ServersInRoomProvider interface {
GetServersForRoom(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName
}

View file

@ -17,6 +17,7 @@ package federationapi
import (
"github.com/gorilla/mux"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -39,10 +40,12 @@ func AddPublicRoutes(
eduAPI eduserverAPI.EDUServerInputAPI,
keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
) {
routing.Setup(
fedRouter, keyRouter, cfg, rsAPI,
eduAPI, federationSenderAPI, keyRing,
federation, userAPI, keyAPI, mscCfg,
servers,
)
}

View file

@ -31,7 +31,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
fsAPI := base.FederationSenderHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs)
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil)
baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true)
defer cancel()
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))

View file

@ -20,6 +20,7 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil"
@ -50,6 +51,7 @@ func Setup(
userAPI userapi.UserInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
) {
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
v1fedmux := fedMux.PathPrefix("/v1").Subrouter()
@ -99,7 +101,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu,
cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers,
)
},
)).Methods(http.MethodPut, http.MethodOptions)

View file

@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
@ -26,6 +27,7 @@ import (
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
@ -100,14 +102,16 @@ func Send(
keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient,
mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider,
) util.JSONResponse {
t := txnReq{
rsAPI: rsAPI,
eduAPI: eduAPI,
keys: keys,
federation: federation,
hadEvents: make(map[string]bool),
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
newEvents: make(map[string]bool),
servers: servers,
keyAPI: keyAPI,
roomsMu: mu,
}
@ -159,21 +163,21 @@ func Send(
type txnReq struct {
gomatrixserverlib.Transaction
rsAPI api.RoomserverInternalAPI
eduAPI eduserverAPI.EDUServerInputAPI
keyAPI keyapi.KeyInternalAPI
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
servers []gomatrixserverlib.ServerName
serversMutex sync.RWMutex
roomsMu *internal.MutexByRoom
rsAPI api.RoomserverInternalAPI
eduAPI eduserverAPI.EDUServerInputAPI
keyAPI keyapi.KeyInternalAPI
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
roomsMu *internal.MutexByRoom
// something that can tell us about which servers are in a room right now
servers federationAPI.ServersInRoomProvider
// a list of events from the auth and prev events which we already had
hadEvents map[string]bool
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
// new events which the roomserver does not know about
newEvents map[string]bool
newEventsMutex sync.RWMutex
work string // metrics
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
haveEventsMutex sync.Mutex
work string // metrics
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@ -340,19 +344,6 @@ func (e missingPrevEventsError) Error() string {
return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err)
}
func (t *txnReq) haveEventIDs() map[string]bool {
t.newEventsMutex.RLock()
defer t.newEventsMutex.RUnlock()
result := make(map[string]bool, len(t.haveEvents))
for eventID := range t.haveEvents {
if t.newEvents[eventID] {
continue
}
result[eventID] = true
}
return result
}
func (t *txnReq) processEDUs(ctx context.Context) {
for _, e := range t.EDUs {
eduCountTotal.Inc()
@ -479,22 +470,24 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
}
}
func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
t.serversMutex.Lock()
defer t.serversMutex.Unlock()
func (t *txnReq) getServers(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName {
// The server that sent us the event should be sufficient to tell us about missing
// prev and auth events.
servers := []gomatrixserverlib.ServerName{t.Origin}
// If the event origin is different to the transaction origin then we can use
// this as a last resort. The origin server that created the event would have
// had to know the auth and prev events.
if event != nil {
if origin := event.Origin(); origin != t.Origin {
servers = append(servers, origin)
}
}
// If a specific room-to-server provider exists then use that. This will primarily
// be used for the P2P demos.
if t.servers != nil {
return t.servers
servers = append(servers, t.servers.GetServersForRoom(ctx, roomID, event)...)
}
t.servers = []gomatrixserverlib.ServerName{t.Origin}
serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: roomID,
}
serverRes := &api.QueryServerJoinedToRoomResponse{}
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
t.servers = append(t.servers, serverRes.ServerNames...)
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
}
return t.servers
return servers
}
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
@ -527,6 +520,15 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
return roomNotFoundError{e.RoomID()}
}
// Prepare a map of all the events we already had before this point, so
// that we don't send them to the roomserver again.
for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) {
t.hadEvents[eventID] = true
}
for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) {
t.hadEvents[eventID] = false
}
if len(stateResp.MissingAuthEventIDs) > 0 {
t.work = MetricsWorkMissingAuthEvents
logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs))
@ -570,11 +572,14 @@ func (t *txnReq) retrieveMissingAuthEvents(
withNextEvent:
for missingAuthEventID := range missingAuthEvents {
withNextServer:
for _, server := range t.getServers(ctx, e.RoomID()) {
for _, server := range t.getServers(ctx, e.RoomID(), e) {
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
if err != nil {
logger.WithError(err).Warnf("Failed to retrieve auth event %q", missingAuthEventID)
if errors.Is(err, context.DeadlineExceeded) {
return err
}
continue withNextServer
}
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion)
@ -596,6 +601,8 @@ withNextEvent:
); err != nil {
return fmt.Errorf("api.SendEvents: %w", err)
}
t.hadEvents[ev.EventID()] = true // if the roomserver didn't know about the event before, it does now
t.cacheAndReturn(ev.Headered(stateResp.RoomVersion))
delete(missingAuthEvents, missingAuthEventID)
continue withNextEvent
}
@ -618,9 +625,14 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
return gomatrixserverlib.Allowed(e, &authUsingState)
}
var processEventWithMissingStateMutexes = internal.NewMutexByRoom()
func (t *txnReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) error {
processEventWithMissingStateMutexes.Lock(e.RoomID())
defer processEventWithMissingStateMutexes.Unlock(e.RoomID())
// Do this with a fresh context, so that we keep working even if the
// original request times out. With any luck, by the time the remote
// side retries, we'll have fetched the missing state.
@ -734,7 +746,7 @@ func (t *txnReq) processEventWithMissingState(
api.KindOld,
resolvedState,
backwardsExtremity.Headered(roomVersion),
t.haveEventIDs(),
t.hadEvents,
)
if err != nil {
return fmt.Errorf("api.SendEventWithState: %w", err)
@ -786,7 +798,7 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
default:
return nil, false, fmt.Errorf("t.lookupEvent: %w", err)
}
t.cacheAndReturn(h)
h = t.cacheAndReturn(h)
if h.StateKey() != nil {
addedToState := false
for i := range respState.StateEvents {
@ -806,6 +818,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
}
func (t *txnReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *gomatrixserverlib.HeaderedEvent {
t.haveEventsMutex.Lock()
defer t.haveEventsMutex.Unlock()
if cached, exists := t.haveEvents[ev.EventID()]; exists {
return cached
}
@ -828,6 +842,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
// processEvent request, which is better for memory.
stateEvents[i] = t.cacheAndReturn(ev)
t.hadEvents[ev.EventID()] = true
}
// we should never access res.StateEvents again so we delete it here to make GC faster
res.StateEvents = nil
@ -835,6 +850,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
var authEvents []*gomatrixserverlib.Event
missingAuthEvents := map[string]bool{}
for _, ev := range stateEvents {
t.haveEventsMutex.Lock()
for _, ae := range ev.AuthEventIDs() {
if aev, ok := t.haveEvents[ae]; ok {
authEvents = append(authEvents, aev.Unwrap())
@ -842,6 +858,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
missingAuthEvents[ae] = true
}
}
t.haveEventsMutex.Unlock()
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
@ -858,8 +875,9 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
return nil
}
for i := range queryRes.Events {
for i, ev := range queryRes.Events {
authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
t.hadEvents[ev.EventID()] = true
}
queryRes.Events = nil
}
@ -934,12 +952,13 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
return nil, err
}
latestEvents := make([]string, len(res.LatestEvents))
for i := range res.LatestEvents {
for i, ev := range res.LatestEvents {
latestEvents[i] = res.LatestEvents[i].EventID
t.hadEvents[ev.EventID] = true
}
var missingResp *gomatrixserverlib.RespMissingEvents
servers := t.getServers(ctx, e.RoomID())
servers := t.getServers(ctx, e.RoomID(), e)
for _, server := range servers {
var m gomatrixserverlib.RespMissingEvents
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
@ -953,6 +972,9 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
break
} else {
logger.WithError(err).Errorf("%s pushed us an event but %q did not respond to /get_missing_events", t.Origin, server)
if errors.Is(err, context.DeadlineExceeded) {
break
}
}
}
@ -980,6 +1002,12 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
// For now, we do not allow Case B, so reject the event.
logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
// Make sure events from the missingResp are using the cache - missing events
// will be added and duplicates will be removed.
for i, ev := range missingResp.Events {
missingResp.Events[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
}
// topologically sort and sanity check that we are making forward progress
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
shouldHaveSomeEventIDs := e.PrevEventIDs()
@ -1018,6 +1046,14 @@ func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID
if err := state.Check(ctx, t.keys, nil); err != nil {
return nil, err
}
// Cache the results of this state lookup and deduplicate anything we already
// have in the cache, freeing up memory.
for i, ev := range state.AuthEvents {
state.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
}
for i, ev := range state.StateEvents {
state.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
}
return &state, nil
}
@ -1033,6 +1069,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...)
missing := make(map[string]bool)
var missingEventList []string
t.haveEventsMutex.Lock()
for _, sid := range wantIDs {
if _, ok := t.haveEvents[sid]; !ok {
if !missing[sid] {
@ -1041,6 +1078,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
}
}
}
t.haveEventsMutex.Unlock()
// fetch as many as we can from the roomserver
queryReq := api.QueryEventsByIDRequest{
@ -1050,9 +1088,10 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
return nil, err
}
for i := range queryRes.Events {
for i, ev := range queryRes.Events {
queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i])
t.hadEvents[ev.EventID()] = true
evID := queryRes.Events[i].EventID()
t.cacheAndReturn(queryRes.Events[i])
if missing[evID] {
delete(missing, evID)
}
@ -1153,6 +1192,9 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
*gomatrixserverlib.RespState, error) { // nolint:unparam
t.haveEventsMutex.Lock()
defer t.haveEventsMutex.Unlock()
// create a RespState response using the response to /state_ids as a guide
respState := gomatrixserverlib.RespState{}
@ -1193,11 +1235,14 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
}
var event *gomatrixserverlib.Event
found := false
servers := t.getServers(ctx, roomID)
servers := t.getServers(ctx, roomID, nil)
for _, serverName := range servers {
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
if err != nil || len(txn.PDUs) == 0 {
util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("Failed to get missing /event for event ID")
if errors.Is(err, context.DeadlineExceeded) {
break
}
continue
}
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(txn.PDUs[0], roomVersion)
@ -1216,9 +1261,5 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
return nil, verifySigError{event.EventID(), err}
}
h := event.Headered(roomVersion)
t.newEventsMutex.Lock()
t.newEvents[h.EventID()] = true
t.newEventsMutex.Unlock()
return h, nil
return t.cacheAndReturn(event.Headered(roomVersion)), nil
}

View file

@ -370,7 +370,7 @@ func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederat
keys: &test.NopJSONVerifier{},
federation: fedClient,
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
newEvents: make(map[string]bool),
hadEvents: make(map[string]bool),
roomsMu: internal.NewMutexByRoom(),
}
t.PDUs = pdus

7
go.mod
View file

@ -21,11 +21,11 @@ require (
github.com/lucas-clemente/quic-go v0.19.3
github.com/matrix-org/dugong v0.0.0-20180820122854-51a565b5666b
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/go-sqlite3-js v0.0.0-20210625141222-bd2b7124cee8
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
github.com/matrix-org/gomatrixserverlib v0.0.0-20210525110027-8cb7699aa64a
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161
github.com/matrix-org/pinecone v0.0.0-20210622111727-6e630fb016ac
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
@ -47,6 +47,7 @@ require (
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
gopkg.in/h2non/bimg.v1 v1.1.5
gopkg.in/yaml.v2 v2.4.0
nhooyr.io/websocket v1.8.7
)
go 1.14

47
go.sum
View file

@ -55,9 +55,7 @@ github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgw
github.com/anacrolix/missinggo v1.1.2-0.20190815015349-b888af804467/go.mod h1:MBJu3Sk/k3ZfGYcS7z18gwfu72Ey/xopPFJJbTi5yIo=
github.com/anacrolix/missinggo v1.2.1 h1:0IE3TqX5y5D0IxeMwTyIgqdDew4QrzcXaaEnJQyjHvw=
github.com/anacrolix/missinggo v1.2.1/go.mod h1:J5cMhif8jPmFoC3+Uvob3OXXNIhOUikzMt+uUjeM21Y=
github.com/anacrolix/missinggo/perf v1.0.0 h1:7ZOGYziGEBytW49+KmYGTaNfnwUqP1HBsy6BqESAJVw=
github.com/anacrolix/missinggo/perf v1.0.0/go.mod h1:ljAFWkBuzkO12MQclXzZrosP5urunoLS0Cbvb4V0uMQ=
github.com/anacrolix/sync v0.2.0 h1:oRe22/ZB+v7v/5Mbc4d2zE0AXEZy0trKyKLjqYOt6tY=
github.com/anacrolix/sync v0.2.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/utp v0.1.0/go.mod h1:MDwc+vsGEq7RMw6lr2GKOEqjWny5hO5OZXRVNaBJ2Dk=
@ -185,7 +183,11 @@ github.com/getsentry/sentry-go v0.10.0 h1:6gwY+66NHKqyZrdi6O2jGdo7wGdo9b3B69E01N
github.com/getsentry/sentry-go v0.10.0/go.mod h1:kELm/9iCblqUYh+ZRML7PNdCvEuw24wBvJPYyi86cws=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
@ -199,10 +201,21 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@ -227,6 +240,7 @@ github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@ -418,6 +432,7 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
@ -439,6 +454,7 @@ github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
@ -461,6 +477,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8=
github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
@ -697,17 +715,17 @@ github.com/matrix-org/dugong v0.0.0-20180820122854-51a565b5666b h1:xpcmnpfUImRC4
github.com/matrix-org/dugong v0.0.0-20180820122854-51a565b5666b/go.mod h1:NgPCr+UavRGH6n5jmdX8DuqFZ4JiCWIJoZiuhTRLSUg=
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 h1:eqE5OnGx9ZMWmrRbD3KF/3KtTunw0iQulI7YxOIdxo4=
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4/go.mod h1:3WluEZ9QXSwU30tWYqktnpC1x9mwZKx1r8uAv8Iq+a4=
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 h1:Yb+Wlf/iHhWlLWd+kCgG+Fsg4Dc+xBl7hptfK7lD0zY=
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/go-sqlite3-js v0.0.0-20210625141222-bd2b7124cee8 h1:/FKUeUlCATr1gXxYqlaJgH8FW/sw0Jz8t7s8BIlECfg=
github.com/matrix-org/go-sqlite3-js v0.0.0-20210625141222-bd2b7124cee8/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20210525110027-8cb7699aa64a h1:pVhOeJpD0gv5boUnihefPDuYkQ6xSdEVbH5ld5Vvve0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20210525110027-8cb7699aa64a/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161 h1:h1XVh05pLoC+nJjP3GIpj5wUsuC8WdHP3He0RTkRJTs=
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/pinecone v0.0.0-20210622111727-6e630fb016ac h1:x9qICcE2aKXZPy8H62Z3VE2JRodotz6zHBMYhccbrzc=
github.com/matrix-org/pinecone v0.0.0-20210622111727-6e630fb016ac/go.mod h1:UQzJS6UVyVwfkr+RLrdvBB1vLyECqe3fLYNcbRxv8SA=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b h1:5X5vdWQ13xrNkJVqaJHPsrt7rKkMJH5iac0EtfOuxSg=
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b/go.mod h1:CVlrvs1R5iz7Omy2GqAjJJKbACn07GZgUq1Gli18FYE=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
@ -721,6 +739,8 @@ github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.2/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
@ -756,8 +776,10 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
@ -831,8 +853,8 @@ github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/neilalexander/utp v0.1.1-0.20210510143443-c1bac1cd577f h1:xcJVva0Ziw+Ud4AaY/g9OMNc7veEfsYVox3eItY2w8Q=
github.com/neilalexander/utp v0.1.1-0.20210510143443-c1bac1cd577f/go.mod h1:ylsx0342RjGHjOoVKhR/wz/7Lhiusonihfj4QLxEMcU=
github.com/neilalexander/utp v0.1.1-0.20210622132614-ee9a34a30488 h1:xZk82i6JK2d0SqRIXwaxj7J/NQB6ngq0PuMx3wXBaRQ=
github.com/neilalexander/utp v0.1.1-0.20210622132614-ee9a34a30488/go.mod h1:NPHGhPc0/wudcaCqL/H5AOddkRf8GPRhzOujuUKGQu8=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6 h1:evlcQnJY+v8XRRchV3hXzpHDl6GcEZeLXAhlH9Csdww=
@ -1024,8 +1046,10 @@ github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW
github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
@ -1228,6 +1252,7 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1383,6 +1408,8 @@ honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=

View file

@ -38,8 +38,7 @@ type Inputer struct {
ServerName gomatrixserverlib.ServerName
ACLs *acls.ServerACLs
OutputRoomEventTopic string
workers sync.Map // room ID -> *inputWorker
workers sync.Map // room ID -> *inputWorker
}
type inputTask struct {
@ -52,7 +51,7 @@ type inputTask struct {
type inputWorker struct {
r *Inputer
running atomic.Bool
input chan *inputTask
input *fifoQueue
}
// Guarded by a CAS on w.running
@ -60,7 +59,11 @@ func (w *inputWorker) start() {
defer w.running.Store(false)
for {
select {
case task := <-w.input:
case <-w.input.wait():
task, ok := w.input.pop()
if !ok {
continue
}
hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
if task.err == nil {
@ -143,7 +146,7 @@ func (r *Inputer) InputRoomEvents(
// room - the channel will be quite small as it's just pointer types.
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
r: r,
input: make(chan *inputTask, 32),
input: newFIFOQueue(),
})
worker := w.(*inputWorker)
@ -160,7 +163,7 @@ func (r *Inputer) InputRoomEvents(
if worker.running.CAS(false, true) {
go worker.start()
}
worker.input <- tasks[i]
worker.input.push(tasks[i])
}
// Wait for all of the workers to return results about our tasks.

View file

@ -0,0 +1,64 @@
package input
import (
"sync"
)
type fifoQueue struct {
tasks []*inputTask
count int
mutex sync.Mutex
notifs chan struct{}
}
func newFIFOQueue() *fifoQueue {
q := &fifoQueue{
notifs: make(chan struct{}, 1),
}
return q
}
func (q *fifoQueue) push(frame *inputTask) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.tasks = append(q.tasks, frame)
q.count++
select {
case q.notifs <- struct{}{}:
default:
}
}
// pop returns the first item of the queue, if there is one.
// The second return value will indicate if a task was returned.
// You must check this value, even after calling wait().
func (q *fifoQueue) pop() (*inputTask, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count == 0 {
return nil, false
}
frame := q.tasks[0]
q.tasks[0] = nil
q.tasks = q.tasks[1:]
q.count--
if q.count == 0 {
// Force a GC of the underlying array, since it might have
// grown significantly if the queue was hammered for some reason
q.tasks = nil
}
return frame, true
}
// wait returns a channel which can be used to detect when an
// item is waiting in the queue.
func (q *fifoQueue) wait() <-chan struct{} {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count > 0 && len(q.notifs) == 0 {
ch := make(chan struct{})
close(ch)
return ch
}
return q.notifs
}

View file

@ -119,11 +119,15 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
_roomserver_state_snapshots
JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
WHERE
_roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT DISTINCT
_roomserver_state_snapshots.state_snapshot_nid = ANY (
SELECT
_roomserver_state_snapshots.state_snapshot_nid
FROM
_roomserver_state_snapshots
LIMIT $1 OFFSET $2)) AS _roomserver_state_block
ORDER BY _roomserver_state_snapshots.state_snapshot_nid ASC
LIMIT $1 OFFSET $2
)
) AS _roomserver_state_block
GROUP BY
state_snapshot_nid,
room_nid,
@ -202,6 +206,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
}
// By this point we should have no more state_snapshot_nids below maxsnapshotid in either roomserver_rooms or roomserver_events
// If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
// in roomserver_state_snapshots
var count int64
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if _, err = tx.Exec(`
DROP TABLE _roomserver_state_snapshots;
DROP SEQUENCE roomserver_state_snapshot_nid_seq;

View file

@ -31,6 +31,7 @@ func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}
// nolint:gocyclo
func UpStateBlocksRefactor(tx *sql.Tx) error {
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
defer logrus.Warn("State storage upgrade complete")
@ -45,6 +46,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
maxsnapshotid++
maxblockid++
oldMaxSnapshotID := maxsnapshotid
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
@ -133,6 +135,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
if jerr != nil {
return fmt.Errorf("json.Marshal (new blocks): %w", jerr)
}
var newsnapshot types.StateSnapshotNID
err = tx.QueryRow(`
INSERT INTO roomserver_state_snapshots (state_snapshot_nid, state_snapshot_hash, room_nid, state_block_nids)
@ -144,7 +147,8 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
}
maxsnapshotid++
if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
_, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid)
if err != nil {
return fmt.Errorf("tx.Exec (update events): %w", err)
}
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
@ -153,6 +157,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
}
}
// By this point we should have no more state_snapshot_nids below oldMaxSnapshotID in either roomserver_rooms or roomserver_events
// If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
// in roomserver_state_snapshots
var count int64
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err)
}
if count > 0 {
return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
}
if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil {
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
}

View file

@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss
federationapi.AddPublicRoutes(
ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI,
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs,
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil,
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(