diff --git a/.github/workflows/dendrite.yml b/.github/workflows/dendrite.yml index 2f615a6a4..033c5864b 100644 --- a/.github/workflows/dendrite.yml +++ b/.github/workflows/dendrite.yml @@ -4,7 +4,13 @@ on: push: branches: - main + paths: + - '**.go' # only execute on changes to go files + - '.github/workflows/**' # or workflow changes pull_request: + paths: + - '**.go' + - '.github/workflows/**' release: types: [published] workflow_dispatch: diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml index b5a8f0bbd..9df3cceae 100644 --- a/.github/workflows/gh-pages.yml +++ b/.github/workflows/gh-pages.yml @@ -4,7 +4,7 @@ name: Deploy GitHub Pages dependencies preinstalled on: # Runs on pushes targeting the default branch push: - branches: ["main"] + branches: ["gh-pages"] paths: - 'docs/**' # only execute if we have docs changes diff --git a/build/docker/README.md b/build/docker/README.md index d8abd6538..b66cb864b 100644 --- a/build/docker/README.md +++ b/build/docker/README.md @@ -50,7 +50,7 @@ The key files will now exist in your current working directory, and can be mount ## Starting Dendrite -Create your config based on the [`dendrite-sample.monolith.yaml`](https://github.com/matrix-org/dendrite/blob/main/dendrite-sample.monolith.yaml) sample configuration file. +Create your config based on the [`dendrite-sample.yaml`](https://github.com/matrix-org/dendrite/blob/main/dendrite-sample.yaml) sample configuration file. Then start the deployment: diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go index 27720369e..ea8e985c7 100644 --- a/cmd/dendrite-demo-pinecone/monolith/monolith.go +++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go @@ -68,12 +68,14 @@ type P2PMonolith struct { EventChannel chan pineconeEvents.Event RelayRetriever relay.RelayServerRetriever - dendrite setup.Monolith - port int - httpMux *mux.Router - pineconeMux *mux.Router - listener net.Listener - httpListenAddr string + dendrite setup.Monolith + port int + httpMux *mux.Router + pineconeMux *mux.Router + httpServer *http.Server + listener net.Listener + httpListenAddr string + stopHandlingEvents chan bool } func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, cacheDir string, dbPrefix string) *config.Dendrite { @@ -199,8 +201,10 @@ func (p *P2PMonolith) StartMonolith() { } func (p *P2PMonolith) Stop() { + logrus.Info("Stopping monolith") _ = p.BaseDendrite.Close() p.WaitForShutdown() + logrus.Info("Stopped monolith") } func (p *P2PMonolith) WaitForShutdown() { @@ -209,6 +213,16 @@ func (p *P2PMonolith) WaitForShutdown() { } func (p *P2PMonolith) closeAllResources() { + logrus.Info("Closing monolith resources") + if p.httpServer != nil { + _ = p.httpServer.Shutdown(context.Background()) + } + + select { + case p.stopHandlingEvents <- true: + default: + } + if p.listener != nil { _ = p.listener.Close() } @@ -224,6 +238,7 @@ func (p *P2PMonolith) closeAllResources() { if p.Router != nil { _ = p.Router.Close() } + logrus.Info("Monolith resources closed") } func (p *P2PMonolith) Addr() string { @@ -280,7 +295,7 @@ func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider, func (p *P2PMonolith) startHTTPServers() { go func() { // Build both ends of a HTTP multiplex. - httpServer := &http.Server{ + p.httpServer = &http.Server{ Addr: ":0", TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, ReadTimeout: 10 * time.Second, @@ -296,12 +311,13 @@ func (p *P2PMonolith) startHTTPServers() { pubkeyString := hex.EncodeToString(pubkey[:]) logrus.Info("Listening on ", pubkeyString) - switch httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) { + switch p.httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) { case net.ErrClosed, http.ErrServerClosed: logrus.Info("Stopped listening on ", pubkeyString) default: logrus.Error("Stopped listening on ", pubkeyString) } + logrus.Info("Stopped goroutine listening on ", pubkeyString) }() p.httpListenAddr = fmt.Sprintf(":%d", p.port) @@ -313,10 +329,12 @@ func (p *P2PMonolith) startHTTPServers() { default: logrus.Error("Stopped listening on ", p.httpListenAddr) } + logrus.Info("Stopped goroutine listening on ", p.httpListenAddr) }() } func (p *P2PMonolith) startEventHandler() { + p.stopHandlingEvents = make(chan bool) stopRelayServerSync := make(chan bool) eLog := logrus.WithField("pinecone", "events") p.RelayRetriever = relay.NewRelayServerRetriever( @@ -329,25 +347,40 @@ func (p *P2PMonolith) startEventHandler() { p.RelayRetriever.InitializeRelayServers(eLog) go func(ch <-chan pineconeEvents.Event) { - for event := range ch { - switch e := event.(type) { - case pineconeEvents.PeerAdded: - p.RelayRetriever.StartSync() - case pineconeEvents.PeerRemoved: - if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 { - stopRelayServerSync <- true - } - case pineconeEvents.BroadcastReceived: - // eLog.Info("Broadcast received from: ", e.PeerID) + for { + select { + case event := <-ch: + switch e := event.(type) { + case pineconeEvents.PeerAdded: + p.RelayRetriever.StartSync() + case pineconeEvents.PeerRemoved: + if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 { + // NOTE: Don't block on channel + select { + case stopRelayServerSync <- true: + default: + } + } + case pineconeEvents.BroadcastReceived: + // eLog.Info("Broadcast received from: ", e.PeerID) - req := &federationAPI.PerformWakeupServersRequest{ - ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, + req := &federationAPI.PerformWakeupServersRequest{ + ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, + } + res := &federationAPI.PerformWakeupServersResponse{} + if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil { + eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) + } } - res := &federationAPI.PerformWakeupServersResponse{} - if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil { - eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) + case <-p.stopHandlingEvents: + logrus.Info("Stopping processing pinecone events") + // NOTE: Don't block on channel + select { + case stopRelayServerSync <- true: + default: } - default: + logrus.Info("Stopped processing pinecone events") + return } } }(p.EventChannel) diff --git a/cmd/dendrite-demo-pinecone/relay/retriever.go b/cmd/dendrite-demo-pinecone/relay/retriever.go index 1b5c617ef..6b34f6416 100644 --- a/cmd/dendrite-demo-pinecone/relay/retriever.go +++ b/cmd/dendrite-demo-pinecone/relay/retriever.go @@ -38,7 +38,7 @@ type RelayServerRetriever struct { relayServersQueried map[gomatrixserverlib.ServerName]bool queriedServersMutex sync.Mutex running atomic.Bool - quit <-chan bool + quit chan bool } func NewRelayServerRetriever( @@ -46,7 +46,7 @@ func NewRelayServerRetriever( serverName gomatrixserverlib.ServerName, federationAPI federationAPI.FederationInternalAPI, relayAPI relayServerAPI.RelayInternalAPI, - quit <-chan bool, + quit chan bool, ) RelayServerRetriever { return RelayServerRetriever{ ctx: ctx, @@ -151,6 +151,7 @@ func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) { if !t.Stop() { <-t.C } + logrus.Info("Stopped relay server retriever") return case <-t.C: } diff --git a/cmd/dendrite-demo-pinecone/relay/retriever_test.go b/cmd/dendrite-demo-pinecone/relay/retriever_test.go index 8f86a3770..6c4c3a529 100644 --- a/cmd/dendrite-demo-pinecone/relay/retriever_test.go +++ b/cmd/dendrite-demo-pinecone/relay/retriever_test.go @@ -60,7 +60,7 @@ func TestRelayRetrieverInitialization(t *testing.T) { "server", &FakeFedAPI{}, &FakeRelayAPI{}, - make(<-chan bool), + make(chan bool), ) retriever.InitializeRelayServers(logrus.WithField("test", "relay")) @@ -74,7 +74,7 @@ func TestRelayRetrieverSync(t *testing.T) { "server", &FakeFedAPI{}, &FakeRelayAPI{}, - make(<-chan bool), + make(chan bool), ) retriever.InitializeRelayServers(logrus.WithField("test", "relay")) diff --git a/cmd/resolve-state/main.go b/cmd/resolve-state/main.go index 59c66aed4..e3840bbcf 100644 --- a/cmd/resolve-state/main.go +++ b/cmd/resolve-state/main.go @@ -87,7 +87,7 @@ func main() { } var eventEntries []types.Event - eventEntries, err = roomserverDB.Events(ctx, eventNIDs) + eventEntries, err = roomserverDB.Events(ctx, 0, eventNIDs) if err != nil { panic(err) } @@ -145,7 +145,7 @@ func main() { } fmt.Println("Fetching", len(eventNIDMap), "state events") - eventEntries, err := roomserverDB.Events(ctx, eventNIDs) + eventEntries, err := roomserverDB.Events(ctx, 0, eventNIDs) if err != nil { panic(err) } @@ -165,7 +165,7 @@ func main() { } fmt.Println("Fetching", len(authEventIDs), "auth events") - authEventEntries, err := roomserverDB.EventsFromIDs(ctx, authEventIDs) + authEventEntries, err := roomserverDB.EventsFromIDs(ctx, 0, authEventIDs) if err != nil { panic(err) } diff --git a/helm/dendrite/.helm-docs/monitoring.gotmpl b/helm/dendrite/.helm-docs/monitoring.gotmpl new file mode 100644 index 000000000..3618a1c1a --- /dev/null +++ b/helm/dendrite/.helm-docs/monitoring.gotmpl @@ -0,0 +1,22 @@ +{{ define "chart.monitoringSection" }} +## Monitoring + +[![Grafana Dashboard](https://grafana.com/api/dashboards/13916/images/9894/image)](https://grafana.com/grafana/dashboards/13916-dendrite/) + +* Works well with [Prometheus Operator](https://prometheus-operator.dev/) ([Helmchart](https://artifacthub.io/packages/helm/prometheus-community/kube-prometheus-stack)) and their setup of [Grafana](https://grafana.com/grafana/), by enabling the following values: +```yaml +prometheus: + servicemonitor: + enabled: true + labels: + release: "kube-prometheus-stack" + rules: + enabled: true # will deploy alert rules + labels: + release: "kube-prometheus-stack" +grafana: + dashboards: + enabled: true # will deploy default dashboards +``` +PS: The label `release=kube-prometheus-stack` is setup with the helmchart of the Prometheus Operator. For Grafana Dashboards it may be necessary to enable scanning in the correct namespaces (or ALL), enabled by `sidecar.dashboards.searchNamespace` in [Helmchart of grafana](https://artifacthub.io/packages/helm/grafana/grafana) (which is part of PrometheusOperator, so `grafana.sidecar.dashboards.searchNamespace`) +{{ end }} \ No newline at end of file diff --git a/helm/dendrite/Chart.yaml b/helm/dendrite/Chart.yaml index 3944a76db..dc2764939 100644 --- a/helm/dendrite/Chart.yaml +++ b/helm/dendrite/Chart.yaml @@ -1,6 +1,6 @@ apiVersion: v2 name: dendrite -version: "0.11.1" +version: "0.11.2" appVersion: "0.11.1" description: Dendrite Matrix Homeserver type: application diff --git a/helm/dendrite/README.md b/helm/dendrite/README.md index 8bcc82e6e..51587b766 100644 --- a/helm/dendrite/README.md +++ b/helm/dendrite/README.md @@ -1,6 +1,6 @@ # dendrite -![Version: 0.11.1](https://img.shields.io/badge/Version-0.11.1-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.11.1](https://img.shields.io/badge/AppVersion-0.11.1-informational?style=flat-square) +![Version: 0.11.2](https://img.shields.io/badge/Version-0.11.2-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.11.1](https://img.shields.io/badge/AppVersion-0.11.1-informational?style=flat-square) Dendrite Matrix Homeserver Status: **NOT PRODUCTION READY** @@ -146,3 +146,35 @@ Create a folder `appservices` and place your configurations in there. The confi | ingress.tls | list | `[]` | | | service.type | string | `"ClusterIP"` | | | service.port | int | `8008` | | +| prometheus.servicemonitor.enabled | bool | `false` | Enable ServiceMonitor for Prometheus-Operator for scrape metric-endpoint | +| prometheus.servicemonitor.labels | object | `{}` | Extra Labels on ServiceMonitor for selector of Prometheus Instance | +| prometheus.rules.enabled | bool | `false` | Enable PrometheusRules for Prometheus-Operator for setup alerting | +| prometheus.rules.labels | object | `{}` | Extra Labels on PrometheusRules for selector of Prometheus Instance | +| prometheus.rules.additionalRules | list | `[]` | additional alertrules (no default alertrules are provided) | +| grafana.dashboards.enabled | bool | `false` | | +| grafana.dashboards.labels | object | `{"grafana_dashboard":"1"}` | Extra Labels on ConfigMap for selector of grafana sidecar | +| grafana.dashboards.annotations | object | `{}` | Extra Annotations on ConfigMap additional config in grafana sidecar | + +## Monitoring + +[![Grafana Dashboard](https://grafana.com/api/dashboards/13916/images/9894/image)](https://grafana.com/grafana/dashboards/13916-dendrite/) + +* Works well with [Prometheus Operator](https://prometheus-operator.dev/) ([Helmchart](https://artifacthub.io/packages/helm/prometheus-community/kube-prometheus-stack)) and their setup of [Grafana](https://grafana.com/grafana/), by enabling the following values: +```yaml +prometheus: + servicemonitor: + enabled: true + labels: + release: "kube-prometheus-stack" + rules: + enabled: true # will deploy alert rules + labels: + release: "kube-prometheus-stack" +grafana: + dashboards: + enabled: true # will deploy default dashboards +``` +PS: The label `release=kube-prometheus-stack` is setup with the helmchart of the Prometheus Operator. For Grafana Dashboards it may be necessary to enable scanning in the correct namespaces (or ALL), enabled by `sidecar.dashboards.searchNamespace` in [Helmchart of grafana](https://artifacthub.io/packages/helm/grafana/grafana) (which is part of PrometheusOperator, so `grafana.sidecar.dashboards.searchNamespace`) + +---------------------------------------------- +Autogenerated from chart metadata using [helm-docs vv1.11.0](https://github.com/norwoodj/helm-docs/releases/vv1.11.0) \ No newline at end of file diff --git a/helm/dendrite/README.md.gotmpl b/helm/dendrite/README.md.gotmpl index 7c32f7b02..9411733ce 100644 --- a/helm/dendrite/README.md.gotmpl +++ b/helm/dendrite/README.md.gotmpl @@ -10,4 +10,5 @@ {{ template "chart.sourcesSection" . }} {{ template "chart.requirementsSection" . }} {{ template "chart.valuesSection" . }} +{{ template "chart.monitoringSection" . }} {{ template "helm-docs.versionFooter" . }} \ No newline at end of file diff --git a/helm/dendrite/ci/ct-ingress-values.yaml b/helm/dendrite/ci/ct-ingress-values.yaml index 28311d33e..f3f58b5ca 100644 --- a/helm/dendrite/ci/ct-ingress-values.yaml +++ b/helm/dendrite/ci/ct-ingress-values.yaml @@ -11,3 +11,8 @@ dendrite_config: ingress: enabled: true + +# dashboard is an ConfigMap with labels - it does not harm on testing +grafana: + dashboards: + enabled: true diff --git a/helm/dendrite/grafana_dashboards/dendrite-rev1.json b/helm/dendrite/grafana_dashboards/dendrite-rev1.json new file mode 100644 index 000000000..206e8af87 --- /dev/null +++ b/helm/dendrite/grafana_dashboards/dendrite-rev1.json @@ -0,0 +1,1119 @@ +{ + "__inputs": [ + { + "name": "DS_INFLUXDB_DOMOTICA", + "label": "", + "description": "", + "type": "datasource", + "pluginId": "influxdb", + "pluginName": "InfluxDB" + }, + { + "name": "DS_PROMETHEUS", + "label": "Prometheus", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "7.4.2" + }, + { + "type": "panel", + "id": "graph", + "name": "Graph", + "version": "" + }, + { + "type": "panel", + "id": "heatmap", + "name": "Heatmap", + "version": "" + }, + { + "type": "datasource", + "id": "influxdb", + "name": "InfluxDB", + "version": "1.0.0" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "stat", + "name": "Stat", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "description": "Dendrite dashboard from https://github.com/matrix-org/dendrite/", + "editable": true, + "gnetId": 13916, + "graphTooltip": 0, + "id": null, + "iteration": 1613683251329, + "links": [], + "panels": [ + { + "collapsed": false, + "datasource": "${DS_INFLUXDB_DOMOTICA}", + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 4, + "panels": [], + "title": "Overview", + "type": "row" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 5, + "w": 10, + "x": 0, + "y": 1 + }, + "hiddenSeries": false, + "id": 2, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.4.2", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "rate(process_cpu_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "{{job}}-{{index}} ", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "CPU usage", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": null, + "format": "percentunit", + "label": null, + "logBase": 1, + "max": "1", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "Total number of registered users", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": {}, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 4, + "x": 10, + "y": 1 + }, + "id": 20, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "7.4.2", + "targets": [ + { + "exemplar": false, + "expr": "dendrite_clientapi_reg_users_total", + "instant": false, + "interval": "", + "legendFormat": "Users", + "refId": "A" + } + ], + "title": "Registerd Users", + "type": "stat" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "description": "The number of sync requests that are active right now and are waiting to be woken by a notifier", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 5, + "w": 10, + "x": 14, + "y": 1 + }, + "hiddenSeries": false, + "id": 6, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.4.2", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(dendrite_syncapi_active_sync_requests{instance=\"$instance\"}[$bucket_size]))without (job,index)", + "hide": false, + "interval": "", + "legendFormat": "active", + "refId": "A" + }, + { + "expr": "sum(rate(dendrite_syncapi_waiting_sync_requests{instance=\"$instance\"}[$bucket_size]))without (job,index)", + "hide": false, + "interval": "", + "legendFormat": "waiting", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Sync API", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:232", + "format": "hertz", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "$$hashKey": "object:233", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": null, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateOranges", + "exponent": 0.5, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_PROMETHEUS}", + "description": "How long it takes to build and submit a new event from the client API to the roomserver", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 6 + }, + "heatmap": {}, + "hideZeroBuckets": false, + "highlightCards": true, + "id": 24, + "legend": { + "show": false + }, + "pluginVersion": "7.4.2", + "reverseYBuckets": false, + "targets": [ + { + "expr": "dendrite_clientapi_sendevent_duration_millis_bucket{action=\"build\",instance=\"$instance\"}", + "interval": "", + "legendFormat": "{{le}}", + "refId": "A" + } + ], + "title": "Sendevent Duration", + "tooltip": { + "show": true, + "showHistogram": false + }, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "s", + "logBase": 1, + "max": null, + "min": "0", + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "collapsed": false, + "datasource": "${DS_INFLUXDB_DOMOTICA}", + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 11 + }, + "id": 8, + "panels": [], + "title": "Federation", + "type": "row" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "description": "Collection of queues for sending transactions to other matrix servers", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 24, + "x": 0, + "y": 12 + }, + "hiddenSeries": false, + "id": 10, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.4.2", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dendrite_federationsender_destination_queues_running", + "interval": "", + "legendFormat": "Queue Running", + "refId": "A" + }, + { + "expr": "dendrite_federationsender_destination_queues_total", + "hide": false, + "interval": "", + "legendFormat": "Queue Total", + "refId": "B" + }, + { + "expr": "dendrite_federationsender_destination_queues_backing_off", + "hide": false, + "interval": "", + "legendFormat": "Backing Off", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Federation Sender Destination", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:443", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:444", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "collapsed": false, + "datasource": "${DS_INFLUXDB_DOMOTICA}", + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 18 + }, + "id": 26, + "panels": [], + "title": "Rooms", + "type": "row" + }, + { + "cards": { + "cardPadding": null, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateOranges", + "exponent": 0.5, + "mode": "spectrum" + }, + "dataFormat": "timeseries", + "datasource": "${DS_PROMETHEUS}", + "description": "How long it takes the roomserver to process an event", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 19 + }, + "heatmap": {}, + "hideZeroBuckets": false, + "highlightCards": true, + "id": 28, + "legend": { + "show": false + }, + "pluginVersion": "7.4.2", + "reverseYBuckets": false, + "targets": [ + { + "expr": "sum(rate(dendrite_roomserver_processroomevent_duration_millis_bucket{instance=\"$instance\"}[$bucket_size])) by (le)", + "interval": "", + "legendFormat": "{{le}}", + "refId": "A" + } + ], + "title": "Room Event Processing", + "tooltip": { + "show": true, + "showHistogram": false + }, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "collapsed": false, + "datasource": "${DS_INFLUXDB_DOMOTICA}", + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 26 + }, + "id": 12, + "panels": [], + "title": "Caches", + "type": "row" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 8, + "x": 0, + "y": 27 + }, + "hiddenSeries": false, + "id": 14, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.4.2", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dendrite_caching_in_memory_lru_server_key", + "interval": "", + "legendFormat": "Server keys", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Server Keys", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:667", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:668", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 8, + "x": 8, + "y": 27 + }, + "hiddenSeries": false, + "id": 16, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.4.2", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dendrite_caching_in_memory_lru_federation_event", + "interval": "", + "legendFormat": "Federation Event", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Federation Events", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:784", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:785", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 8, + "x": 16, + "y": 27 + }, + "hiddenSeries": false, + "id": 18, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.4.2", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dendrite_caching_in_memory_lru_roomserver_room_ids", + "interval": "", + "legendFormat": "Room IDs", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Room IDs", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:898", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:899", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "refresh": "10s", + "schemaVersion": 27, + "style": "dark", + "tags": [ + "matrix", + "dendrite" + ], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "Prometheus", + "value": "Prometheus" + }, + "description": null, + "error": null, + "hide": 0, + "includeAll": false, + "label": null, + "multi": false, + "name": "datasource", + "options": [], + "query": "prometheus", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "type": "datasource" + }, + { + "auto": true, + "auto_count": 100, + "auto_min": "30s", + "current": { + "selected": false, + "text": "auto", + "value": "$__auto_interval_bucket_size" + }, + "description": null, + "error": null, + "hide": 0, + "label": "Bucket Size", + "name": "bucket_size", + "options": [ + { + "selected": true, + "text": "auto", + "value": "$__auto_interval_bucket_size" + }, + { + "selected": false, + "text": "30s", + "value": "30s" + }, + { + "selected": false, + "text": "1m", + "value": "1m" + }, + { + "selected": false, + "text": "2m", + "value": "2m" + }, + { + "selected": false, + "text": "5m", + "value": "5m" + }, + { + "selected": false, + "text": "10m", + "value": "10m" + }, + { + "selected": false, + "text": "15m", + "value": "15m" + } + ], + "query": "30s,1m,2m,5m,10m,15m", + "queryValue": "", + "refresh": 2, + "skipUrlSync": false, + "type": "interval" + }, + { + "allValue": null, + "current": {}, + "datasource": "${DS_PROMETHEUS}", + "definition": "label_values(dendrite_caching_in_memory_lru_roominfo, instance)", + "description": null, + "error": null, + "hide": 0, + "includeAll": false, + "label": null, + "multi": false, + "name": "instance", + "options": [], + "query": { + "query": "label_values(dendrite_caching_in_memory_lru_roominfo, instance)", + "refId": "StandardVariableQuery" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, + { + "allValue": null, + "current": {}, + "datasource": "${DS_PROMETHEUS}", + "definition": "label_values(dendrite_caching_in_memory_lru_roominfo, job)", + "description": null, + "error": null, + "hide": 0, + "includeAll": true, + "label": "Job", + "multi": true, + "name": "job", + "options": [], + "query": { + "query": "label_values(dendrite_caching_in_memory_lru_roominfo, job)", + "refId": "StandardVariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, + { + "allValue": ".*", + "current": {}, + "datasource": "${DS_PROMETHEUS}", + "definition": "label_values(dendrite_caching_in_memory_lru_roominfo, index)", + "description": null, + "error": null, + "hide": 0, + "includeAll": true, + "label": null, + "multi": true, + "name": "index", + "options": [], + "query": { + "query": "label_values(dendrite_caching_in_memory_lru_roominfo, index)", + "refId": "StandardVariableQuery" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 3, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + } + ] + }, + "time": { + "from": "now-3h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Dendrite", + "uid": "RoRt1jEGz", + "version": 8 +} \ No newline at end of file diff --git a/helm/dendrite/templates/configmap_grafana_dashboards.yaml b/helm/dendrite/templates/configmap_grafana_dashboards.yaml new file mode 100644 index 000000000..e2abc4909 --- /dev/null +++ b/helm/dendrite/templates/configmap_grafana_dashboards.yaml @@ -0,0 +1,16 @@ +{{- if .Values.grafana.dashboards.enabled }} +{{- range $path, $bytes := .Files.Glob "grafana_dashboards/*" }} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "dendrite.fullname" $ }}-grafana-dashboards-{{ base $path }} + labels: + {{- include "dendrite.labels" $ | nindent 4 }} + {{- toYaml $.Values.grafana.dashboards.labels | nindent 4 }} + annotations: + {{- toYaml $.Values.grafana.dashboards.annotations | nindent 4 }} +data: + {{- ($.Files.Glob $path ).AsConfig | nindent 2 }} +{{- end }} +{{- end }} diff --git a/helm/dendrite/templates/prometheus-rules.yaml b/helm/dendrite/templates/prometheus-rules.yaml new file mode 100644 index 000000000..6693a4ed9 --- /dev/null +++ b/helm/dendrite/templates/prometheus-rules.yaml @@ -0,0 +1,16 @@ +{{- if and ( .Values.prometheus.rules.enabled ) ( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" ) }} +--- +apiVersion: monitoring.coreos.com/v1 +kind: PrometheusRule +metadata: + name: {{ include "dendrite.fullname" . }} + labels: + {{- include "dendrite.labels" . | nindent 4 }} + {{- toYaml .Values.prometheus.rules.labels | nindent 4 }} +spec: + groups: + {{- if .Values.prometheus.rules.additionalRules }} + - name: {{ template "dendrite.name" . }}-Additional + rules: {{- toYaml .Values.prometheus.rules.additionalRules | nindent 4 }} + {{- end }} +{{- end }} diff --git a/helm/dendrite/templates/secrets.yaml b/helm/dendrite/templates/secrets.yaml index d4b8ecbf2..2084c9a56 100644 --- a/helm/dendrite/templates/secrets.yaml +++ b/helm/dendrite/templates/secrets.yaml @@ -1,15 +1,15 @@ -{{ if (gt (len (.Files.Glob "appservices/*")) 0) }} +{{- if (gt (len (.Files.Glob "appservices/*")) 0) }} --- apiVersion: v1 kind: Secret metadata: name: {{ include "dendrite.fullname" . }}-appservices-conf - namespace: {{ .Release.Namespace }} type: Opaque data: {{ (.Files.Glob "appservices/*").AsSecrets | indent 2 }} -{{ end }} -{{ if and .Values.signing_key.create (not .Values.signing_key.existingSecret) }} +{{- end }} + +{{- if and .Values.signing_key.create (not .Values.signing_key.existingSecret) }} --- apiVersion: v1 kind: Secret @@ -17,17 +17,29 @@ metadata: annotations: helm.sh/resource-policy: keep name: {{ include "dendrite.fullname" . }}-signing-key - namespace: {{ .Release.Namespace }} type: Opaque -{{ end }} +{{- end }} + +{{- with .Values.dendrite_config.global.metrics }} +{{- if .enabled }} +--- +apiVersion: v1 +kind: Secret +metadata: + name: {{ include "dendrite.fullname" $ }}-metrics-basic-auth +type: Opaque +stringData: + user: {{ .basic_auth.user | quote }} + password: {{ .basic_auth.password | quote }} +{{- end }} +{{- end }} --- apiVersion: v1 kind: Secret -type: Opaque metadata: name: {{ include "dendrite.fullname" . }}-conf - namespace: {{ .Release.Namespace }} +type: Opaque stringData: dendrite.yaml: | {{ toYaml ( mustMergeOverwrite .Values.dendrite_config ( fromYaml (include "override.config" .) ) .Values.dendrite_config ) | nindent 4 }} \ No newline at end of file diff --git a/helm/dendrite/templates/servicemonitor.yaml b/helm/dendrite/templates/servicemonitor.yaml new file mode 100644 index 000000000..3819c7d02 --- /dev/null +++ b/helm/dendrite/templates/servicemonitor.yaml @@ -0,0 +1,26 @@ +{{- if and + (and .Values.prometheus.servicemonitor.enabled .Values.dendrite_config.global.metrics.enabled ) + ( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" ) +}} +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ include "dendrite.fullname" . }} + labels: + {{- include "dendrite.labels" . | nindent 4 }} + {{- toYaml .Values.prometheus.servicemonitor.labels | nindent 4 }} +spec: + endpoints: + - port: http + basicAuth: + username: + name: {{ include "dendrite.fullname" . }}-metrics-basic-auth + key: "user" + password: + name: {{ include "dendrite.fullname" . }}-metrics-basic-auth + key: "password" + selector: + matchLabels: + {{- include "dendrite.selectorLabels" . | nindent 6 }} +{{- end }} diff --git a/helm/dendrite/values.yaml b/helm/dendrite/values.yaml index 848241ab6..c219d27f8 100644 --- a/helm/dendrite/values.yaml +++ b/helm/dendrite/values.yaml @@ -348,3 +348,26 @@ ingress: service: type: ClusterIP port: 8008 + +prometheus: + servicemonitor: + # -- Enable ServiceMonitor for Prometheus-Operator for scrape metric-endpoint + enabled: false + # -- Extra Labels on ServiceMonitor for selector of Prometheus Instance + labels: {} + rules: + # -- Enable PrometheusRules for Prometheus-Operator for setup alerting + enabled: false + # -- Extra Labels on PrometheusRules for selector of Prometheus Instance + labels: {} + # -- additional alertrules (no default alertrules are provided) + additionalRules: [] + +grafana: + dashboards: + enabled: false + # -- Extra Labels on ConfigMap for selector of grafana sidecar + labels: + grafana_dashboard: "1" + # -- Extra Annotations on ConfigMap additional config in grafana sidecar + annotations: {} diff --git a/internal/caching/cache_eventstatekeys.go b/internal/caching/cache_eventstatekeys.go index 05580ab05..51e2499d5 100644 --- a/internal/caching/cache_eventstatekeys.go +++ b/internal/caching/cache_eventstatekeys.go @@ -7,6 +7,7 @@ import "github.com/matrix-org/dendrite/roomserver/types" type EventStateKeyCache interface { GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool) StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string) + GetEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, bool) } func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool) { @@ -15,4 +16,23 @@ func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (strin func (c Caches) StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string) { c.RoomServerStateKeys.Set(eventStateKeyNID, eventStateKey) + c.RoomServerStateKeyNIDs.Set(eventStateKey, eventStateKeyNID) +} + +func (c Caches) GetEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, bool) { + return c.RoomServerStateKeyNIDs.Get(eventStateKey) +} + +type EventTypeCache interface { + GetEventTypeKey(eventType string) (types.EventTypeNID, bool) + StoreEventTypeKey(eventTypeNID types.EventTypeNID, eventType string) +} + +func (c Caches) StoreEventTypeKey(eventTypeNID types.EventTypeNID, eventType string) { + c.RoomServerEventTypeNIDs.Set(eventType, eventTypeNID) + c.RoomServerEventTypes.Set(eventTypeNID, eventType) +} + +func (c Caches) GetEventTypeKey(eventType string) (types.EventTypeNID, bool) { + return c.RoomServerEventTypeNIDs.Get(eventType) } diff --git a/internal/caching/cache_roomservernids.go b/internal/caching/cache_roomservernids.go index 88a5b28bc..734a3a04f 100644 --- a/internal/caching/cache_roomservernids.go +++ b/internal/caching/cache_roomservernids.go @@ -9,19 +9,28 @@ type RoomServerCaches interface { RoomVersionCache RoomServerEventsCache EventStateKeyCache + EventTypeCache } // RoomServerNIDsCache contains the subset of functions needed for // a roomserver NID cache. type RoomServerNIDsCache interface { GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) + // StoreRoomServerRoomID stores roomNID -> roomID and roomID -> roomNID StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) + GetRoomServerRoomNID(roomID string) (types.RoomNID, bool) } func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) { return c.RoomServerRoomIDs.Get(roomNID) } +// StoreRoomServerRoomID stores roomNID -> roomID and roomID -> roomNID func (c Caches) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) { + c.RoomServerRoomNIDs.Set(roomID, roomNID) c.RoomServerRoomIDs.Set(roomNID, roomID) } + +func (c Caches) GetRoomServerRoomNID(roomID string) (types.RoomNID, bool) { + return c.RoomServerRoomNIDs.Get(roomID) +} diff --git a/internal/caching/caches.go b/internal/caching/caches.go index 78c9ab7ee..479920466 100644 --- a/internal/caching/caches.go +++ b/internal/caching/caches.go @@ -23,16 +23,19 @@ import ( // different implementations as long as they satisfy the Cache // interface. type Caches struct { - RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version - ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys - RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID - RoomServerRoomIDs Cache[types.RoomNID, string] // room NID -> room ID - RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event - RoomServerStateKeys Cache[types.EventStateKeyNID, string] // event NID -> event state key - FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU - FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU - SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response - LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID + RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version + ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys + RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID + RoomServerRoomIDs Cache[types.RoomNID, string] // room NID -> room ID + RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event + RoomServerStateKeys Cache[types.EventStateKeyNID, string] // eventStateKey NID -> event state key + RoomServerStateKeyNIDs Cache[string, types.EventStateKeyNID] // event state key -> eventStateKey NID + RoomServerEventTypeNIDs Cache[string, types.EventTypeNID] // eventType -> eventType NID + RoomServerEventTypes Cache[types.EventTypeNID, string] // eventType NID -> eventType + FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU + FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU + SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response + LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID } // Cache is the interface that an implementation must satisfy. diff --git a/internal/caching/impl_ristretto.go b/internal/caching/impl_ristretto.go index 49292d0dc..fca93afd1 100644 --- a/internal/caching/impl_ristretto.go +++ b/internal/caching/impl_ristretto.go @@ -40,6 +40,9 @@ const ( spaceSummaryRoomsCache lazyLoadingCache eventStateKeyCache + eventTypeCache + eventTypeNIDCache + eventStateKeyNIDCache ) func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches { @@ -105,6 +108,21 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm Prefix: eventStateKeyCache, MaxAge: maxAge, }, + RoomServerStateKeyNIDs: &RistrettoCachePartition[string, types.EventStateKeyNID]{ // eventStateKey -> eventStateKey NID + cache: cache, + Prefix: eventStateKeyNIDCache, + MaxAge: maxAge, + }, + RoomServerEventTypeNIDs: &RistrettoCachePartition[string, types.EventTypeNID]{ // eventType -> eventType NID + cache: cache, + Prefix: eventTypeCache, + MaxAge: maxAge, + }, + RoomServerEventTypes: &RistrettoCachePartition[types.EventTypeNID, string]{ // eventType NID -> eventType + cache: cache, + Prefix: eventTypeNIDCache, + MaxAge: maxAge, + }, FederationPDUs: &RistrettoCostedCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{ // queue NID -> PDU &RistrettoCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{ cache: cache, diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go index 329e6af7f..fc61b7f4a 100644 --- a/roomserver/internal/alias.go +++ b/roomserver/internal/alias.go @@ -30,26 +30,6 @@ import ( "github.com/tidwall/sjson" ) -// RoomserverInternalAPIDatabase has the storage APIs needed to implement the alias API. -type RoomserverInternalAPIDatabase interface { - // Save a given room alias with the room ID it refers to. - // Returns an error if there was a problem talking to the database. - SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error - // Look up the room ID a given alias refers to. - // Returns an error if there was a problem talking to the database. - GetRoomIDForAlias(ctx context.Context, alias string) (string, error) - // Look up all aliases referring to a given room ID. - // Returns an error if there was a problem talking to the database. - GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error) - // Remove a given room alias. - // Returns an error if there was a problem talking to the database. - RemoveRoomAlias(ctx context.Context, alias string) error - // Look up the room version for a given room. - GetRoomVersionForRoom( - ctx context.Context, roomID string, - ) (gomatrixserverlib.RoomVersion, error) -} - // SetRoomAlias implements alias.RoomserverInternalAPI func (r *RoomserverInternalAPI) SetRoomAlias( ctx context.Context, diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 451b37696..c43b9d049 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -155,7 +155,6 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio r.Unpeeker = &perform.Unpeeker{ ServerName: r.ServerName, Cfg: r.Cfg, - DB: r.DB, FSAPI: r.fsAPI, Inputer: r.Inputer, } diff --git a/roomserver/internal/helpers/auth.go b/roomserver/internal/helpers/auth.go index 03d8bca0b..27c8dd8fa 100644 --- a/roomserver/internal/helpers/auth.go +++ b/roomserver/internal/helpers/auth.go @@ -31,7 +31,8 @@ import ( // the soft-fail bool. func CheckForSoftFail( ctx context.Context, - db storage.Database, + db storage.RoomDatabase, + roomInfo *types.RoomInfo, event *gomatrixserverlib.HeaderedEvent, stateEventIDs []string, ) (bool, error) { @@ -45,16 +46,6 @@ func CheckForSoftFail( return true, fmt.Errorf("StateEntriesForEventIDs failed: %w", err) } } else { - // Work out if the room exists. - var roomInfo *types.RoomInfo - roomInfo, err = db.RoomInfo(ctx, event.RoomID()) - if err != nil { - return false, fmt.Errorf("db.RoomNID: %w", err) - } - if roomInfo == nil || roomInfo.IsStub() { - return false, nil - } - // Then get the state entries for the current state snapshot. // We'll use this to check if the event is allowed right now. roomState := state.NewStateResolution(db, roomInfo) @@ -76,7 +67,7 @@ func CheckForSoftFail( stateNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()}) // Load the actual auth events from the database. - authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries) + authEvents, err := loadAuthEvents(ctx, db, roomInfo.RoomNID, stateNeeded, authStateEntries) if err != nil { return true, fmt.Errorf("loadAuthEvents: %w", err) } @@ -93,7 +84,8 @@ func CheckForSoftFail( // Returns the numeric IDs for the auth events. func CheckAuthEvents( ctx context.Context, - db storage.Database, + db storage.RoomDatabase, + roomNID types.RoomNID, event *gomatrixserverlib.HeaderedEvent, authEventIDs []string, ) ([]types.EventNID, error) { @@ -108,7 +100,7 @@ func CheckAuthEvents( stateNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()}) // Load the actual auth events from the database. - authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries) + authEvents, err := loadAuthEvents(ctx, db, roomNID, stateNeeded, authStateEntries) if err != nil { return nil, fmt.Errorf("loadAuthEvents: %w", err) } @@ -201,6 +193,7 @@ func (ae *authEvents) lookupEvent(typeNID types.EventTypeNID, stateKey string) * func loadAuthEvents( ctx context.Context, db state.StateResolutionStorage, + roomNID types.RoomNID, needed gomatrixserverlib.StateNeeded, state []types.StateEntry, ) (result authEvents, err error) { @@ -223,7 +216,7 @@ func loadAuthEvents( eventNIDs = append(eventNIDs, eventNID) } } - if result.events, err = db.Events(ctx, eventNIDs); err != nil { + if result.events, err = db.Events(ctx, roomNID, eventNIDs); err != nil { return } roomID := "" diff --git a/roomserver/internal/helpers/helpers.go b/roomserver/internal/helpers/helpers.go index 7efad7af6..ee1610cf2 100644 --- a/roomserver/internal/helpers/helpers.go +++ b/roomserver/internal/helpers/helpers.go @@ -85,7 +85,7 @@ func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverNam return false, err } - events, err := db.Events(ctx, eventNIDs) + events, err := db.Events(ctx, info.RoomNID, eventNIDs) if err != nil { return false, err } @@ -157,7 +157,7 @@ func IsInvitePending( // only keep the "m.room.member" events with a "join" membership. These events are returned. // Returns an error if there was an issue fetching the events. func GetMembershipsAtState( - ctx context.Context, db storage.Database, stateEntries []types.StateEntry, joinedOnly bool, + ctx context.Context, db storage.RoomDatabase, roomNID types.RoomNID, stateEntries []types.StateEntry, joinedOnly bool, ) ([]types.Event, error) { var eventNIDs types.EventNIDs @@ -177,7 +177,7 @@ func GetMembershipsAtState( util.Unique(eventNIDs) // Get all of the events in this state - stateEvents, err := db.Events(ctx, eventNIDs) + stateEvents, err := db.Events(ctx, roomNID, eventNIDs) if err != nil { return nil, err } @@ -220,16 +220,16 @@ func StateBeforeEvent(ctx context.Context, db storage.Database, info *types.Room return roomState.LoadCombinedStateAfterEvents(ctx, prevState) } -func MembershipAtEvent(ctx context.Context, db storage.Database, info *types.RoomInfo, eventIDs []string, stateKeyNID types.EventStateKeyNID) (map[string][]types.StateEntry, error) { +func MembershipAtEvent(ctx context.Context, db storage.RoomDatabase, info *types.RoomInfo, eventIDs []string, stateKeyNID types.EventStateKeyNID) (map[string][]types.StateEntry, error) { roomState := state.NewStateResolution(db, info) // Fetch the state as it was when this event was fired return roomState.LoadMembershipAtEvent(ctx, eventIDs, stateKeyNID) } func LoadEvents( - ctx context.Context, db storage.Database, eventNIDs []types.EventNID, + ctx context.Context, db storage.RoomDatabase, roomNID types.RoomNID, eventNIDs []types.EventNID, ) ([]*gomatrixserverlib.Event, error) { - stateEvents, err := db.Events(ctx, eventNIDs) + stateEvents, err := db.Events(ctx, roomNID, eventNIDs) if err != nil { return nil, err } @@ -242,13 +242,13 @@ func LoadEvents( } func LoadStateEvents( - ctx context.Context, db storage.Database, stateEntries []types.StateEntry, + ctx context.Context, db storage.RoomDatabase, roomNID types.RoomNID, stateEntries []types.StateEntry, ) ([]*gomatrixserverlib.Event, error) { eventNIDs := make([]types.EventNID, len(stateEntries)) for i := range stateEntries { eventNIDs[i] = stateEntries[i].EventNID } - return LoadEvents(ctx, db, eventNIDs) + return LoadEvents(ctx, db, roomNID, eventNIDs) } func CheckServerAllowedToSeeEvent( @@ -326,7 +326,7 @@ func slowGetHistoryVisibilityState( return nil, nil } - return LoadStateEvents(ctx, db, filteredEntries) + return LoadStateEvents(ctx, db, info.RoomNID, filteredEntries) } // TODO: Remove this when we have tests to assert correctness of this function @@ -366,7 +366,7 @@ BFSLoop: next = make([]string, 0) } // Retrieve the events to process from the database. - events, err = db.EventsFromIDs(ctx, front) + events, err = db.EventsFromIDs(ctx, info.RoomNID, front) if err != nil { return resultNIDs, redactEventIDs, err } @@ -467,7 +467,7 @@ func QueryLatestEventsAndState( return err } - stateEvents, err := LoadStateEvents(ctx, db, stateEntries) + stateEvents, err := LoadStateEvents(ctx, db, roomInfo.RoomNID, stateEntries) if err != nil { return err } diff --git a/roomserver/internal/helpers/helpers_test.go b/roomserver/internal/helpers/helpers_test.go index aa5c30e44..62730df1f 100644 --- a/roomserver/internal/helpers/helpers_test.go +++ b/roomserver/internal/helpers/helpers_test.go @@ -38,7 +38,18 @@ func TestIsInvitePendingWithoutNID(t *testing.T) { var authNIDs []types.EventNID for _, x := range room.Events() { - evNID, _, _, _, _, err := db.StoreEvent(context.Background(), x.Event, authNIDs, false) + roomNID, err := db.GetOrCreateRoomNID(context.Background(), x.Unwrap()) + assert.NoError(t, err) + assert.Greater(t, roomNID, types.RoomNID(0)) + + eventTypeNID, err := db.GetOrCreateEventTypeNID(context.Background(), x.Type()) + assert.NoError(t, err) + assert.Greater(t, eventTypeNID, types.EventTypeNID(0)) + + eventStateKeyNID, err := db.GetOrCreateEventStateKeyNID(context.Background(), x.StateKey()) + assert.NoError(t, err) + + evNID, _, _, _, err := db.StoreEvent(context.Background(), x.Event, roomNID, eventTypeNID, eventStateKeyNID, authNIDs, false) assert.NoError(t, err) authNIDs = append(authNIDs, evNID) } diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 941311030..2ec19f010 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -76,7 +76,7 @@ type Inputer struct { Cfg *config.RoomServer Base *base.BaseDendrite ProcessContext *process.ProcessContext - DB storage.Database + DB storage.RoomDatabase NATSClient *nats.Conn JetStream nats.JetStreamContext Durable nats.SubOpt diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 67edb3217..fe35efb27 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -308,10 +308,10 @@ func (r *Inputer) processRoomEvent( } var softfail bool - if input.Kind == api.KindNew { + if input.Kind == api.KindNew && !isCreateEvent { // Check that the event passes authentication checks based on the // current room state. - softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs) + softfail, err = helpers.CheckForSoftFail(ctx, r.DB, roomInfo, headered, input.StateEventIDs) if err != nil { logger.WithError(err).Warn("Error authing soft-failed event") } @@ -322,8 +322,8 @@ func (r *Inputer) processRoomEvent( // bother doing this if the event was already rejected as it just ends up // burning CPU time. historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared. - if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected { - historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev) + if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected && !isCreateEvent { + historyVisibility, rejectionErr, err = r.processStateBefore(ctx, roomInfo.RoomNID, input, missingPrev) if err != nil { return fmt.Errorf("r.processStateBefore: %w", err) } @@ -332,8 +332,23 @@ func (r *Inputer) processRoomEvent( } } + roomNID, err := r.DB.GetOrCreateRoomNID(ctx, event) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err) + } + + eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, event.Type()) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err) + } + + eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey()) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err) + } + // Store the event. - _, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected) + _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected) if err != nil { return fmt.Errorf("updater.StoreEvent: %w", err) } @@ -474,6 +489,7 @@ func (r *Inputer) handleRemoteRoomUpgrade(ctx context.Context, event *gomatrixse // nolint:nakedret func (r *Inputer) processStateBefore( ctx context.Context, + roomNID types.RoomNID, input *api.InputRoomEvent, missingPrev bool, ) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) { @@ -489,7 +505,7 @@ func (r *Inputer) processStateBefore( case input.HasState: // If we're overriding the state then we need to go and retrieve // them from the database. It's a hard error if they are missing. - stateEvents, err := r.DB.EventsFromIDs(ctx, input.StateEventIDs) + stateEvents, err := r.DB.EventsFromIDs(ctx, roomNID, input.StateEventIDs) if err != nil { return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err) } @@ -567,6 +583,7 @@ func (r *Inputer) processStateBefore( // we've failed to retrieve the auth chain altogether (in which case // an error is returned) or we've successfully retrieved them all and // they are now in the database. +// nolint: gocyclo func (r *Inputer) fetchAuthEvents( ctx context.Context, logger *logrus.Entry, @@ -587,7 +604,7 @@ func (r *Inputer) fetchAuthEvents( } for _, authEventID := range authEventIDs { - authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID}) + authEvents, err := r.DB.EventsFromIDs(ctx, roomInfo.RoomNID, []string{authEventID}) if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil { unknown[authEventID] = struct{}{} continue @@ -673,8 +690,23 @@ nextAuthEvent: logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID()) } + roomNID, err := r.DB.GetOrCreateRoomNID(ctx, authEvent) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err) + } + + eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, authEvent.Type()) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err) + } + + eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey()) + if err != nil { + return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err) + } + // Finally, store the event in the database. - eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected) + eventNID, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected) if err != nil { return fmt.Errorf("updater.StoreEvent: %w", err) } @@ -750,7 +782,7 @@ func (r *Inputer) kickGuests(ctx context.Context, event *gomatrixserverlib.Event return err } - memberEvents, err := r.DB.Events(ctx, membershipNIDs) + memberEvents, err := r.DB.Events(ctx, roomInfo.RoomNID, membershipNIDs) if err != nil { return err } diff --git a/roomserver/internal/input/input_membership.go b/roomserver/internal/input/input_membership.go index 28a54623b..99a012551 100644 --- a/roomserver/internal/input/input_membership.go +++ b/roomserver/internal/input/input_membership.go @@ -53,7 +53,7 @@ func (r *Inputer) updateMemberships( // Load the event JSON so we can look up the "membership" key. // TODO: Maybe add a membership key to the events table so we can load that // key without having to load the entire event JSON? - events, err := updater.Events(ctx, eventNIDs) + events, err := updater.Events(ctx, 0, eventNIDs) if err != nil { return nil, err } diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index 03ac2b38d..c8b7d31dd 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -43,7 +43,7 @@ type missingStateReq struct { log *logrus.Entry virtualHost gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName - db storage.Database + db storage.RoomDatabase roomInfo *types.RoomInfo inputer *Inputer keys gomatrixserverlib.JSONVerifier @@ -395,7 +395,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even for _, entry := range stateEntries { stateEventNIDs = append(stateEventNIDs, entry.EventNID) } - stateEvents, err := t.db.Events(ctx, stateEventNIDs) + stateEvents, err := t.db.Events(ctx, t.roomInfo.RoomNID, stateEventNIDs) if err != nil { t.log.WithError(err).Warnf("failed to load state events locally") return nil @@ -432,7 +432,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even missingEventList = append(missingEventList, evID) } t.log.WithField("count", len(missingEventList)).Debugf("Fetching missing auth events") - events, err := t.db.EventsFromIDs(ctx, missingEventList) + events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList) if err != nil { return nil } @@ -702,7 +702,7 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo } t.haveEventsMutex.Unlock() - events, err := t.db.EventsFromIDs(ctx, missingEventList) + events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList) if err != nil { return nil, fmt.Errorf("t.db.EventsFromIDs: %w", err) } @@ -844,7 +844,7 @@ func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixs if localFirst { // fetch from the roomserver - events, err := t.db.EventsFromIDs(ctx, []string{missingEventID}) + events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, []string{missingEventID}) if err != nil { t.log.Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err) } else if len(events) == 1 { diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index 3256162b4..2efe2255f 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -70,7 +70,7 @@ func (r *Admin) PerformAdminEvacuateRoom( return nil } - memberEvents, err := r.DB.Events(ctx, memberNIDs) + memberEvents, err := r.DB.Events(ctx, roomInfo.RoomNID, memberNIDs) if err != nil { res.Error = &api.PerformError{ Code: api.PerformErrorBadRequest, diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index d9214fdc6..3a3a049db 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -86,7 +86,7 @@ func (r *Backfiller) PerformBackfill( // Retrieve events from the list that was filled previously. If we fail to get // events from the database then attempt once to get them from federation instead. var loadedEvents []*gomatrixserverlib.Event - loadedEvents, err = helpers.LoadEvents(ctx, r.DB, resultNIDs) + loadedEvents, err = helpers.LoadEvents(ctx, r.DB, info.RoomNID, resultNIDs) if err != nil { if _, ok := err.(types.MissingEventError); ok { return r.backfillViaFederation(ctx, request, response) @@ -258,6 +258,7 @@ type backfillRequester struct { eventIDToBeforeStateIDs map[string][]string eventIDMap map[string]*gomatrixserverlib.Event historyVisiblity gomatrixserverlib.HistoryVisibility + roomInfo types.RoomInfo } func newBackfillRequester( @@ -454,14 +455,14 @@ FindSuccessor: return nil } - stateEntries, err := helpers.StateBeforeEvent(ctx, b.db, info, NIDs[eventID]) + stateEntries, err := helpers.StateBeforeEvent(ctx, b.db, info, NIDs[eventID].EventNID) if err != nil { logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event") return nil } // possibly return all joined servers depending on history visiblity - memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.virtualHost) + memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, info, stateEntries, b.virtualHost) b.historyVisiblity = visibility if err != nil { logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules") @@ -472,7 +473,7 @@ FindSuccessor: // Retrieve all "m.room.member" state events of "join" membership, which // contains the list of users in the room before the event, therefore all // the servers in it at that moment. - memberEvents, err := helpers.GetMembershipsAtState(ctx, b.db, stateEntries, true) + memberEvents, err := helpers.GetMembershipsAtState(ctx, b.db, info.RoomNID, stateEntries, true) if err != nil { logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event") return nil @@ -523,11 +524,15 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, } eventNIDs := make([]types.EventNID, len(nidMap)) i := 0 + roomNID := b.roomInfo.RoomNID for _, nid := range nidMap { - eventNIDs[i] = nid + eventNIDs[i] = nid.EventNID i++ + if roomNID == 0 { + roomNID = nid.RoomNID + } } - eventsWithNids, err := b.db.Events(ctx, eventNIDs) + eventsWithNids, err := b.db.Events(ctx, roomNID, eventNIDs) if err != nil { logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events") return nil, err @@ -544,7 +549,7 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, // TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just // pull all events and then filter by that table. func joinEventsFromHistoryVisibility( - ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry, + ctx context.Context, db storage.RoomDatabase, roomInfo *types.RoomInfo, stateEntries []types.StateEntry, thisServer gomatrixserverlib.ServerName) ([]types.Event, gomatrixserverlib.HistoryVisibility, error) { var eventNIDs []types.EventNID @@ -557,7 +562,7 @@ func joinEventsFromHistoryVisibility( } // Get all of the events in this state - stateEvents, err := db.Events(ctx, eventNIDs) + stateEvents, err := db.Events(ctx, roomInfo.RoomNID, eventNIDs) if err != nil { // even though the default should be shared, restricting the visibility to joined // feels more secure here. @@ -570,21 +575,17 @@ func joinEventsFromHistoryVisibility( // Can we see events in the room? canSeeEvents := auth.IsServerAllowed(thisServer, true, events) - visibility := gomatrixserverlib.HistoryVisibility(auth.HistoryVisibilityForRoom(events)) + visibility := auth.HistoryVisibilityForRoom(events) if !canSeeEvents { logrus.Infof("ServersAtEvent history not visible to us: %s", visibility) return nil, visibility, nil } // get joined members - info, err := db.RoomInfo(ctx, roomID) - if err != nil { - return nil, visibility, nil - } - joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false) + joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomInfo.RoomNID, true, false) if err != nil { return nil, visibility, err } - evs, err := db.Events(ctx, joinEventNIDs) + evs, err := db.Events(ctx, roomInfo.RoomNID, joinEventNIDs) return evs, visibility, err } @@ -601,12 +602,31 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs authNids := make([]types.EventNID, len(nidMap)) i := 0 for _, nid := range nidMap { - authNids[i] = nid + authNids[i] = nid.EventNID i++ } + + roomNID, err = db.GetOrCreateRoomNID(ctx, ev.Unwrap()) + if err != nil { + logrus.WithError(err).Error("failed to get or create roomNID") + continue + } + + eventTypeNID, err := db.GetOrCreateEventTypeNID(ctx, ev.Type()) + if err != nil { + logrus.WithError(err).Error("failed to get or create eventType NID") + continue + } + + eventStateKeyNID, err := db.GetOrCreateEventStateKeyNID(ctx, ev.StateKey()) + if err != nil { + logrus.WithError(err).Error("failed to get or create eventStateKey NID") + continue + } + var redactedEventID string var redactionEvent *gomatrixserverlib.Event - eventNID, roomNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false) + eventNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), roomNID, eventTypeNID, eventStateKeyNID, authNids, false) if err != nil { logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event") continue diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go index 29decd363..9ac9edc4c 100644 --- a/roomserver/internal/perform/perform_inbound_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -29,7 +29,7 @@ import ( ) type InboundPeeker struct { - DB storage.Database + DB storage.RoomDatabase Inputer *input.Inputer } @@ -64,7 +64,7 @@ func (r *InboundPeeker) PerformInboundPeek( if err != nil { return err } - latestEvents, err := r.DB.EventsFromIDs(ctx, []string{latestEventRefs[0].EventID}) + latestEvents, err := r.DB.EventsFromIDs(ctx, info.RoomNID, []string{latestEventRefs[0].EventID}) if err != nil { return err } @@ -88,7 +88,7 @@ func (r *InboundPeeker) PerformInboundPeek( if err != nil { return err } - stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries) + stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, info.RoomNID, stateEntries) if err != nil { return err } diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go index f60247cd7..118e1b879 100644 --- a/roomserver/internal/perform/perform_invite.go +++ b/roomserver/internal/perform/perform_invite.go @@ -194,7 +194,7 @@ func (r *Inviter) PerformInvite( // try and see if the user is allowed to make this invite. We can't do // this for invites coming in over federation - we have to take those on // trust. - _, err = helpers.CheckAuthEvents(ctx, r.DB, event, event.AuthEventIDs()) + _, err = helpers.CheckAuthEvents(ctx, r.DB, info.RoomNID, event, event.AuthEventIDs()) if err != nil { logger.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error( "processInviteEvent.checkAuthEvents failed for event", @@ -291,7 +291,7 @@ func buildInviteStrippedState( for _, stateNID := range stateEntries { stateNIDs = append(stateNIDs, stateNID.EventNID) } - stateEvents, err := db.Events(ctx, stateNIDs) + stateEvents, err := db.Events(ctx, info.RoomNID, stateNIDs) if err != nil { return nil, err } diff --git a/roomserver/internal/perform/perform_unpeek.go b/roomserver/internal/perform/perform_unpeek.go index 0d97da4d6..4d714be66 100644 --- a/roomserver/internal/perform/perform_unpeek.go +++ b/roomserver/internal/perform/perform_unpeek.go @@ -22,7 +22,6 @@ import ( fsAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/input" - "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -31,9 +30,7 @@ type Unpeeker struct { ServerName gomatrixserverlib.ServerName Cfg *config.RoomServer FSAPI fsAPI.RoomserverFederationAPI - DB storage.Database - - Inputer *input.Inputer + Inputer *input.Inputer } // PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationapi. diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 1083bb237..ac34e0ff0 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -102,7 +102,7 @@ func (r *Queryer) QueryStateAfterEvents( return err } - stateEvents, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries) + stateEvents, err := helpers.LoadStateEvents(ctx, r.DB, info.RoomNID, stateEntries) if err != nil { return err } @@ -138,17 +138,7 @@ func (r *Queryer) QueryEventsByID( request *api.QueryEventsByIDRequest, response *api.QueryEventsByIDResponse, ) error { - eventNIDMap, err := r.DB.EventNIDs(ctx, request.EventIDs) - if err != nil { - return err - } - - var eventNIDs []types.EventNID - for _, nid := range eventNIDMap { - eventNIDs = append(eventNIDs, nid) - } - - events, err := helpers.LoadEvents(ctx, r.DB, eventNIDs) + events, err := r.DB.EventsFromIDs(ctx, 0, request.EventIDs) if err != nil { return err } @@ -196,7 +186,7 @@ func (r *Queryer) QueryMembershipForUser( response.IsInRoom = stillInRoom response.HasBeenInRoom = true - evs, err := r.DB.Events(ctx, []types.EventNID{membershipEventNID}) + evs, err := r.DB.Events(ctx, info.RoomNID, []types.EventNID{membershipEventNID}) if err != nil { return err } @@ -278,10 +268,10 @@ func (r *Queryer) QueryMembershipAtEvent( // once. If we have more than one membership event, we need to get the state for each state entry. if canShortCircuit { if len(memberships) == 0 { - memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false) + memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, info.RoomNID, stateEntry, false) } } else { - memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false) + memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, info.RoomNID, stateEntry, false) } if err != nil { return fmt.Errorf("unable to get memberships at state: %w", err) @@ -328,7 +318,7 @@ func (r *Queryer) QueryMembershipsForRoom( } return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err) } - events, err = r.DB.Events(ctx, eventNIDs) + events, err = r.DB.Events(ctx, info.RoomNID, eventNIDs) if err != nil { return fmt.Errorf("r.DB.Events: %w", err) } @@ -367,14 +357,14 @@ func (r *Queryer) QueryMembershipsForRoom( return err } - events, err = r.DB.Events(ctx, eventNIDs) + events, err = r.DB.Events(ctx, info.RoomNID, eventNIDs) } else { stateEntries, err = helpers.StateBeforeEvent(ctx, r.DB, info, membershipEventNID) if err != nil { logrus.WithField("membership_event_nid", membershipEventNID).WithError(err).Error("failed to load state before event") return err } - events, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntries, request.JoinedOnly) + events, err = helpers.GetMembershipsAtState(ctx, r.DB, info.RoomNID, stateEntries, request.JoinedOnly) } if err != nil { @@ -425,7 +415,7 @@ func (r *Queryer) QueryServerAllowedToSeeEvent( request *api.QueryServerAllowedToSeeEventRequest, response *api.QueryServerAllowedToSeeEventResponse, ) (err error) { - events, err := r.DB.EventsFromIDs(ctx, []string{request.EventID}) + events, err := r.DB.EventsFromIDs(ctx, 0, []string{request.EventID}) if err != nil { return } @@ -476,7 +466,7 @@ func (r *Queryer) QueryMissingEvents( eventsToFilter[id] = true } } - events, err := r.DB.EventsFromIDs(ctx, front) + events, err := r.DB.EventsFromIDs(ctx, 0, front) if err != nil { return err } @@ -496,7 +486,7 @@ func (r *Queryer) QueryMissingEvents( return err } - loadedEvents, err := helpers.LoadEvents(ctx, r.DB, resultNIDs) + loadedEvents, err := helpers.LoadEvents(ctx, r.DB, info.RoomNID, resultNIDs) if err != nil { return err } @@ -621,11 +611,11 @@ func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo *types.RoomI return nil, rejected, false, err } - events, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries) + events, err := helpers.LoadStateEvents(ctx, r.DB, roomInfo.RoomNID, stateEntries) return events, rejected, false, err } -type eventsFromIDs func(context.Context, []string) ([]types.Event, error) +type eventsFromIDs func(context.Context, types.RoomNID, []string) ([]types.Event, error) // GetAuthChain fetches the auth chain for the given auth events. An auth chain // is the list of all events that are referenced in the auth_events section, and @@ -643,7 +633,7 @@ func GetAuthChain( for len(eventsToFetch) > 0 { // Try to retrieve the events from the database. - events, err := fn(ctx, eventsToFetch) + events, err := fn(ctx, 0, eventsToFetch) if err != nil { return nil, err } @@ -981,7 +971,7 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query // For each of the joined users, let's see if we can get a valid // membership event. for _, joinNID := range joinNIDs { - events, err := r.DB.Events(ctx, []types.EventNID{joinNID}) + events, err := r.DB.Events(ctx, roomInfo.RoomNID, []types.EventNID{joinNID}) if err != nil || len(events) != 1 { continue } diff --git a/roomserver/internal/query/query_test.go b/roomserver/internal/query/query_test.go index 03627ea97..167611575 100644 --- a/roomserver/internal/query/query_test.go +++ b/roomserver/internal/query/query_test.go @@ -80,7 +80,7 @@ func (db *getEventDB) addFakeEvents(graph map[string][]string) error { } // EventsFromIDs implements RoomserverInternalAPIEventDB -func (db *getEventDB) EventsFromIDs(ctx context.Context, eventIDs []string) (res []types.Event, err error) { +func (db *getEventDB) EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) (res []types.Event, err error) { for _, evID := range eventIDs { res = append(res, types.Event{ EventNID: 0, diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index c5c51b255..5a8d8b570 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -23,8 +23,7 @@ import ( "github.com/matrix-org/dendrite/setup/base" ) -// NewInternalAPI returns a concerete implementation of the internal API. Callers -// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. +// NewInternalAPI returns a concrete implementation of the internal API. func NewInternalAPI( base *base.BaseDendrite, ) api.RoomserverInternalAPI { diff --git a/roomserver/state/state.go b/roomserver/state/state.go index 1cfde5e4b..cec542d7e 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -41,8 +41,8 @@ type StateResolutionStorage interface { StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error) StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error) AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error) - Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error) - EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) + Events(ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error) + EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error) } type StateResolution struct { @@ -975,7 +975,7 @@ func (v *StateResolution) resolveConflictsV2( // Store the newly found auth events in the auth set for this event. var authEventMap map[string]types.StateEntry - authSets[key], authEventMap, err = loader.loadAuthEvents(sctx, conflictedEvent, knownAuthEvents) + authSets[key], authEventMap, err = loader.loadAuthEvents(sctx, v.roomInfo.RoomNID, conflictedEvent, knownAuthEvents) if err != nil { return err } @@ -1091,7 +1091,7 @@ func (v *StateResolution) loadStateEvents( eventNIDs = append(eventNIDs, entry.EventNID) } } - events, err := v.db.Events(ctx, eventNIDs) + events, err := v.db.Events(ctx, v.roomInfo.RoomNID, eventNIDs) if err != nil { return nil, nil, err } @@ -1120,7 +1120,7 @@ type authEventLoader struct { // loadAuthEvents loads all of the auth events for a given event recursively, // along with a map that contains state entries for all of the auth events. func (l *authEventLoader) loadAuthEvents( - ctx context.Context, event *gomatrixserverlib.Event, eventMap map[string]types.Event, + ctx context.Context, roomNID types.RoomNID, event *gomatrixserverlib.Event, eventMap map[string]types.Event, ) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) { l.Lock() defer l.Unlock() @@ -1155,7 +1155,7 @@ func (l *authEventLoader) loadAuthEvents( // If we need to get events from the database, go and fetch // those now. if len(l.lookupFromDB) > 0 { - eventsFromDB, err := l.v.db.EventsFromIDs(ctx, l.lookupFromDB) + eventsFromDB, err := l.v.db.EventsFromIDs(ctx, roomNID, l.lookupFromDB) if err != nil { return nil, nil, fmt.Errorf("v.db.EventsFromIDs: %w", err) } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 998915122..88ec56670 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -69,15 +69,12 @@ type Database interface { ) ([]types.StateEntryList, error) // Look up the Events for a list of numeric event IDs. // Returns a sorted list of events. - Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error) + Events(ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error) // Look up snapshot NID for an event ID string SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error) BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error) // Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error. - StoreEvent( - ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, - isRejected bool, - ) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) + StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) // Look up the state entries for a list of string event IDs // Returns an error if the there is an error talking to the database // Returns a types.MissingEventError if the event IDs aren't in the database. @@ -87,7 +84,7 @@ type Database interface { EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error) // Look up the numeric IDs for a list of events. // Returns an error if there was a problem talking to the database. - EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error) + EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventMetadata, error) // Set the state at an event. FIXME TODO: "at" SetState(ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID) error // Lookup the event IDs for a batch of event numeric IDs. @@ -138,7 +135,7 @@ type Database interface { // EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was // not found. // Returns an error if the retrieval went wrong. - EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) + EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error) // Publish or unpublish a room from the room directory. PublishRoom(ctx context.Context, roomID, appserviceID, networkID string, publish bool) error // Returns a list of room IDs for rooms which are published. @@ -182,4 +179,36 @@ type Database interface { GetMembershipForHistoryVisibility( ctx context.Context, userNID types.EventStateKeyNID, info *types.RoomInfo, eventIDs ...string, ) (map[string]*gomatrixserverlib.HeaderedEvent, error) + GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (types.RoomNID, error) + GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) + GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error) +} + +type RoomDatabase interface { + // RoomInfo returns room information for the given room ID, or nil if there is no room. + RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error) + // IsEventRejected returns true if the event is known and rejected. + IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error) + MissingAuthPrevEvents(ctx context.Context, e *gomatrixserverlib.Event) (missingAuth, missingPrev []string, err error) + // Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error. + StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) + UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error + GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error) + StateEntriesForEventIDs(ctx context.Context, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error) + GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool) ([]types.EventNID, error) + StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error) + StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) + SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error) + BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error) + StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error) + StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error) + AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error) + Events(ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error) + EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error) + LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) + EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error) + EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error) + GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (types.RoomNID, error) + GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) + GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error) } diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index f4a21c8a7..c935608a5 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -140,10 +140,10 @@ const bulkSelectEventIDSQL = "" + "SELECT event_nid, event_id FROM roomserver_events WHERE event_nid = ANY($1)" const bulkSelectEventNIDSQL = "" + - "SELECT event_id, event_nid FROM roomserver_events WHERE event_id = ANY($1)" + "SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id = ANY($1)" const bulkSelectUnsentEventNIDSQL = "" + - "SELECT event_id, event_nid FROM roomserver_events WHERE event_id = ANY($1) AND sent_to_output = FALSE" + "SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id = ANY($1) AND sent_to_output = FALSE" const selectMaxEventDepthSQL = "" + "SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)" @@ -520,20 +520,20 @@ func (s *eventStatements) BulkSelectEventID(ctx context.Context, txn *sql.Tx, ev // BulkSelectEventNIDs returns a map from string event ID to numeric event ID. // If an event ID is not in the database then it is omitted from the map. -func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) { +func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) { return s.bulkSelectEventNID(ctx, txn, eventIDs, false) } // BulkSelectEventNIDs returns a map from string event ID to numeric event ID // only for events that haven't already been sent to the roomserver output. // If an event ID is not in the database then it is omitted from the map. -func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) { +func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) { return s.bulkSelectEventNID(ctx, txn, eventIDs, true) } // bulkSelectEventNIDs returns a map from string event ID to numeric event ID. // If an event ID is not in the database then it is omitted from the map. -func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventNID, error) { +func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventMetadata, error) { var stmt *sql.Stmt if onlyUnsent { stmt = sqlutil.TxStmt(txn, s.bulkSelectUnsentEventNIDStmt) @@ -545,14 +545,18 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, e return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed") - results := make(map[string]types.EventNID, len(eventIDs)) + results := make(map[string]types.EventMetadata, len(eventIDs)) var eventID string var eventNID int64 + var roomNID int64 for rows.Next() { - if err = rows.Scan(&eventID, &eventNID); err != nil { + if err = rows.Scan(&eventID, &eventNID, &roomNID); err != nil { return nil, err } - results[eventID] = types.EventNID(eventNID) + results[eventID] = types.EventMetadata{ + EventNID: types.EventNID(eventNID), + RoomNID: types.RoomNID(roomNID), + } } return results, rows.Err() } diff --git a/roomserver/storage/shared/room_updater.go b/roomserver/storage/shared/room_updater.go index cc880a6c8..5006c3c55 100644 --- a/roomserver/storage/shared/room_updater.go +++ b/roomserver/storage/shared/room_updater.go @@ -116,10 +116,8 @@ func (u *RoomUpdater) StorePreviousEvents(eventNID types.EventNID, previousEvent }) } -func (u *RoomUpdater) Events( - ctx context.Context, eventNIDs []types.EventNID, -) ([]types.Event, error) { - return u.d.events(ctx, u.txn, eventNIDs) +func (u *RoomUpdater) Events(ctx context.Context, _ types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error) { + return u.d.events(ctx, u.txn, u.roomInfo.RoomNID, eventNIDs) } func (u *RoomUpdater) SnapshotNIDFromEventID( @@ -197,12 +195,8 @@ func (u *RoomUpdater) StateAtEventIDs( return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs) } -func (u *RoomUpdater) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) { - return u.d.eventsFromIDs(ctx, u.txn, eventIDs, false) -} - -func (u *RoomUpdater) UnsentEventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) { - return u.d.eventsFromIDs(ctx, u.txn, eventIDs, true) +func (u *RoomUpdater) EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error) { + return u.d.eventsFromIDs(ctx, u.txn, roomNID, eventIDs, NoFilter) } // IsReferenced implements types.RoomRecentEventsUpdater diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index f8672496b..aac5bc365 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -9,6 +9,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/internal/caching" @@ -67,12 +68,25 @@ func (d *Database) eventTypeNIDs( ctx context.Context, txn *sql.Tx, eventTypes []string, ) (map[string]types.EventTypeNID, error) { result := make(map[string]types.EventTypeNID) - nids, err := d.EventTypesTable.BulkSelectEventTypeNID(ctx, txn, eventTypes) - if err != nil { - return nil, err + // first try the cache + fetchEventTypes := make([]string, 0, len(eventTypes)) + for _, eventType := range eventTypes { + eventTypeNID, ok := d.Cache.GetEventTypeKey(eventType) + if ok { + result[eventType] = eventTypeNID + continue + } + fetchEventTypes = append(fetchEventTypes, eventType) } - for eventType, nid := range nids { - result[eventType] = nid + if len(fetchEventTypes) > 0 { + nids, err := d.EventTypesTable.BulkSelectEventTypeNID(ctx, txn, fetchEventTypes) + if err != nil { + return nil, err + } + for eventType, nid := range nids { + result[eventType] = nid + d.Cache.StoreEventTypeKey(nid, eventType) + } } return result, nil } @@ -89,13 +103,15 @@ func (d *Database) EventStateKeys( fetch = append(fetch, nid) } } - fromDB, err := d.EventStateKeysTable.BulkSelectEventStateKey(ctx, nil, fetch) - if err != nil { - return nil, err - } - for nid, key := range fromDB { - result[nid] = key - d.Cache.StoreEventStateKey(nid, key) + if len(fetch) > 0 { + fromDB, err := d.EventStateKeysTable.BulkSelectEventStateKey(ctx, nil, fetch) + if err != nil { + return nil, err + } + for nid, key := range fromDB { + result[nid] = key + d.Cache.StoreEventStateKey(nid, key) + } } return result, nil } @@ -111,16 +127,32 @@ func (d *Database) eventStateKeyNIDs( ) (map[string]types.EventStateKeyNID, error) { result := make(map[string]types.EventStateKeyNID) eventStateKeys = util.UniqueStrings(eventStateKeys) - nids, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, txn, eventStateKeys) - if err != nil { - return nil, err + // first ask the cache about these keys + fetchEventStateKeys := make([]string, 0, len(eventStateKeys)) + for _, eventStateKey := range eventStateKeys { + eventStateKeyNID, ok := d.Cache.GetEventStateKeyNID(eventStateKey) + if ok { + result[eventStateKey] = eventStateKeyNID + continue + } + fetchEventStateKeys = append(fetchEventStateKeys, eventStateKey) } - for eventStateKey, nid := range nids { - result[eventStateKey] = nid + + if len(fetchEventStateKeys) > 0 { + nids, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, txn, fetchEventStateKeys) + if err != nil { + return nil, err + } + for eventStateKey, nid := range nids { + result[eventStateKey] = nid + d.Cache.StoreEventStateKey(nid, eventStateKey) + } } + // We received some nids, but are still missing some, work out which and create them if len(eventStateKeys) > len(result) { var nid types.EventStateKeyNID + var err error err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { for _, eventStateKey := range eventStateKeys { if _, ok := result[eventStateKey]; ok { @@ -262,7 +294,7 @@ func (d *Database) addState( func (d *Database) EventNIDs( ctx context.Context, eventIDs []string, -) (map[string]types.EventNID, error) { +) (map[string]types.EventMetadata, error) { return d.eventNIDs(ctx, nil, eventIDs, NoFilter) } @@ -275,7 +307,7 @@ const ( func (d *Database) eventNIDs( ctx context.Context, txn *sql.Tx, eventIDs []string, filter UnsentFilter, -) (map[string]types.EventNID, error) { +) (map[string]types.EventMetadata, error) { switch filter { case FilterUnsentOnly: return d.EventsTable.BulkSelectUnsentEventNID(ctx, txn, eventIDs) @@ -325,11 +357,11 @@ func (d *Database) EventIDs( return d.EventsTable.BulkSelectEventID(ctx, nil, eventNIDs) } -func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) { - return d.eventsFromIDs(ctx, nil, eventIDs, NoFilter) +func (d *Database) EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error) { + return d.eventsFromIDs(ctx, nil, roomNID, eventIDs, NoFilter) } -func (d *Database) eventsFromIDs(ctx context.Context, txn *sql.Tx, eventIDs []string, filter UnsentFilter) ([]types.Event, error) { +func (d *Database) eventsFromIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventIDs []string, filter UnsentFilter) ([]types.Event, error) { nidMap, err := d.eventNIDs(ctx, txn, eventIDs, filter) if err != nil { return nil, err @@ -337,10 +369,16 @@ func (d *Database) eventsFromIDs(ctx context.Context, txn *sql.Tx, eventIDs []st var nids []types.EventNID for _, nid := range nidMap { - nids = append(nids, nid) + nids = append(nids, nid.EventNID) + if roomNID != 0 && roomNID != nid.RoomNID { + logrus.Errorf("expected events from room %d, but also found %d", roomNID, nid.RoomNID) + } + if roomNID == 0 { + roomNID = nid.RoomNID + } } - return d.events(ctx, txn, nids) + return d.events(ctx, txn, roomNID, nids) } func (d *Database) LatestEventIDs( @@ -480,14 +518,18 @@ func (d *Database) GetInvitesForUser( } func (d *Database) Events( - ctx context.Context, eventNIDs []types.EventNID, + ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID, ) ([]types.Event, error) { - return d.events(ctx, nil, eventNIDs) + return d.events(ctx, nil, roomNID, eventNIDs) } func (d *Database) events( - ctx context.Context, txn *sql.Tx, inputEventNIDs types.EventNIDs, + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, inputEventNIDs types.EventNIDs, ) ([]types.Event, error) { + if roomNID == 0 { + // No need to go further, as we won't find any events for this room. + return nil, nil + } sort.Sort(inputEventNIDs) events := make(map[types.EventNID]*gomatrixserverlib.Event, len(inputEventNIDs)) eventNIDs := make([]types.EventNID, 0, len(inputEventNIDs)) @@ -519,40 +561,34 @@ func (d *Database) events( if err != nil { return nil, err } - eventIDs, _ := d.EventsTable.BulkSelectEventID(ctx, txn, eventNIDs) + eventIDs, err := d.EventsTable.BulkSelectEventID(ctx, txn, eventNIDs) if err != nil { eventIDs = map[types.EventNID]string{} } - var roomNIDs map[types.EventNID]types.RoomNID - roomNIDs, err = d.EventsTable.SelectRoomNIDsForEventNIDs(ctx, txn, eventNIDs) - if err != nil { - return nil, err - } - uniqueRoomNIDs := make(map[types.RoomNID]struct{}) - for _, n := range roomNIDs { - uniqueRoomNIDs[n] = struct{}{} - } - roomVersions := make(map[types.RoomNID]gomatrixserverlib.RoomVersion) - fetchNIDList := make([]types.RoomNID, 0, len(uniqueRoomNIDs)) - for n := range uniqueRoomNIDs { - if roomID, ok := d.Cache.GetRoomServerRoomID(n); ok { - if roomVersion, ok := d.Cache.GetRoomVersion(roomID); ok { - roomVersions[n] = roomVersion - continue - } + + var roomVersion gomatrixserverlib.RoomVersion + var fetchRoomVersion bool + var ok bool + var roomID string + if roomID, ok = d.Cache.GetRoomServerRoomID(roomNID); ok { + roomVersion, ok = d.Cache.GetRoomVersion(roomID) + if !ok { + fetchRoomVersion = true } - fetchNIDList = append(fetchNIDList, n) } - dbRoomVersions, err := d.RoomsTable.SelectRoomVersionsForRoomNIDs(ctx, txn, fetchNIDList) - if err != nil { - return nil, err - } - for n, v := range dbRoomVersions { - roomVersions[n] = v + + if roomVersion == "" || fetchRoomVersion { + var dbRoomVersions map[types.RoomNID]gomatrixserverlib.RoomVersion + dbRoomVersions, err = d.RoomsTable.SelectRoomVersionsForRoomNIDs(ctx, txn, []types.RoomNID{roomNID}) + if err != nil { + return nil, err + } + if roomVersion, ok = dbRoomVersions[roomNID]; !ok { + return nil, fmt.Errorf("unable to find roomversion for room %d", roomNID) + } } + for _, eventJSON := range eventJSONs { - roomNID := roomNIDs[eventJSON.EventNID] - roomVersion := roomVersions[roomNID] events[eventJSON.EventNID], err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID( eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion, ) @@ -624,73 +660,71 @@ func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, e return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID) } -func (d *Database) StoreEvent( - ctx context.Context, event *gomatrixserverlib.Event, - authEventNIDs []types.EventNID, isRejected bool, -) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { - return d.storeEvent(ctx, nil, event, authEventNIDs, isRejected) +// GetOrCreateRoomNID gets or creates a new roomNID for the given event +func (d *Database) GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (roomNID types.RoomNID, err error) { + // Get the default room version. If the client doesn't supply a room_version + // then we will use our configured default to create the room. + // https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-createroom + // Note that the below logic depends on the m.room.create event being the + // first event that is persisted to the database when creating or joining a + // room. + var roomVersion gomatrixserverlib.RoomVersion + if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil { + return 0, fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err) + } + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion) + if err != nil { + return err + } + return nil + }) + return roomNID, err } -func (d *Database) storeEvent( - ctx context.Context, updater *RoomUpdater, event *gomatrixserverlib.Event, - authEventNIDs []types.EventNID, isRejected bool, -) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { - var ( - roomNID types.RoomNID - eventTypeNID types.EventTypeNID - eventStateKeyNID types.EventStateKeyNID - eventNID types.EventNID - stateNID types.StateSnapshotNID - redactionEvent *gomatrixserverlib.Event - redactedEventID string - err error - ) - var txn *sql.Tx - if updater != nil && updater.txn != nil { - txn = updater.txn - } - // First writer is with a database-provided transaction, so that NIDs are assigned - // globally outside of the updater context, to help avoid races. +func (d *Database) GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - // TODO: Here we should aim to have two different code paths for new rooms - // vs existing ones. - - // Get the default room version. If the client doesn't supply a room_version - // then we will use our configured default to create the room. - // https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-createroom - // Note that the below logic depends on the m.room.create event being the - // first event that is persisted to the database when creating or joining a - // room. - var roomVersion gomatrixserverlib.RoomVersion - if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil { - return fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err) - } - - if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil { - return fmt.Errorf("d.assignRoomNID: %w", err) - } - - if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil { + if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, eventType); err != nil { return fmt.Errorf("d.assignEventTypeNID: %w", err) } + return nil + }) + return eventTypeNID, err +} - eventStateKey := event.StateKey() - // Assigned a numeric ID for the state_key if there is one present. - // Otherwise set the numeric ID for the state_key to 0. - if eventStateKey != nil { - if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil { - return fmt.Errorf("d.assignStateKeyNID: %w", err) - } +func (d *Database) GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (eventStateKeyNID types.EventStateKeyNID, err error) { + if eventStateKey == nil { + return 0, nil + } + + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil { + return fmt.Errorf("d.assignStateKeyNID: %w", err) } - return nil }) if err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err) + return 0, err } + + return eventStateKeyNID, nil +} + +func (d *Database) StoreEvent( + ctx context.Context, event *gomatrixserverlib.Event, + roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, + authEventNIDs []types.EventNID, isRejected bool, +) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { + var ( + eventNID types.EventNID + stateNID types.StateSnapshotNID + redactionEvent *gomatrixserverlib.Event + redactedEventID string + err error + ) // Second writer is using the database-provided transaction, probably from the // room updater, for easy roll-back if required. - err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if eventNID, stateNID, err = d.EventsTable.InsertEvent( ctx, txn, @@ -718,7 +752,7 @@ func (d *Database) storeEvent( return fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err) } if !isRejected { // ignore rejected redaction events - redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event) + redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, roomNID, eventNID, event) if err != nil { return fmt.Errorf("d.handleRedactions: %w", err) } @@ -726,7 +760,7 @@ func (d *Database) storeEvent( return nil }) if err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err) + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err) } // We should attempt to update the previous events table with any @@ -741,28 +775,28 @@ func (d *Database) storeEvent( // any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater // to do writes however then this will need to go inside `Writer.Do`. succeeded := false - if updater == nil { - var roomInfo *types.RoomInfo - roomInfo, err = d.roomInfo(ctx, txn, event.RoomID()) - if err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err) - } - if roomInfo == nil && len(prevEvents) > 0 { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID()) - } - updater, err = d.GetRoomUpdater(ctx, roomInfo) - if err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err) - } - defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err) + var roomInfo *types.RoomInfo + roomInfo, err = d.roomInfo(ctx, nil, event.RoomID()) + if err != nil { + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err) } + if roomInfo == nil && len(prevEvents) > 0 { + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID()) + } + var updater *RoomUpdater + updater, err = d.GetRoomUpdater(ctx, roomInfo) + if err != nil { + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err) + } + defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err) + if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil { - return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err) + return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err) } succeeded = true } - return eventNID, roomNID, types.StateAtEvent{ + return eventNID, types.StateAtEvent{ BeforeStateSnapshotNID: stateNID, StateEntry: types.StateEntry{ StateKeyTuple: types.StateKeyTuple{ @@ -814,6 +848,10 @@ func (d *Database) MissingAuthPrevEvents( func (d *Database) assignRoomNID( ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion, ) (types.RoomNID, error) { + roomNID, ok := d.Cache.GetRoomServerRoomNID(roomID) + if ok { + return roomNID, nil + } // Check if we already have a numeric ID in the database. roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID) if err == sql.ErrNoRows { @@ -824,12 +862,20 @@ func (d *Database) assignRoomNID( roomNID, err = d.RoomsTable.SelectRoomNID(ctx, txn, roomID) } } - return roomNID, err + if err != nil { + return 0, err + } + d.Cache.StoreRoomServerRoomID(roomNID, roomID) + return roomNID, nil } func (d *Database) assignEventTypeNID( ctx context.Context, txn *sql.Tx, eventType string, ) (types.EventTypeNID, error) { + eventTypeNID, ok := d.Cache.GetEventTypeKey(eventType) + if ok { + return eventTypeNID, nil + } // Check if we already have a numeric ID in the database. eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType) if err == sql.ErrNoRows { @@ -840,12 +886,20 @@ func (d *Database) assignEventTypeNID( eventTypeNID, err = d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType) } } - return eventTypeNID, err + if err != nil { + return 0, err + } + d.Cache.StoreEventTypeKey(eventTypeNID, eventType) + return eventTypeNID, nil } func (d *Database) assignStateKeyNID( ctx context.Context, txn *sql.Tx, eventStateKey string, ) (types.EventStateKeyNID, error) { + eventStateKeyNID, ok := d.Cache.GetEventStateKeyNID(eventStateKey) + if ok { + return eventStateKeyNID, nil + } // Check if we already have a numeric ID in the database. eventStateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey) if err == sql.ErrNoRows { @@ -856,6 +910,7 @@ func (d *Database) assignStateKeyNID( eventStateKeyNID, err = d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey) } } + d.Cache.StoreEventStateKey(eventStateKeyNID, eventStateKey) return eventStateKeyNID, err } @@ -899,7 +954,7 @@ func extractRoomVersionFromCreateEvent(event *gomatrixserverlib.Event) ( // // Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction. func (d *Database) handleRedactions( - ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event *gomatrixserverlib.Event, + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNID types.EventNID, event *gomatrixserverlib.Event, ) (*gomatrixserverlib.Event, string, error) { var err error isRedactionEvent := event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil @@ -919,7 +974,7 @@ func (d *Database) handleRedactions( } } - redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, eventNID, event) + redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, roomNID, eventNID, event) if err != nil { return nil, "", fmt.Errorf("d.loadRedactionPair: %w", err) } @@ -985,7 +1040,7 @@ func (d *Database) handleRedactions( // loadRedactionPair returns both the redaction event and the redacted event, else nil. func (d *Database) loadRedactionPair( - ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event *gomatrixserverlib.Event, + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNID types.EventNID, event *gomatrixserverlib.Event, ) (*types.Event, *types.Event, bool, error) { var redactionEvent, redactedEvent *types.Event var info *tables.RedactionInfo @@ -1017,9 +1072,9 @@ func (d *Database) loadRedactionPair( } if isRedactionEvent { - redactedEvent = d.loadEvent(ctx, info.RedactsEventID) + redactedEvent = d.loadEvent(ctx, roomNID, info.RedactsEventID) } else { - redactionEvent = d.loadEvent(ctx, info.RedactionEventID) + redactionEvent = d.loadEvent(ctx, roomNID, info.RedactionEventID) } return redactionEvent, redactedEvent, info.Validated, nil @@ -1035,7 +1090,7 @@ func (d *Database) applyRedactions(events []types.Event) { } // loadEvent loads a single event or returns nil on any problems/missing event -func (d *Database) loadEvent(ctx context.Context, eventID string) *types.Event { +func (d *Database) loadEvent(ctx context.Context, roomNID types.RoomNID, eventID string) *types.Event { nids, err := d.EventNIDs(ctx, []string{eventID}) if err != nil { return nil @@ -1043,7 +1098,7 @@ func (d *Database) loadEvent(ctx context.Context, eventID string) *types.Event { if len(nids) == 0 { return nil } - evs, err := d.Events(ctx, []types.EventNID{nids[eventID]}) + evs, err := d.Events(ctx, roomNID, []types.EventNID{nids[eventID].EventNID}) if err != nil { return nil } @@ -1470,13 +1525,19 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - // un-publish old room - if err := d.PublishedTable.UpsertRoomPublished(ctx, txn, oldRoomID, "", "", false); err != nil { - return fmt.Errorf("failed to unpublish room: %w", err) + published, err := d.PublishedTable.SelectPublishedFromRoomID(ctx, txn, oldRoomID) + if err != nil { + return fmt.Errorf("failed to get published room: %w", err) } - // publish new room - if err := d.PublishedTable.UpsertRoomPublished(ctx, txn, newRoomID, "", "", true); err != nil { - return fmt.Errorf("failed to publish room: %w", err) + if published { + // un-publish old room + if err = d.PublishedTable.UpsertRoomPublished(ctx, txn, oldRoomID, "", "", false); err != nil { + return fmt.Errorf("failed to unpublish room: %w", err) + } + // publish new room + if err = d.PublishedTable.UpsertRoomPublished(ctx, txn, newRoomID, "", "", true); err != nil { + return fmt.Errorf("failed to publish room: %w", err) + } } // Migrate any existing room aliases diff --git a/roomserver/storage/shared/storage_test.go b/roomserver/storage/shared/storage_test.go index 58724340c..3acb55a3a 100644 --- a/roomserver/storage/shared/storage_test.go +++ b/roomserver/storage/shared/storage_test.go @@ -3,7 +3,9 @@ package shared_test import ( "context" "testing" + "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/stretchr/testify/assert" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -48,11 +50,14 @@ func mustCreateRoomserverDatabase(t *testing.T, dbType test.DBType) (*shared.Dat } assert.NoError(t, err) + cache := caching.NewRistrettoCache(8*1024*1024, time.Hour, false) + return &shared.Database{ DB: db, EventStateKeysTable: stateKeyTable, MembershipTable: membershipTable, Writer: sqlutil.NewExclusiveWriter(), + Cache: cache, }, func() { err := base.Close() assert.NoError(t, err) diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index f39b9902d..aacf4bc9a 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -110,10 +110,10 @@ const bulkSelectEventIDSQL = "" + "SELECT event_nid, event_id FROM roomserver_events WHERE event_nid IN ($1)" const bulkSelectEventNIDSQL = "" + - "SELECT event_id, event_nid FROM roomserver_events WHERE event_id IN ($1)" + "SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id IN ($1)" const bulkSelectUnsentEventNIDSQL = "" + - "SELECT event_id, event_nid FROM roomserver_events WHERE sent_to_output = 0 AND event_id IN ($1)" + "SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE sent_to_output = 0 AND event_id IN ($1)" const selectMaxEventDepthSQL = "" + "SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)" @@ -572,20 +572,20 @@ func (s *eventStatements) BulkSelectEventID(ctx context.Context, txn *sql.Tx, ev // BulkSelectEventNIDs returns a map from string event ID to numeric event ID. // If an event ID is not in the database then it is omitted from the map. -func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) { +func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) { return s.bulkSelectEventNID(ctx, txn, eventIDs, false) } // BulkSelectEventNIDs returns a map from string event ID to numeric event ID // only for events that haven't already been sent to the roomserver output. // If an event ID is not in the database then it is omitted from the map. -func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) { +func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) { return s.bulkSelectEventNID(ctx, txn, eventIDs, true) } // bulkSelectEventNIDs returns a map from string event ID to numeric event ID. // If an event ID is not in the database then it is omitted from the map. -func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventNID, error) { +func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventMetadata, error) { /////////////// iEventIDs := make([]interface{}, len(eventIDs)) for k, v := range eventIDs { @@ -609,14 +609,18 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, e return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed") - results := make(map[string]types.EventNID, len(eventIDs)) + results := make(map[string]types.EventMetadata, len(eventIDs)) var eventID string var eventNID int64 + var roomNID int64 for rows.Next() { - if err = rows.Scan(&eventID, &eventNID); err != nil { + if err = rows.Scan(&eventID, &eventNID, &roomNID); err != nil { return nil, err } - results[eventID] = types.EventNID(eventNID) + results[eventID] = types.EventMetadata{ + EventNID: types.EventNID(eventNID), + RoomNID: types.RoomNID(roomNID), + } } return results, nil } diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index c7f1064db..4ce2a9c4e 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -63,8 +63,8 @@ type Events interface { BulkSelectEventID(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (map[types.EventNID]string, error) // BulkSelectEventNIDs returns a map from string event ID to numeric event ID. // If an event ID is not in the database then it is omitted from the map. - BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) - BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) + BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) + BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error) SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error) diff --git a/roomserver/types/types.go b/roomserver/types/types.go index f40980994..6401a94be 100644 --- a/roomserver/types/types.go +++ b/roomserver/types/types.go @@ -38,6 +38,11 @@ type EventNID int64 // RoomNID is a numeric ID for a room. type RoomNID int64 +type EventMetadata struct { + EventNID EventNID + RoomNID RoomNID +} + // StateSnapshotNID is a numeric ID for the state at an event. type StateSnapshotNID int64 diff --git a/setup/jetstream/log.go b/setup/jetstream/log.go new file mode 100644 index 000000000..880f7120b --- /dev/null +++ b/setup/jetstream/log.go @@ -0,0 +1,42 @@ +package jetstream + +import ( + "github.com/nats-io/nats-server/v2/server" + "github.com/sirupsen/logrus" +) + +var _ server.Logger = &LogAdapter{} + +type LogAdapter struct { + entry *logrus.Entry +} + +func NewLogAdapter() *LogAdapter { + return &LogAdapter{ + entry: logrus.StandardLogger().WithField("component", "jetstream"), + } +} + +func (l *LogAdapter) Noticef(format string, v ...interface{}) { + l.entry.Infof(format, v...) +} + +func (l *LogAdapter) Warnf(format string, v ...interface{}) { + l.entry.Warnf(format, v...) +} + +func (l *LogAdapter) Fatalf(format string, v ...interface{}) { + l.entry.Fatalf(format, v...) +} + +func (l *LogAdapter) Errorf(format string, v ...interface{}) { + l.entry.Errorf(format, v...) +} + +func (l *LogAdapter) Debugf(format string, v ...interface{}) { + l.entry.Debugf(format, v...) +} + +func (l *LogAdapter) Tracef(format string, v ...interface{}) { + l.entry.Tracef(format, v...) +} diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index af4eb2949..01fec9ad6 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -40,7 +40,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS } if s.Server == nil { var err error - s.Server, err = natsserver.NewServer(&natsserver.Options{ + opts := &natsserver.Options{ ServerName: "monolith", DontListen: true, JetStream: true, @@ -49,11 +49,12 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS MaxPayload: 16 * 1024 * 1024, NoSigs: true, NoLog: cfg.NoLog, - }) + } + s.Server, err = natsserver.NewServer(opts) if err != nil { panic(err) } - s.ConfigureLogger() + s.SetLogger(NewLogAdapter(), opts.Debug, opts.Trace) go func() { process.ComponentStarted() s.Start()