mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 18:43:10 -06:00
Merge branch 'main' into main
This commit is contained in:
commit
f09085e84f
6
.github/workflows/dendrite.yml
vendored
6
.github/workflows/dendrite.yml
vendored
|
|
@ -4,7 +4,13 @@ on:
|
|||
push:
|
||||
branches:
|
||||
- main
|
||||
paths:
|
||||
- '**.go' # only execute on changes to go files
|
||||
- '.github/workflows/**' # or workflow changes
|
||||
pull_request:
|
||||
paths:
|
||||
- '**.go'
|
||||
- '.github/workflows/**'
|
||||
release:
|
||||
types: [published]
|
||||
workflow_dispatch:
|
||||
|
|
|
|||
2
.github/workflows/gh-pages.yml
vendored
2
.github/workflows/gh-pages.yml
vendored
|
|
@ -4,7 +4,7 @@ name: Deploy GitHub Pages dependencies preinstalled
|
|||
on:
|
||||
# Runs on pushes targeting the default branch
|
||||
push:
|
||||
branches: ["main"]
|
||||
branches: ["gh-pages"]
|
||||
paths:
|
||||
- 'docs/**' # only execute if we have docs changes
|
||||
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ The key files will now exist in your current working directory, and can be mount
|
|||
|
||||
## Starting Dendrite
|
||||
|
||||
Create your config based on the [`dendrite-sample.monolith.yaml`](https://github.com/matrix-org/dendrite/blob/main/dendrite-sample.monolith.yaml) sample configuration file.
|
||||
Create your config based on the [`dendrite-sample.yaml`](https://github.com/matrix-org/dendrite/blob/main/dendrite-sample.yaml) sample configuration file.
|
||||
|
||||
Then start the deployment:
|
||||
|
||||
|
|
|
|||
|
|
@ -72,8 +72,10 @@ type P2PMonolith struct {
|
|||
port int
|
||||
httpMux *mux.Router
|
||||
pineconeMux *mux.Router
|
||||
httpServer *http.Server
|
||||
listener net.Listener
|
||||
httpListenAddr string
|
||||
stopHandlingEvents chan bool
|
||||
}
|
||||
|
||||
func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, cacheDir string, dbPrefix string) *config.Dendrite {
|
||||
|
|
@ -199,8 +201,10 @@ func (p *P2PMonolith) StartMonolith() {
|
|||
}
|
||||
|
||||
func (p *P2PMonolith) Stop() {
|
||||
logrus.Info("Stopping monolith")
|
||||
_ = p.BaseDendrite.Close()
|
||||
p.WaitForShutdown()
|
||||
logrus.Info("Stopped monolith")
|
||||
}
|
||||
|
||||
func (p *P2PMonolith) WaitForShutdown() {
|
||||
|
|
@ -209,6 +213,16 @@ func (p *P2PMonolith) WaitForShutdown() {
|
|||
}
|
||||
|
||||
func (p *P2PMonolith) closeAllResources() {
|
||||
logrus.Info("Closing monolith resources")
|
||||
if p.httpServer != nil {
|
||||
_ = p.httpServer.Shutdown(context.Background())
|
||||
}
|
||||
|
||||
select {
|
||||
case p.stopHandlingEvents <- true:
|
||||
default:
|
||||
}
|
||||
|
||||
if p.listener != nil {
|
||||
_ = p.listener.Close()
|
||||
}
|
||||
|
|
@ -224,6 +238,7 @@ func (p *P2PMonolith) closeAllResources() {
|
|||
if p.Router != nil {
|
||||
_ = p.Router.Close()
|
||||
}
|
||||
logrus.Info("Monolith resources closed")
|
||||
}
|
||||
|
||||
func (p *P2PMonolith) Addr() string {
|
||||
|
|
@ -280,7 +295,7 @@ func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider,
|
|||
func (p *P2PMonolith) startHTTPServers() {
|
||||
go func() {
|
||||
// Build both ends of a HTTP multiplex.
|
||||
httpServer := &http.Server{
|
||||
p.httpServer = &http.Server{
|
||||
Addr: ":0",
|
||||
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
|
||||
ReadTimeout: 10 * time.Second,
|
||||
|
|
@ -296,12 +311,13 @@ func (p *P2PMonolith) startHTTPServers() {
|
|||
pubkeyString := hex.EncodeToString(pubkey[:])
|
||||
logrus.Info("Listening on ", pubkeyString)
|
||||
|
||||
switch httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) {
|
||||
switch p.httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) {
|
||||
case net.ErrClosed, http.ErrServerClosed:
|
||||
logrus.Info("Stopped listening on ", pubkeyString)
|
||||
default:
|
||||
logrus.Error("Stopped listening on ", pubkeyString)
|
||||
}
|
||||
logrus.Info("Stopped goroutine listening on ", pubkeyString)
|
||||
}()
|
||||
|
||||
p.httpListenAddr = fmt.Sprintf(":%d", p.port)
|
||||
|
|
@ -313,10 +329,12 @@ func (p *P2PMonolith) startHTTPServers() {
|
|||
default:
|
||||
logrus.Error("Stopped listening on ", p.httpListenAddr)
|
||||
}
|
||||
logrus.Info("Stopped goroutine listening on ", p.httpListenAddr)
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *P2PMonolith) startEventHandler() {
|
||||
p.stopHandlingEvents = make(chan bool)
|
||||
stopRelayServerSync := make(chan bool)
|
||||
eLog := logrus.WithField("pinecone", "events")
|
||||
p.RelayRetriever = relay.NewRelayServerRetriever(
|
||||
|
|
@ -329,13 +347,19 @@ func (p *P2PMonolith) startEventHandler() {
|
|||
p.RelayRetriever.InitializeRelayServers(eLog)
|
||||
|
||||
go func(ch <-chan pineconeEvents.Event) {
|
||||
for event := range ch {
|
||||
for {
|
||||
select {
|
||||
case event := <-ch:
|
||||
switch e := event.(type) {
|
||||
case pineconeEvents.PeerAdded:
|
||||
p.RelayRetriever.StartSync()
|
||||
case pineconeEvents.PeerRemoved:
|
||||
if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 {
|
||||
stopRelayServerSync <- true
|
||||
// NOTE: Don't block on channel
|
||||
select {
|
||||
case stopRelayServerSync <- true:
|
||||
default:
|
||||
}
|
||||
}
|
||||
case pineconeEvents.BroadcastReceived:
|
||||
// eLog.Info("Broadcast received from: ", e.PeerID)
|
||||
|
|
@ -347,8 +371,17 @@ func (p *P2PMonolith) startEventHandler() {
|
|||
if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil {
|
||||
eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
|
||||
}
|
||||
}
|
||||
case <-p.stopHandlingEvents:
|
||||
logrus.Info("Stopping processing pinecone events")
|
||||
// NOTE: Don't block on channel
|
||||
select {
|
||||
case stopRelayServerSync <- true:
|
||||
default:
|
||||
}
|
||||
logrus.Info("Stopped processing pinecone events")
|
||||
return
|
||||
}
|
||||
}
|
||||
}(p.EventChannel)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ type RelayServerRetriever struct {
|
|||
relayServersQueried map[gomatrixserverlib.ServerName]bool
|
||||
queriedServersMutex sync.Mutex
|
||||
running atomic.Bool
|
||||
quit <-chan bool
|
||||
quit chan bool
|
||||
}
|
||||
|
||||
func NewRelayServerRetriever(
|
||||
|
|
@ -46,7 +46,7 @@ func NewRelayServerRetriever(
|
|||
serverName gomatrixserverlib.ServerName,
|
||||
federationAPI federationAPI.FederationInternalAPI,
|
||||
relayAPI relayServerAPI.RelayInternalAPI,
|
||||
quit <-chan bool,
|
||||
quit chan bool,
|
||||
) RelayServerRetriever {
|
||||
return RelayServerRetriever{
|
||||
ctx: ctx,
|
||||
|
|
@ -151,6 +151,7 @@ func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
|
|||
if !t.Stop() {
|
||||
<-t.C
|
||||
}
|
||||
logrus.Info("Stopped relay server retriever")
|
||||
return
|
||||
case <-t.C:
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ func TestRelayRetrieverInitialization(t *testing.T) {
|
|||
"server",
|
||||
&FakeFedAPI{},
|
||||
&FakeRelayAPI{},
|
||||
make(<-chan bool),
|
||||
make(chan bool),
|
||||
)
|
||||
|
||||
retriever.InitializeRelayServers(logrus.WithField("test", "relay"))
|
||||
|
|
@ -74,7 +74,7 @@ func TestRelayRetrieverSync(t *testing.T) {
|
|||
"server",
|
||||
&FakeFedAPI{},
|
||||
&FakeRelayAPI{},
|
||||
make(<-chan bool),
|
||||
make(chan bool),
|
||||
)
|
||||
|
||||
retriever.InitializeRelayServers(logrus.WithField("test", "relay"))
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ func main() {
|
|||
}
|
||||
|
||||
var eventEntries []types.Event
|
||||
eventEntries, err = roomserverDB.Events(ctx, eventNIDs)
|
||||
eventEntries, err = roomserverDB.Events(ctx, 0, eventNIDs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
@ -145,7 +145,7 @@ func main() {
|
|||
}
|
||||
|
||||
fmt.Println("Fetching", len(eventNIDMap), "state events")
|
||||
eventEntries, err := roomserverDB.Events(ctx, eventNIDs)
|
||||
eventEntries, err := roomserverDB.Events(ctx, 0, eventNIDs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
@ -165,7 +165,7 @@ func main() {
|
|||
}
|
||||
|
||||
fmt.Println("Fetching", len(authEventIDs), "auth events")
|
||||
authEventEntries, err := roomserverDB.EventsFromIDs(ctx, authEventIDs)
|
||||
authEventEntries, err := roomserverDB.EventsFromIDs(ctx, 0, authEventIDs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
|||
22
helm/dendrite/.helm-docs/monitoring.gotmpl
Normal file
22
helm/dendrite/.helm-docs/monitoring.gotmpl
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
{{ define "chart.monitoringSection" }}
|
||||
## Monitoring
|
||||
|
||||
[](https://grafana.com/grafana/dashboards/13916-dendrite/)
|
||||
|
||||
* Works well with [Prometheus Operator](https://prometheus-operator.dev/) ([Helmchart](https://artifacthub.io/packages/helm/prometheus-community/kube-prometheus-stack)) and their setup of [Grafana](https://grafana.com/grafana/), by enabling the following values:
|
||||
```yaml
|
||||
prometheus:
|
||||
servicemonitor:
|
||||
enabled: true
|
||||
labels:
|
||||
release: "kube-prometheus-stack"
|
||||
rules:
|
||||
enabled: true # will deploy alert rules
|
||||
labels:
|
||||
release: "kube-prometheus-stack"
|
||||
grafana:
|
||||
dashboards:
|
||||
enabled: true # will deploy default dashboards
|
||||
```
|
||||
PS: The label `release=kube-prometheus-stack` is setup with the helmchart of the Prometheus Operator. For Grafana Dashboards it may be necessary to enable scanning in the correct namespaces (or ALL), enabled by `sidecar.dashboards.searchNamespace` in [Helmchart of grafana](https://artifacthub.io/packages/helm/grafana/grafana) (which is part of PrometheusOperator, so `grafana.sidecar.dashboards.searchNamespace`)
|
||||
{{ end }}
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
apiVersion: v2
|
||||
name: dendrite
|
||||
version: "0.11.1"
|
||||
version: "0.11.2"
|
||||
appVersion: "0.11.1"
|
||||
description: Dendrite Matrix Homeserver
|
||||
type: application
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# dendrite
|
||||
|
||||
  
|
||||
  
|
||||
Dendrite Matrix Homeserver
|
||||
|
||||
Status: **NOT PRODUCTION READY**
|
||||
|
|
@ -146,3 +146,35 @@ Create a folder `appservices` and place your configurations in there. The confi
|
|||
| ingress.tls | list | `[]` | |
|
||||
| service.type | string | `"ClusterIP"` | |
|
||||
| service.port | int | `8008` | |
|
||||
| prometheus.servicemonitor.enabled | bool | `false` | Enable ServiceMonitor for Prometheus-Operator for scrape metric-endpoint |
|
||||
| prometheus.servicemonitor.labels | object | `{}` | Extra Labels on ServiceMonitor for selector of Prometheus Instance |
|
||||
| prometheus.rules.enabled | bool | `false` | Enable PrometheusRules for Prometheus-Operator for setup alerting |
|
||||
| prometheus.rules.labels | object | `{}` | Extra Labels on PrometheusRules for selector of Prometheus Instance |
|
||||
| prometheus.rules.additionalRules | list | `[]` | additional alertrules (no default alertrules are provided) |
|
||||
| grafana.dashboards.enabled | bool | `false` | |
|
||||
| grafana.dashboards.labels | object | `{"grafana_dashboard":"1"}` | Extra Labels on ConfigMap for selector of grafana sidecar |
|
||||
| grafana.dashboards.annotations | object | `{}` | Extra Annotations on ConfigMap additional config in grafana sidecar |
|
||||
|
||||
## Monitoring
|
||||
|
||||
[](https://grafana.com/grafana/dashboards/13916-dendrite/)
|
||||
|
||||
* Works well with [Prometheus Operator](https://prometheus-operator.dev/) ([Helmchart](https://artifacthub.io/packages/helm/prometheus-community/kube-prometheus-stack)) and their setup of [Grafana](https://grafana.com/grafana/), by enabling the following values:
|
||||
```yaml
|
||||
prometheus:
|
||||
servicemonitor:
|
||||
enabled: true
|
||||
labels:
|
||||
release: "kube-prometheus-stack"
|
||||
rules:
|
||||
enabled: true # will deploy alert rules
|
||||
labels:
|
||||
release: "kube-prometheus-stack"
|
||||
grafana:
|
||||
dashboards:
|
||||
enabled: true # will deploy default dashboards
|
||||
```
|
||||
PS: The label `release=kube-prometheus-stack` is setup with the helmchart of the Prometheus Operator. For Grafana Dashboards it may be necessary to enable scanning in the correct namespaces (or ALL), enabled by `sidecar.dashboards.searchNamespace` in [Helmchart of grafana](https://artifacthub.io/packages/helm/grafana/grafana) (which is part of PrometheusOperator, so `grafana.sidecar.dashboards.searchNamespace`)
|
||||
|
||||
----------------------------------------------
|
||||
Autogenerated from chart metadata using [helm-docs vv1.11.0](https://github.com/norwoodj/helm-docs/releases/vv1.11.0)
|
||||
|
|
@ -10,4 +10,5 @@
|
|||
{{ template "chart.sourcesSection" . }}
|
||||
{{ template "chart.requirementsSection" . }}
|
||||
{{ template "chart.valuesSection" . }}
|
||||
{{ template "chart.monitoringSection" . }}
|
||||
{{ template "helm-docs.versionFooter" . }}
|
||||
|
|
@ -11,3 +11,8 @@ dendrite_config:
|
|||
|
||||
ingress:
|
||||
enabled: true
|
||||
|
||||
# dashboard is an ConfigMap with labels - it does not harm on testing
|
||||
grafana:
|
||||
dashboards:
|
||||
enabled: true
|
||||
|
|
|
|||
1119
helm/dendrite/grafana_dashboards/dendrite-rev1.json
Normal file
1119
helm/dendrite/grafana_dashboards/dendrite-rev1.json
Normal file
File diff suppressed because it is too large
Load diff
16
helm/dendrite/templates/configmap_grafana_dashboards.yaml
Normal file
16
helm/dendrite/templates/configmap_grafana_dashboards.yaml
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
{{- if .Values.grafana.dashboards.enabled }}
|
||||
{{- range $path, $bytes := .Files.Glob "grafana_dashboards/*" }}
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: {{ include "dendrite.fullname" $ }}-grafana-dashboards-{{ base $path }}
|
||||
labels:
|
||||
{{- include "dendrite.labels" $ | nindent 4 }}
|
||||
{{- toYaml $.Values.grafana.dashboards.labels | nindent 4 }}
|
||||
annotations:
|
||||
{{- toYaml $.Values.grafana.dashboards.annotations | nindent 4 }}
|
||||
data:
|
||||
{{- ($.Files.Glob $path ).AsConfig | nindent 2 }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
16
helm/dendrite/templates/prometheus-rules.yaml
Normal file
16
helm/dendrite/templates/prometheus-rules.yaml
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
{{- if and ( .Values.prometheus.rules.enabled ) ( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" ) }}
|
||||
---
|
||||
apiVersion: monitoring.coreos.com/v1
|
||||
kind: PrometheusRule
|
||||
metadata:
|
||||
name: {{ include "dendrite.fullname" . }}
|
||||
labels:
|
||||
{{- include "dendrite.labels" . | nindent 4 }}
|
||||
{{- toYaml .Values.prometheus.rules.labels | nindent 4 }}
|
||||
spec:
|
||||
groups:
|
||||
{{- if .Values.prometheus.rules.additionalRules }}
|
||||
- name: {{ template "dendrite.name" . }}-Additional
|
||||
rules: {{- toYaml .Values.prometheus.rules.additionalRules | nindent 4 }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
|
@ -1,15 +1,15 @@
|
|||
{{ if (gt (len (.Files.Glob "appservices/*")) 0) }}
|
||||
{{- if (gt (len (.Files.Glob "appservices/*")) 0) }}
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Secret
|
||||
metadata:
|
||||
name: {{ include "dendrite.fullname" . }}-appservices-conf
|
||||
namespace: {{ .Release.Namespace }}
|
||||
type: Opaque
|
||||
data:
|
||||
{{ (.Files.Glob "appservices/*").AsSecrets | indent 2 }}
|
||||
{{ end }}
|
||||
{{ if and .Values.signing_key.create (not .Values.signing_key.existingSecret) }}
|
||||
{{- end }}
|
||||
|
||||
{{- if and .Values.signing_key.create (not .Values.signing_key.existingSecret) }}
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Secret
|
||||
|
|
@ -17,17 +17,29 @@ metadata:
|
|||
annotations:
|
||||
helm.sh/resource-policy: keep
|
||||
name: {{ include "dendrite.fullname" . }}-signing-key
|
||||
namespace: {{ .Release.Namespace }}
|
||||
type: Opaque
|
||||
{{ end }}
|
||||
{{- end }}
|
||||
|
||||
{{- with .Values.dendrite_config.global.metrics }}
|
||||
{{- if .enabled }}
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Secret
|
||||
metadata:
|
||||
name: {{ include "dendrite.fullname" $ }}-metrics-basic-auth
|
||||
type: Opaque
|
||||
stringData:
|
||||
user: {{ .basic_auth.user | quote }}
|
||||
password: {{ .basic_auth.password | quote }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Secret
|
||||
type: Opaque
|
||||
metadata:
|
||||
name: {{ include "dendrite.fullname" . }}-conf
|
||||
namespace: {{ .Release.Namespace }}
|
||||
type: Opaque
|
||||
stringData:
|
||||
dendrite.yaml: |
|
||||
{{ toYaml ( mustMergeOverwrite .Values.dendrite_config ( fromYaml (include "override.config" .) ) .Values.dendrite_config ) | nindent 4 }}
|
||||
26
helm/dendrite/templates/servicemonitor.yaml
Normal file
26
helm/dendrite/templates/servicemonitor.yaml
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
{{- if and
|
||||
(and .Values.prometheus.servicemonitor.enabled .Values.dendrite_config.global.metrics.enabled )
|
||||
( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" )
|
||||
}}
|
||||
---
|
||||
apiVersion: monitoring.coreos.com/v1
|
||||
kind: ServiceMonitor
|
||||
metadata:
|
||||
name: {{ include "dendrite.fullname" . }}
|
||||
labels:
|
||||
{{- include "dendrite.labels" . | nindent 4 }}
|
||||
{{- toYaml .Values.prometheus.servicemonitor.labels | nindent 4 }}
|
||||
spec:
|
||||
endpoints:
|
||||
- port: http
|
||||
basicAuth:
|
||||
username:
|
||||
name: {{ include "dendrite.fullname" . }}-metrics-basic-auth
|
||||
key: "user"
|
||||
password:
|
||||
name: {{ include "dendrite.fullname" . }}-metrics-basic-auth
|
||||
key: "password"
|
||||
selector:
|
||||
matchLabels:
|
||||
{{- include "dendrite.selectorLabels" . | nindent 6 }}
|
||||
{{- end }}
|
||||
|
|
@ -348,3 +348,26 @@ ingress:
|
|||
service:
|
||||
type: ClusterIP
|
||||
port: 8008
|
||||
|
||||
prometheus:
|
||||
servicemonitor:
|
||||
# -- Enable ServiceMonitor for Prometheus-Operator for scrape metric-endpoint
|
||||
enabled: false
|
||||
# -- Extra Labels on ServiceMonitor for selector of Prometheus Instance
|
||||
labels: {}
|
||||
rules:
|
||||
# -- Enable PrometheusRules for Prometheus-Operator for setup alerting
|
||||
enabled: false
|
||||
# -- Extra Labels on PrometheusRules for selector of Prometheus Instance
|
||||
labels: {}
|
||||
# -- additional alertrules (no default alertrules are provided)
|
||||
additionalRules: []
|
||||
|
||||
grafana:
|
||||
dashboards:
|
||||
enabled: false
|
||||
# -- Extra Labels on ConfigMap for selector of grafana sidecar
|
||||
labels:
|
||||
grafana_dashboard: "1"
|
||||
# -- Extra Annotations on ConfigMap additional config in grafana sidecar
|
||||
annotations: {}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import "github.com/matrix-org/dendrite/roomserver/types"
|
|||
type EventStateKeyCache interface {
|
||||
GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool)
|
||||
StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string)
|
||||
GetEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, bool)
|
||||
}
|
||||
|
||||
func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool) {
|
||||
|
|
@ -15,4 +16,23 @@ func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (strin
|
|||
|
||||
func (c Caches) StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string) {
|
||||
c.RoomServerStateKeys.Set(eventStateKeyNID, eventStateKey)
|
||||
c.RoomServerStateKeyNIDs.Set(eventStateKey, eventStateKeyNID)
|
||||
}
|
||||
|
||||
func (c Caches) GetEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, bool) {
|
||||
return c.RoomServerStateKeyNIDs.Get(eventStateKey)
|
||||
}
|
||||
|
||||
type EventTypeCache interface {
|
||||
GetEventTypeKey(eventType string) (types.EventTypeNID, bool)
|
||||
StoreEventTypeKey(eventTypeNID types.EventTypeNID, eventType string)
|
||||
}
|
||||
|
||||
func (c Caches) StoreEventTypeKey(eventTypeNID types.EventTypeNID, eventType string) {
|
||||
c.RoomServerEventTypeNIDs.Set(eventType, eventTypeNID)
|
||||
c.RoomServerEventTypes.Set(eventTypeNID, eventType)
|
||||
}
|
||||
|
||||
func (c Caches) GetEventTypeKey(eventType string) (types.EventTypeNID, bool) {
|
||||
return c.RoomServerEventTypeNIDs.Get(eventType)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,19 +9,28 @@ type RoomServerCaches interface {
|
|||
RoomVersionCache
|
||||
RoomServerEventsCache
|
||||
EventStateKeyCache
|
||||
EventTypeCache
|
||||
}
|
||||
|
||||
// RoomServerNIDsCache contains the subset of functions needed for
|
||||
// a roomserver NID cache.
|
||||
type RoomServerNIDsCache interface {
|
||||
GetRoomServerRoomID(roomNID types.RoomNID) (string, bool)
|
||||
// StoreRoomServerRoomID stores roomNID -> roomID and roomID -> roomNID
|
||||
StoreRoomServerRoomID(roomNID types.RoomNID, roomID string)
|
||||
GetRoomServerRoomNID(roomID string) (types.RoomNID, bool)
|
||||
}
|
||||
|
||||
func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) {
|
||||
return c.RoomServerRoomIDs.Get(roomNID)
|
||||
}
|
||||
|
||||
// StoreRoomServerRoomID stores roomNID -> roomID and roomID -> roomNID
|
||||
func (c Caches) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) {
|
||||
c.RoomServerRoomNIDs.Set(roomID, roomNID)
|
||||
c.RoomServerRoomIDs.Set(roomNID, roomID)
|
||||
}
|
||||
|
||||
func (c Caches) GetRoomServerRoomNID(roomID string) (types.RoomNID, bool) {
|
||||
return c.RoomServerRoomNIDs.Get(roomID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,7 +28,10 @@ type Caches struct {
|
|||
RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID
|
||||
RoomServerRoomIDs Cache[types.RoomNID, string] // room NID -> room ID
|
||||
RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event
|
||||
RoomServerStateKeys Cache[types.EventStateKeyNID, string] // event NID -> event state key
|
||||
RoomServerStateKeys Cache[types.EventStateKeyNID, string] // eventStateKey NID -> event state key
|
||||
RoomServerStateKeyNIDs Cache[string, types.EventStateKeyNID] // event state key -> eventStateKey NID
|
||||
RoomServerEventTypeNIDs Cache[string, types.EventTypeNID] // eventType -> eventType NID
|
||||
RoomServerEventTypes Cache[types.EventTypeNID, string] // eventType NID -> eventType
|
||||
FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU
|
||||
FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU
|
||||
SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response
|
||||
|
|
|
|||
|
|
@ -40,6 +40,9 @@ const (
|
|||
spaceSummaryRoomsCache
|
||||
lazyLoadingCache
|
||||
eventStateKeyCache
|
||||
eventTypeCache
|
||||
eventTypeNIDCache
|
||||
eventStateKeyNIDCache
|
||||
)
|
||||
|
||||
func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches {
|
||||
|
|
@ -105,6 +108,21 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm
|
|||
Prefix: eventStateKeyCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
RoomServerStateKeyNIDs: &RistrettoCachePartition[string, types.EventStateKeyNID]{ // eventStateKey -> eventStateKey NID
|
||||
cache: cache,
|
||||
Prefix: eventStateKeyNIDCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
RoomServerEventTypeNIDs: &RistrettoCachePartition[string, types.EventTypeNID]{ // eventType -> eventType NID
|
||||
cache: cache,
|
||||
Prefix: eventTypeCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
RoomServerEventTypes: &RistrettoCachePartition[types.EventTypeNID, string]{ // eventType NID -> eventType
|
||||
cache: cache,
|
||||
Prefix: eventTypeNIDCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
FederationPDUs: &RistrettoCostedCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{ // queue NID -> PDU
|
||||
&RistrettoCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{
|
||||
cache: cache,
|
||||
|
|
|
|||
|
|
@ -30,26 +30,6 @@ import (
|
|||
"github.com/tidwall/sjson"
|
||||
)
|
||||
|
||||
// RoomserverInternalAPIDatabase has the storage APIs needed to implement the alias API.
|
||||
type RoomserverInternalAPIDatabase interface {
|
||||
// Save a given room alias with the room ID it refers to.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error
|
||||
// Look up the room ID a given alias refers to.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
GetRoomIDForAlias(ctx context.Context, alias string) (string, error)
|
||||
// Look up all aliases referring to a given room ID.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error)
|
||||
// Remove a given room alias.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
RemoveRoomAlias(ctx context.Context, alias string) error
|
||||
// Look up the room version for a given room.
|
||||
GetRoomVersionForRoom(
|
||||
ctx context.Context, roomID string,
|
||||
) (gomatrixserverlib.RoomVersion, error)
|
||||
}
|
||||
|
||||
// SetRoomAlias implements alias.RoomserverInternalAPI
|
||||
func (r *RoomserverInternalAPI) SetRoomAlias(
|
||||
ctx context.Context,
|
||||
|
|
|
|||
|
|
@ -155,7 +155,6 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
|
|||
r.Unpeeker = &perform.Unpeeker{
|
||||
ServerName: r.ServerName,
|
||||
Cfg: r.Cfg,
|
||||
DB: r.DB,
|
||||
FSAPI: r.fsAPI,
|
||||
Inputer: r.Inputer,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,7 +31,8 @@ import (
|
|||
// the soft-fail bool.
|
||||
func CheckForSoftFail(
|
||||
ctx context.Context,
|
||||
db storage.Database,
|
||||
db storage.RoomDatabase,
|
||||
roomInfo *types.RoomInfo,
|
||||
event *gomatrixserverlib.HeaderedEvent,
|
||||
stateEventIDs []string,
|
||||
) (bool, error) {
|
||||
|
|
@ -45,16 +46,6 @@ func CheckForSoftFail(
|
|||
return true, fmt.Errorf("StateEntriesForEventIDs failed: %w", err)
|
||||
}
|
||||
} else {
|
||||
// Work out if the room exists.
|
||||
var roomInfo *types.RoomInfo
|
||||
roomInfo, err = db.RoomInfo(ctx, event.RoomID())
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("db.RoomNID: %w", err)
|
||||
}
|
||||
if roomInfo == nil || roomInfo.IsStub() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Then get the state entries for the current state snapshot.
|
||||
// We'll use this to check if the event is allowed right now.
|
||||
roomState := state.NewStateResolution(db, roomInfo)
|
||||
|
|
@ -76,7 +67,7 @@ func CheckForSoftFail(
|
|||
stateNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()})
|
||||
|
||||
// Load the actual auth events from the database.
|
||||
authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries)
|
||||
authEvents, err := loadAuthEvents(ctx, db, roomInfo.RoomNID, stateNeeded, authStateEntries)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("loadAuthEvents: %w", err)
|
||||
}
|
||||
|
|
@ -93,7 +84,8 @@ func CheckForSoftFail(
|
|||
// Returns the numeric IDs for the auth events.
|
||||
func CheckAuthEvents(
|
||||
ctx context.Context,
|
||||
db storage.Database,
|
||||
db storage.RoomDatabase,
|
||||
roomNID types.RoomNID,
|
||||
event *gomatrixserverlib.HeaderedEvent,
|
||||
authEventIDs []string,
|
||||
) ([]types.EventNID, error) {
|
||||
|
|
@ -108,7 +100,7 @@ func CheckAuthEvents(
|
|||
stateNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()})
|
||||
|
||||
// Load the actual auth events from the database.
|
||||
authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries)
|
||||
authEvents, err := loadAuthEvents(ctx, db, roomNID, stateNeeded, authStateEntries)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("loadAuthEvents: %w", err)
|
||||
}
|
||||
|
|
@ -201,6 +193,7 @@ func (ae *authEvents) lookupEvent(typeNID types.EventTypeNID, stateKey string) *
|
|||
func loadAuthEvents(
|
||||
ctx context.Context,
|
||||
db state.StateResolutionStorage,
|
||||
roomNID types.RoomNID,
|
||||
needed gomatrixserverlib.StateNeeded,
|
||||
state []types.StateEntry,
|
||||
) (result authEvents, err error) {
|
||||
|
|
@ -223,7 +216,7 @@ func loadAuthEvents(
|
|||
eventNIDs = append(eventNIDs, eventNID)
|
||||
}
|
||||
}
|
||||
if result.events, err = db.Events(ctx, eventNIDs); err != nil {
|
||||
if result.events, err = db.Events(ctx, roomNID, eventNIDs); err != nil {
|
||||
return
|
||||
}
|
||||
roomID := ""
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverNam
|
|||
return false, err
|
||||
}
|
||||
|
||||
events, err := db.Events(ctx, eventNIDs)
|
||||
events, err := db.Events(ctx, info.RoomNID, eventNIDs)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
|
@ -157,7 +157,7 @@ func IsInvitePending(
|
|||
// only keep the "m.room.member" events with a "join" membership. These events are returned.
|
||||
// Returns an error if there was an issue fetching the events.
|
||||
func GetMembershipsAtState(
|
||||
ctx context.Context, db storage.Database, stateEntries []types.StateEntry, joinedOnly bool,
|
||||
ctx context.Context, db storage.RoomDatabase, roomNID types.RoomNID, stateEntries []types.StateEntry, joinedOnly bool,
|
||||
) ([]types.Event, error) {
|
||||
|
||||
var eventNIDs types.EventNIDs
|
||||
|
|
@ -177,7 +177,7 @@ func GetMembershipsAtState(
|
|||
util.Unique(eventNIDs)
|
||||
|
||||
// Get all of the events in this state
|
||||
stateEvents, err := db.Events(ctx, eventNIDs)
|
||||
stateEvents, err := db.Events(ctx, roomNID, eventNIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -220,16 +220,16 @@ func StateBeforeEvent(ctx context.Context, db storage.Database, info *types.Room
|
|||
return roomState.LoadCombinedStateAfterEvents(ctx, prevState)
|
||||
}
|
||||
|
||||
func MembershipAtEvent(ctx context.Context, db storage.Database, info *types.RoomInfo, eventIDs []string, stateKeyNID types.EventStateKeyNID) (map[string][]types.StateEntry, error) {
|
||||
func MembershipAtEvent(ctx context.Context, db storage.RoomDatabase, info *types.RoomInfo, eventIDs []string, stateKeyNID types.EventStateKeyNID) (map[string][]types.StateEntry, error) {
|
||||
roomState := state.NewStateResolution(db, info)
|
||||
// Fetch the state as it was when this event was fired
|
||||
return roomState.LoadMembershipAtEvent(ctx, eventIDs, stateKeyNID)
|
||||
}
|
||||
|
||||
func LoadEvents(
|
||||
ctx context.Context, db storage.Database, eventNIDs []types.EventNID,
|
||||
ctx context.Context, db storage.RoomDatabase, roomNID types.RoomNID, eventNIDs []types.EventNID,
|
||||
) ([]*gomatrixserverlib.Event, error) {
|
||||
stateEvents, err := db.Events(ctx, eventNIDs)
|
||||
stateEvents, err := db.Events(ctx, roomNID, eventNIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -242,13 +242,13 @@ func LoadEvents(
|
|||
}
|
||||
|
||||
func LoadStateEvents(
|
||||
ctx context.Context, db storage.Database, stateEntries []types.StateEntry,
|
||||
ctx context.Context, db storage.RoomDatabase, roomNID types.RoomNID, stateEntries []types.StateEntry,
|
||||
) ([]*gomatrixserverlib.Event, error) {
|
||||
eventNIDs := make([]types.EventNID, len(stateEntries))
|
||||
for i := range stateEntries {
|
||||
eventNIDs[i] = stateEntries[i].EventNID
|
||||
}
|
||||
return LoadEvents(ctx, db, eventNIDs)
|
||||
return LoadEvents(ctx, db, roomNID, eventNIDs)
|
||||
}
|
||||
|
||||
func CheckServerAllowedToSeeEvent(
|
||||
|
|
@ -326,7 +326,7 @@ func slowGetHistoryVisibilityState(
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
return LoadStateEvents(ctx, db, filteredEntries)
|
||||
return LoadStateEvents(ctx, db, info.RoomNID, filteredEntries)
|
||||
}
|
||||
|
||||
// TODO: Remove this when we have tests to assert correctness of this function
|
||||
|
|
@ -366,7 +366,7 @@ BFSLoop:
|
|||
next = make([]string, 0)
|
||||
}
|
||||
// Retrieve the events to process from the database.
|
||||
events, err = db.EventsFromIDs(ctx, front)
|
||||
events, err = db.EventsFromIDs(ctx, info.RoomNID, front)
|
||||
if err != nil {
|
||||
return resultNIDs, redactEventIDs, err
|
||||
}
|
||||
|
|
@ -467,7 +467,7 @@ func QueryLatestEventsAndState(
|
|||
return err
|
||||
}
|
||||
|
||||
stateEvents, err := LoadStateEvents(ctx, db, stateEntries)
|
||||
stateEvents, err := LoadStateEvents(ctx, db, roomInfo.RoomNID, stateEntries)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,18 @@ func TestIsInvitePendingWithoutNID(t *testing.T) {
|
|||
var authNIDs []types.EventNID
|
||||
for _, x := range room.Events() {
|
||||
|
||||
evNID, _, _, _, _, err := db.StoreEvent(context.Background(), x.Event, authNIDs, false)
|
||||
roomNID, err := db.GetOrCreateRoomNID(context.Background(), x.Unwrap())
|
||||
assert.NoError(t, err)
|
||||
assert.Greater(t, roomNID, types.RoomNID(0))
|
||||
|
||||
eventTypeNID, err := db.GetOrCreateEventTypeNID(context.Background(), x.Type())
|
||||
assert.NoError(t, err)
|
||||
assert.Greater(t, eventTypeNID, types.EventTypeNID(0))
|
||||
|
||||
eventStateKeyNID, err := db.GetOrCreateEventStateKeyNID(context.Background(), x.StateKey())
|
||||
assert.NoError(t, err)
|
||||
|
||||
evNID, _, _, _, err := db.StoreEvent(context.Background(), x.Event, roomNID, eventTypeNID, eventStateKeyNID, authNIDs, false)
|
||||
assert.NoError(t, err)
|
||||
authNIDs = append(authNIDs, evNID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ type Inputer struct {
|
|||
Cfg *config.RoomServer
|
||||
Base *base.BaseDendrite
|
||||
ProcessContext *process.ProcessContext
|
||||
DB storage.Database
|
||||
DB storage.RoomDatabase
|
||||
NATSClient *nats.Conn
|
||||
JetStream nats.JetStreamContext
|
||||
Durable nats.SubOpt
|
||||
|
|
|
|||
|
|
@ -308,10 +308,10 @@ func (r *Inputer) processRoomEvent(
|
|||
}
|
||||
|
||||
var softfail bool
|
||||
if input.Kind == api.KindNew {
|
||||
if input.Kind == api.KindNew && !isCreateEvent {
|
||||
// Check that the event passes authentication checks based on the
|
||||
// current room state.
|
||||
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
||||
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, roomInfo, headered, input.StateEventIDs)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warn("Error authing soft-failed event")
|
||||
}
|
||||
|
|
@ -322,8 +322,8 @@ func (r *Inputer) processRoomEvent(
|
|||
// bother doing this if the event was already rejected as it just ends up
|
||||
// burning CPU time.
|
||||
historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared.
|
||||
if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected {
|
||||
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev)
|
||||
if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected && !isCreateEvent {
|
||||
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, roomInfo.RoomNID, input, missingPrev)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.processStateBefore: %w", err)
|
||||
}
|
||||
|
|
@ -332,8 +332,23 @@ func (r *Inputer) processRoomEvent(
|
|||
}
|
||||
}
|
||||
|
||||
roomNID, err := r.DB.GetOrCreateRoomNID(ctx, event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err)
|
||||
}
|
||||
|
||||
eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, event.Type())
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err)
|
||||
}
|
||||
|
||||
eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey())
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err)
|
||||
}
|
||||
|
||||
// Store the event.
|
||||
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
}
|
||||
|
|
@ -474,6 +489,7 @@ func (r *Inputer) handleRemoteRoomUpgrade(ctx context.Context, event *gomatrixse
|
|||
// nolint:nakedret
|
||||
func (r *Inputer) processStateBefore(
|
||||
ctx context.Context,
|
||||
roomNID types.RoomNID,
|
||||
input *api.InputRoomEvent,
|
||||
missingPrev bool,
|
||||
) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) {
|
||||
|
|
@ -489,7 +505,7 @@ func (r *Inputer) processStateBefore(
|
|||
case input.HasState:
|
||||
// If we're overriding the state then we need to go and retrieve
|
||||
// them from the database. It's a hard error if they are missing.
|
||||
stateEvents, err := r.DB.EventsFromIDs(ctx, input.StateEventIDs)
|
||||
stateEvents, err := r.DB.EventsFromIDs(ctx, roomNID, input.StateEventIDs)
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err)
|
||||
}
|
||||
|
|
@ -567,6 +583,7 @@ func (r *Inputer) processStateBefore(
|
|||
// we've failed to retrieve the auth chain altogether (in which case
|
||||
// an error is returned) or we've successfully retrieved them all and
|
||||
// they are now in the database.
|
||||
// nolint: gocyclo
|
||||
func (r *Inputer) fetchAuthEvents(
|
||||
ctx context.Context,
|
||||
logger *logrus.Entry,
|
||||
|
|
@ -587,7 +604,7 @@ func (r *Inputer) fetchAuthEvents(
|
|||
}
|
||||
|
||||
for _, authEventID := range authEventIDs {
|
||||
authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID})
|
||||
authEvents, err := r.DB.EventsFromIDs(ctx, roomInfo.RoomNID, []string{authEventID})
|
||||
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
|
||||
unknown[authEventID] = struct{}{}
|
||||
continue
|
||||
|
|
@ -673,8 +690,23 @@ nextAuthEvent:
|
|||
logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID())
|
||||
}
|
||||
|
||||
roomNID, err := r.DB.GetOrCreateRoomNID(ctx, authEvent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err)
|
||||
}
|
||||
|
||||
eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, authEvent.Type())
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err)
|
||||
}
|
||||
|
||||
eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey())
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err)
|
||||
}
|
||||
|
||||
// Finally, store the event in the database.
|
||||
eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
|
||||
eventNID, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
}
|
||||
|
|
@ -750,7 +782,7 @@ func (r *Inputer) kickGuests(ctx context.Context, event *gomatrixserverlib.Event
|
|||
return err
|
||||
}
|
||||
|
||||
memberEvents, err := r.DB.Events(ctx, membershipNIDs)
|
||||
memberEvents, err := r.DB.Events(ctx, roomInfo.RoomNID, membershipNIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ func (r *Inputer) updateMemberships(
|
|||
// Load the event JSON so we can look up the "membership" key.
|
||||
// TODO: Maybe add a membership key to the events table so we can load that
|
||||
// key without having to load the entire event JSON?
|
||||
events, err := updater.Events(ctx, eventNIDs)
|
||||
events, err := updater.Events(ctx, 0, eventNIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ type missingStateReq struct {
|
|||
log *logrus.Entry
|
||||
virtualHost gomatrixserverlib.ServerName
|
||||
origin gomatrixserverlib.ServerName
|
||||
db storage.Database
|
||||
db storage.RoomDatabase
|
||||
roomInfo *types.RoomInfo
|
||||
inputer *Inputer
|
||||
keys gomatrixserverlib.JSONVerifier
|
||||
|
|
@ -395,7 +395,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
|
|||
for _, entry := range stateEntries {
|
||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||
}
|
||||
stateEvents, err := t.db.Events(ctx, stateEventNIDs)
|
||||
stateEvents, err := t.db.Events(ctx, t.roomInfo.RoomNID, stateEventNIDs)
|
||||
if err != nil {
|
||||
t.log.WithError(err).Warnf("failed to load state events locally")
|
||||
return nil
|
||||
|
|
@ -432,7 +432,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
|
|||
missingEventList = append(missingEventList, evID)
|
||||
}
|
||||
t.log.WithField("count", len(missingEventList)).Debugf("Fetching missing auth events")
|
||||
events, err := t.db.EventsFromIDs(ctx, missingEventList)
|
||||
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -702,7 +702,7 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
|
|||
}
|
||||
t.haveEventsMutex.Unlock()
|
||||
|
||||
events, err := t.db.EventsFromIDs(ctx, missingEventList)
|
||||
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("t.db.EventsFromIDs: %w", err)
|
||||
}
|
||||
|
|
@ -844,7 +844,7 @@ func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixs
|
|||
|
||||
if localFirst {
|
||||
// fetch from the roomserver
|
||||
events, err := t.db.EventsFromIDs(ctx, []string{missingEventID})
|
||||
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, []string{missingEventID})
|
||||
if err != nil {
|
||||
t.log.Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
|
||||
} else if len(events) == 1 {
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ func (r *Admin) PerformAdminEvacuateRoom(
|
|||
return nil
|
||||
}
|
||||
|
||||
memberEvents, err := r.DB.Events(ctx, memberNIDs)
|
||||
memberEvents, err := r.DB.Events(ctx, roomInfo.RoomNID, memberNIDs)
|
||||
if err != nil {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ func (r *Backfiller) PerformBackfill(
|
|||
// Retrieve events from the list that was filled previously. If we fail to get
|
||||
// events from the database then attempt once to get them from federation instead.
|
||||
var loadedEvents []*gomatrixserverlib.Event
|
||||
loadedEvents, err = helpers.LoadEvents(ctx, r.DB, resultNIDs)
|
||||
loadedEvents, err = helpers.LoadEvents(ctx, r.DB, info.RoomNID, resultNIDs)
|
||||
if err != nil {
|
||||
if _, ok := err.(types.MissingEventError); ok {
|
||||
return r.backfillViaFederation(ctx, request, response)
|
||||
|
|
@ -258,6 +258,7 @@ type backfillRequester struct {
|
|||
eventIDToBeforeStateIDs map[string][]string
|
||||
eventIDMap map[string]*gomatrixserverlib.Event
|
||||
historyVisiblity gomatrixserverlib.HistoryVisibility
|
||||
roomInfo types.RoomInfo
|
||||
}
|
||||
|
||||
func newBackfillRequester(
|
||||
|
|
@ -454,14 +455,14 @@ FindSuccessor:
|
|||
return nil
|
||||
}
|
||||
|
||||
stateEntries, err := helpers.StateBeforeEvent(ctx, b.db, info, NIDs[eventID])
|
||||
stateEntries, err := helpers.StateBeforeEvent(ctx, b.db, info, NIDs[eventID].EventNID)
|
||||
if err != nil {
|
||||
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event")
|
||||
return nil
|
||||
}
|
||||
|
||||
// possibly return all joined servers depending on history visiblity
|
||||
memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.virtualHost)
|
||||
memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, info, stateEntries, b.virtualHost)
|
||||
b.historyVisiblity = visibility
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules")
|
||||
|
|
@ -472,7 +473,7 @@ FindSuccessor:
|
|||
// Retrieve all "m.room.member" state events of "join" membership, which
|
||||
// contains the list of users in the room before the event, therefore all
|
||||
// the servers in it at that moment.
|
||||
memberEvents, err := helpers.GetMembershipsAtState(ctx, b.db, stateEntries, true)
|
||||
memberEvents, err := helpers.GetMembershipsAtState(ctx, b.db, info.RoomNID, stateEntries, true)
|
||||
if err != nil {
|
||||
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event")
|
||||
return nil
|
||||
|
|
@ -523,11 +524,15 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
|
|||
}
|
||||
eventNIDs := make([]types.EventNID, len(nidMap))
|
||||
i := 0
|
||||
roomNID := b.roomInfo.RoomNID
|
||||
for _, nid := range nidMap {
|
||||
eventNIDs[i] = nid
|
||||
eventNIDs[i] = nid.EventNID
|
||||
i++
|
||||
if roomNID == 0 {
|
||||
roomNID = nid.RoomNID
|
||||
}
|
||||
eventsWithNids, err := b.db.Events(ctx, eventNIDs)
|
||||
}
|
||||
eventsWithNids, err := b.db.Events(ctx, roomNID, eventNIDs)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events")
|
||||
return nil, err
|
||||
|
|
@ -544,7 +549,7 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
|
|||
// TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just
|
||||
// pull all events and then filter by that table.
|
||||
func joinEventsFromHistoryVisibility(
|
||||
ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry,
|
||||
ctx context.Context, db storage.RoomDatabase, roomInfo *types.RoomInfo, stateEntries []types.StateEntry,
|
||||
thisServer gomatrixserverlib.ServerName) ([]types.Event, gomatrixserverlib.HistoryVisibility, error) {
|
||||
|
||||
var eventNIDs []types.EventNID
|
||||
|
|
@ -557,7 +562,7 @@ func joinEventsFromHistoryVisibility(
|
|||
}
|
||||
|
||||
// Get all of the events in this state
|
||||
stateEvents, err := db.Events(ctx, eventNIDs)
|
||||
stateEvents, err := db.Events(ctx, roomInfo.RoomNID, eventNIDs)
|
||||
if err != nil {
|
||||
// even though the default should be shared, restricting the visibility to joined
|
||||
// feels more secure here.
|
||||
|
|
@ -570,21 +575,17 @@ func joinEventsFromHistoryVisibility(
|
|||
|
||||
// Can we see events in the room?
|
||||
canSeeEvents := auth.IsServerAllowed(thisServer, true, events)
|
||||
visibility := gomatrixserverlib.HistoryVisibility(auth.HistoryVisibilityForRoom(events))
|
||||
visibility := auth.HistoryVisibilityForRoom(events)
|
||||
if !canSeeEvents {
|
||||
logrus.Infof("ServersAtEvent history not visible to us: %s", visibility)
|
||||
return nil, visibility, nil
|
||||
}
|
||||
// get joined members
|
||||
info, err := db.RoomInfo(ctx, roomID)
|
||||
if err != nil {
|
||||
return nil, visibility, nil
|
||||
}
|
||||
joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false)
|
||||
joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomInfo.RoomNID, true, false)
|
||||
if err != nil {
|
||||
return nil, visibility, err
|
||||
}
|
||||
evs, err := db.Events(ctx, joinEventNIDs)
|
||||
evs, err := db.Events(ctx, roomInfo.RoomNID, joinEventNIDs)
|
||||
return evs, visibility, err
|
||||
}
|
||||
|
||||
|
|
@ -601,12 +602,31 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
|
|||
authNids := make([]types.EventNID, len(nidMap))
|
||||
i := 0
|
||||
for _, nid := range nidMap {
|
||||
authNids[i] = nid
|
||||
authNids[i] = nid.EventNID
|
||||
i++
|
||||
}
|
||||
|
||||
roomNID, err = db.GetOrCreateRoomNID(ctx, ev.Unwrap())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to get or create roomNID")
|
||||
continue
|
||||
}
|
||||
|
||||
eventTypeNID, err := db.GetOrCreateEventTypeNID(ctx, ev.Type())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to get or create eventType NID")
|
||||
continue
|
||||
}
|
||||
|
||||
eventStateKeyNID, err := db.GetOrCreateEventStateKeyNID(ctx, ev.StateKey())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to get or create eventStateKey NID")
|
||||
continue
|
||||
}
|
||||
|
||||
var redactedEventID string
|
||||
var redactionEvent *gomatrixserverlib.Event
|
||||
eventNID, roomNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false)
|
||||
eventNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), roomNID, eventTypeNID, eventStateKeyNID, authNids, false)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
|
||||
continue
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ import (
|
|||
)
|
||||
|
||||
type InboundPeeker struct {
|
||||
DB storage.Database
|
||||
DB storage.RoomDatabase
|
||||
Inputer *input.Inputer
|
||||
}
|
||||
|
||||
|
|
@ -64,7 +64,7 @@ func (r *InboundPeeker) PerformInboundPeek(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
latestEvents, err := r.DB.EventsFromIDs(ctx, []string{latestEventRefs[0].EventID})
|
||||
latestEvents, err := r.DB.EventsFromIDs(ctx, info.RoomNID, []string{latestEventRefs[0].EventID})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -88,7 +88,7 @@ func (r *InboundPeeker) PerformInboundPeek(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries)
|
||||
stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, info.RoomNID, stateEntries)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -194,7 +194,7 @@ func (r *Inviter) PerformInvite(
|
|||
// try and see if the user is allowed to make this invite. We can't do
|
||||
// this for invites coming in over federation - we have to take those on
|
||||
// trust.
|
||||
_, err = helpers.CheckAuthEvents(ctx, r.DB, event, event.AuthEventIDs())
|
||||
_, err = helpers.CheckAuthEvents(ctx, r.DB, info.RoomNID, event, event.AuthEventIDs())
|
||||
if err != nil {
|
||||
logger.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error(
|
||||
"processInviteEvent.checkAuthEvents failed for event",
|
||||
|
|
@ -291,7 +291,7 @@ func buildInviteStrippedState(
|
|||
for _, stateNID := range stateEntries {
|
||||
stateNIDs = append(stateNIDs, stateNID.EventNID)
|
||||
}
|
||||
stateEvents, err := db.Events(ctx, stateNIDs)
|
||||
stateEvents, err := db.Events(ctx, info.RoomNID, stateNIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
fsAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
|
@ -31,8 +30,6 @@ type Unpeeker struct {
|
|||
ServerName gomatrixserverlib.ServerName
|
||||
Cfg *config.RoomServer
|
||||
FSAPI fsAPI.RoomserverFederationAPI
|
||||
DB storage.Database
|
||||
|
||||
Inputer *input.Inputer
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
|||
return err
|
||||
}
|
||||
|
||||
stateEvents, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries)
|
||||
stateEvents, err := helpers.LoadStateEvents(ctx, r.DB, info.RoomNID, stateEntries)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -138,17 +138,7 @@ func (r *Queryer) QueryEventsByID(
|
|||
request *api.QueryEventsByIDRequest,
|
||||
response *api.QueryEventsByIDResponse,
|
||||
) error {
|
||||
eventNIDMap, err := r.DB.EventNIDs(ctx, request.EventIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var eventNIDs []types.EventNID
|
||||
for _, nid := range eventNIDMap {
|
||||
eventNIDs = append(eventNIDs, nid)
|
||||
}
|
||||
|
||||
events, err := helpers.LoadEvents(ctx, r.DB, eventNIDs)
|
||||
events, err := r.DB.EventsFromIDs(ctx, 0, request.EventIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -196,7 +186,7 @@ func (r *Queryer) QueryMembershipForUser(
|
|||
response.IsInRoom = stillInRoom
|
||||
response.HasBeenInRoom = true
|
||||
|
||||
evs, err := r.DB.Events(ctx, []types.EventNID{membershipEventNID})
|
||||
evs, err := r.DB.Events(ctx, info.RoomNID, []types.EventNID{membershipEventNID})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -278,10 +268,10 @@ func (r *Queryer) QueryMembershipAtEvent(
|
|||
// once. If we have more than one membership event, we need to get the state for each state entry.
|
||||
if canShortCircuit {
|
||||
if len(memberships) == 0 {
|
||||
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
||||
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, info.RoomNID, stateEntry, false)
|
||||
}
|
||||
} else {
|
||||
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
||||
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, info.RoomNID, stateEntry, false)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get memberships at state: %w", err)
|
||||
|
|
@ -328,7 +318,7 @@ func (r *Queryer) QueryMembershipsForRoom(
|
|||
}
|
||||
return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err)
|
||||
}
|
||||
events, err = r.DB.Events(ctx, eventNIDs)
|
||||
events, err = r.DB.Events(ctx, info.RoomNID, eventNIDs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.Events: %w", err)
|
||||
}
|
||||
|
|
@ -367,14 +357,14 @@ func (r *Queryer) QueryMembershipsForRoom(
|
|||
return err
|
||||
}
|
||||
|
||||
events, err = r.DB.Events(ctx, eventNIDs)
|
||||
events, err = r.DB.Events(ctx, info.RoomNID, eventNIDs)
|
||||
} else {
|
||||
stateEntries, err = helpers.StateBeforeEvent(ctx, r.DB, info, membershipEventNID)
|
||||
if err != nil {
|
||||
logrus.WithField("membership_event_nid", membershipEventNID).WithError(err).Error("failed to load state before event")
|
||||
return err
|
||||
}
|
||||
events, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntries, request.JoinedOnly)
|
||||
events, err = helpers.GetMembershipsAtState(ctx, r.DB, info.RoomNID, stateEntries, request.JoinedOnly)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
@ -425,7 +415,7 @@ func (r *Queryer) QueryServerAllowedToSeeEvent(
|
|||
request *api.QueryServerAllowedToSeeEventRequest,
|
||||
response *api.QueryServerAllowedToSeeEventResponse,
|
||||
) (err error) {
|
||||
events, err := r.DB.EventsFromIDs(ctx, []string{request.EventID})
|
||||
events, err := r.DB.EventsFromIDs(ctx, 0, []string{request.EventID})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
@ -476,7 +466,7 @@ func (r *Queryer) QueryMissingEvents(
|
|||
eventsToFilter[id] = true
|
||||
}
|
||||
}
|
||||
events, err := r.DB.EventsFromIDs(ctx, front)
|
||||
events, err := r.DB.EventsFromIDs(ctx, 0, front)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -496,7 +486,7 @@ func (r *Queryer) QueryMissingEvents(
|
|||
return err
|
||||
}
|
||||
|
||||
loadedEvents, err := helpers.LoadEvents(ctx, r.DB, resultNIDs)
|
||||
loadedEvents, err := helpers.LoadEvents(ctx, r.DB, info.RoomNID, resultNIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -621,11 +611,11 @@ func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo *types.RoomI
|
|||
return nil, rejected, false, err
|
||||
}
|
||||
|
||||
events, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries)
|
||||
events, err := helpers.LoadStateEvents(ctx, r.DB, roomInfo.RoomNID, stateEntries)
|
||||
return events, rejected, false, err
|
||||
}
|
||||
|
||||
type eventsFromIDs func(context.Context, []string) ([]types.Event, error)
|
||||
type eventsFromIDs func(context.Context, types.RoomNID, []string) ([]types.Event, error)
|
||||
|
||||
// GetAuthChain fetches the auth chain for the given auth events. An auth chain
|
||||
// is the list of all events that are referenced in the auth_events section, and
|
||||
|
|
@ -643,7 +633,7 @@ func GetAuthChain(
|
|||
|
||||
for len(eventsToFetch) > 0 {
|
||||
// Try to retrieve the events from the database.
|
||||
events, err := fn(ctx, eventsToFetch)
|
||||
events, err := fn(ctx, 0, eventsToFetch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -981,7 +971,7 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
|
|||
// For each of the joined users, let's see if we can get a valid
|
||||
// membership event.
|
||||
for _, joinNID := range joinNIDs {
|
||||
events, err := r.DB.Events(ctx, []types.EventNID{joinNID})
|
||||
events, err := r.DB.Events(ctx, roomInfo.RoomNID, []types.EventNID{joinNID})
|
||||
if err != nil || len(events) != 1 {
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ func (db *getEventDB) addFakeEvents(graph map[string][]string) error {
|
|||
}
|
||||
|
||||
// EventsFromIDs implements RoomserverInternalAPIEventDB
|
||||
func (db *getEventDB) EventsFromIDs(ctx context.Context, eventIDs []string) (res []types.Event, err error) {
|
||||
func (db *getEventDB) EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) (res []types.Event, err error) {
|
||||
for _, evID := range eventIDs {
|
||||
res = append(res, types.Event{
|
||||
EventNID: 0,
|
||||
|
|
|
|||
|
|
@ -23,8 +23,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/setup/base"
|
||||
)
|
||||
|
||||
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||
// NewInternalAPI returns a concrete implementation of the internal API.
|
||||
func NewInternalAPI(
|
||||
base *base.BaseDendrite,
|
||||
) api.RoomserverInternalAPI {
|
||||
|
|
|
|||
|
|
@ -41,8 +41,8 @@ type StateResolutionStorage interface {
|
|||
StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
|
||||
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
|
||||
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
|
||||
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
|
||||
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
|
||||
Events(ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error)
|
||||
EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error)
|
||||
}
|
||||
|
||||
type StateResolution struct {
|
||||
|
|
@ -975,7 +975,7 @@ func (v *StateResolution) resolveConflictsV2(
|
|||
|
||||
// Store the newly found auth events in the auth set for this event.
|
||||
var authEventMap map[string]types.StateEntry
|
||||
authSets[key], authEventMap, err = loader.loadAuthEvents(sctx, conflictedEvent, knownAuthEvents)
|
||||
authSets[key], authEventMap, err = loader.loadAuthEvents(sctx, v.roomInfo.RoomNID, conflictedEvent, knownAuthEvents)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -1091,7 +1091,7 @@ func (v *StateResolution) loadStateEvents(
|
|||
eventNIDs = append(eventNIDs, entry.EventNID)
|
||||
}
|
||||
}
|
||||
events, err := v.db.Events(ctx, eventNIDs)
|
||||
events, err := v.db.Events(ctx, v.roomInfo.RoomNID, eventNIDs)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
@ -1120,7 +1120,7 @@ type authEventLoader struct {
|
|||
// loadAuthEvents loads all of the auth events for a given event recursively,
|
||||
// along with a map that contains state entries for all of the auth events.
|
||||
func (l *authEventLoader) loadAuthEvents(
|
||||
ctx context.Context, event *gomatrixserverlib.Event, eventMap map[string]types.Event,
|
||||
ctx context.Context, roomNID types.RoomNID, event *gomatrixserverlib.Event, eventMap map[string]types.Event,
|
||||
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
|
@ -1155,7 +1155,7 @@ func (l *authEventLoader) loadAuthEvents(
|
|||
// If we need to get events from the database, go and fetch
|
||||
// those now.
|
||||
if len(l.lookupFromDB) > 0 {
|
||||
eventsFromDB, err := l.v.db.EventsFromIDs(ctx, l.lookupFromDB)
|
||||
eventsFromDB, err := l.v.db.EventsFromIDs(ctx, roomNID, l.lookupFromDB)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("v.db.EventsFromIDs: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -69,15 +69,12 @@ type Database interface {
|
|||
) ([]types.StateEntryList, error)
|
||||
// Look up the Events for a list of numeric event IDs.
|
||||
// Returns a sorted list of events.
|
||||
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
|
||||
Events(ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error)
|
||||
// Look up snapshot NID for an event ID string
|
||||
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
|
||||
BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error)
|
||||
// Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error.
|
||||
StoreEvent(
|
||||
ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID,
|
||||
isRejected bool,
|
||||
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||
StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||
// Look up the state entries for a list of string event IDs
|
||||
// Returns an error if the there is an error talking to the database
|
||||
// Returns a types.MissingEventError if the event IDs aren't in the database.
|
||||
|
|
@ -87,7 +84,7 @@ type Database interface {
|
|||
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)
|
||||
// Look up the numeric IDs for a list of events.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error)
|
||||
EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventMetadata, error)
|
||||
// Set the state at an event. FIXME TODO: "at"
|
||||
SetState(ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID) error
|
||||
// Lookup the event IDs for a batch of event numeric IDs.
|
||||
|
|
@ -138,7 +135,7 @@ type Database interface {
|
|||
// EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was
|
||||
// not found.
|
||||
// Returns an error if the retrieval went wrong.
|
||||
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
|
||||
EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error)
|
||||
// Publish or unpublish a room from the room directory.
|
||||
PublishRoom(ctx context.Context, roomID, appserviceID, networkID string, publish bool) error
|
||||
// Returns a list of room IDs for rooms which are published.
|
||||
|
|
@ -182,4 +179,36 @@ type Database interface {
|
|||
GetMembershipForHistoryVisibility(
|
||||
ctx context.Context, userNID types.EventStateKeyNID, info *types.RoomInfo, eventIDs ...string,
|
||||
) (map[string]*gomatrixserverlib.HeaderedEvent, error)
|
||||
GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (types.RoomNID, error)
|
||||
GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error)
|
||||
GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error)
|
||||
}
|
||||
|
||||
type RoomDatabase interface {
|
||||
// RoomInfo returns room information for the given room ID, or nil if there is no room.
|
||||
RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error)
|
||||
// IsEventRejected returns true if the event is known and rejected.
|
||||
IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error)
|
||||
MissingAuthPrevEvents(ctx context.Context, e *gomatrixserverlib.Event) (missingAuth, missingPrev []string, err error)
|
||||
// Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error.
|
||||
StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||
UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error
|
||||
GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error)
|
||||
StateEntriesForEventIDs(ctx context.Context, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error)
|
||||
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool) ([]types.EventNID, error)
|
||||
StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
|
||||
StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
|
||||
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
|
||||
BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error)
|
||||
StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
|
||||
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
|
||||
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
|
||||
Events(ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error)
|
||||
EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error)
|
||||
LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error)
|
||||
EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error)
|
||||
EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
|
||||
GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (types.RoomNID, error)
|
||||
GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error)
|
||||
GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -140,10 +140,10 @@ const bulkSelectEventIDSQL = "" +
|
|||
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid = ANY($1)"
|
||||
|
||||
const bulkSelectEventNIDSQL = "" +
|
||||
"SELECT event_id, event_nid FROM roomserver_events WHERE event_id = ANY($1)"
|
||||
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id = ANY($1)"
|
||||
|
||||
const bulkSelectUnsentEventNIDSQL = "" +
|
||||
"SELECT event_id, event_nid FROM roomserver_events WHERE event_id = ANY($1) AND sent_to_output = FALSE"
|
||||
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id = ANY($1) AND sent_to_output = FALSE"
|
||||
|
||||
const selectMaxEventDepthSQL = "" +
|
||||
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)"
|
||||
|
|
@ -520,20 +520,20 @@ func (s *eventStatements) BulkSelectEventID(ctx context.Context, txn *sql.Tx, ev
|
|||
|
||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||
// If an event ID is not in the database then it is omitted from the map.
|
||||
func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) {
|
||||
func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) {
|
||||
return s.bulkSelectEventNID(ctx, txn, eventIDs, false)
|
||||
}
|
||||
|
||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID
|
||||
// only for events that haven't already been sent to the roomserver output.
|
||||
// If an event ID is not in the database then it is omitted from the map.
|
||||
func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) {
|
||||
func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) {
|
||||
return s.bulkSelectEventNID(ctx, txn, eventIDs, true)
|
||||
}
|
||||
|
||||
// bulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||
// If an event ID is not in the database then it is omitted from the map.
|
||||
func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventNID, error) {
|
||||
func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventMetadata, error) {
|
||||
var stmt *sql.Stmt
|
||||
if onlyUnsent {
|
||||
stmt = sqlutil.TxStmt(txn, s.bulkSelectUnsentEventNIDStmt)
|
||||
|
|
@ -545,14 +545,18 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, e
|
|||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed")
|
||||
results := make(map[string]types.EventNID, len(eventIDs))
|
||||
results := make(map[string]types.EventMetadata, len(eventIDs))
|
||||
var eventID string
|
||||
var eventNID int64
|
||||
var roomNID int64
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(&eventID, &eventNID); err != nil {
|
||||
if err = rows.Scan(&eventID, &eventNID, &roomNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
results[eventID] = types.EventNID(eventNID)
|
||||
results[eventID] = types.EventMetadata{
|
||||
EventNID: types.EventNID(eventNID),
|
||||
RoomNID: types.RoomNID(roomNID),
|
||||
}
|
||||
}
|
||||
return results, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -116,10 +116,8 @@ func (u *RoomUpdater) StorePreviousEvents(eventNID types.EventNID, previousEvent
|
|||
})
|
||||
}
|
||||
|
||||
func (u *RoomUpdater) Events(
|
||||
ctx context.Context, eventNIDs []types.EventNID,
|
||||
) ([]types.Event, error) {
|
||||
return u.d.events(ctx, u.txn, eventNIDs)
|
||||
func (u *RoomUpdater) Events(ctx context.Context, _ types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error) {
|
||||
return u.d.events(ctx, u.txn, u.roomInfo.RoomNID, eventNIDs)
|
||||
}
|
||||
|
||||
func (u *RoomUpdater) SnapshotNIDFromEventID(
|
||||
|
|
@ -197,12 +195,8 @@ func (u *RoomUpdater) StateAtEventIDs(
|
|||
return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs)
|
||||
}
|
||||
|
||||
func (u *RoomUpdater) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
||||
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, false)
|
||||
}
|
||||
|
||||
func (u *RoomUpdater) UnsentEventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
||||
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, true)
|
||||
func (u *RoomUpdater) EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error) {
|
||||
return u.d.eventsFromIDs(ctx, u.txn, roomNID, eventIDs, NoFilter)
|
||||
}
|
||||
|
||||
// IsReferenced implements types.RoomRecentEventsUpdater
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
|
|
@ -67,12 +68,25 @@ func (d *Database) eventTypeNIDs(
|
|||
ctx context.Context, txn *sql.Tx, eventTypes []string,
|
||||
) (map[string]types.EventTypeNID, error) {
|
||||
result := make(map[string]types.EventTypeNID)
|
||||
nids, err := d.EventTypesTable.BulkSelectEventTypeNID(ctx, txn, eventTypes)
|
||||
// first try the cache
|
||||
fetchEventTypes := make([]string, 0, len(eventTypes))
|
||||
for _, eventType := range eventTypes {
|
||||
eventTypeNID, ok := d.Cache.GetEventTypeKey(eventType)
|
||||
if ok {
|
||||
result[eventType] = eventTypeNID
|
||||
continue
|
||||
}
|
||||
fetchEventTypes = append(fetchEventTypes, eventType)
|
||||
}
|
||||
if len(fetchEventTypes) > 0 {
|
||||
nids, err := d.EventTypesTable.BulkSelectEventTypeNID(ctx, txn, fetchEventTypes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for eventType, nid := range nids {
|
||||
result[eventType] = nid
|
||||
d.Cache.StoreEventTypeKey(nid, eventType)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
|
@ -89,6 +103,7 @@ func (d *Database) EventStateKeys(
|
|||
fetch = append(fetch, nid)
|
||||
}
|
||||
}
|
||||
if len(fetch) > 0 {
|
||||
fromDB, err := d.EventStateKeysTable.BulkSelectEventStateKey(ctx, nil, fetch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -97,6 +112,7 @@ func (d *Database) EventStateKeys(
|
|||
result[nid] = key
|
||||
d.Cache.StoreEventStateKey(nid, key)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
|
@ -111,16 +127,32 @@ func (d *Database) eventStateKeyNIDs(
|
|||
) (map[string]types.EventStateKeyNID, error) {
|
||||
result := make(map[string]types.EventStateKeyNID)
|
||||
eventStateKeys = util.UniqueStrings(eventStateKeys)
|
||||
nids, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, txn, eventStateKeys)
|
||||
// first ask the cache about these keys
|
||||
fetchEventStateKeys := make([]string, 0, len(eventStateKeys))
|
||||
for _, eventStateKey := range eventStateKeys {
|
||||
eventStateKeyNID, ok := d.Cache.GetEventStateKeyNID(eventStateKey)
|
||||
if ok {
|
||||
result[eventStateKey] = eventStateKeyNID
|
||||
continue
|
||||
}
|
||||
fetchEventStateKeys = append(fetchEventStateKeys, eventStateKey)
|
||||
}
|
||||
|
||||
if len(fetchEventStateKeys) > 0 {
|
||||
nids, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, txn, fetchEventStateKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for eventStateKey, nid := range nids {
|
||||
result[eventStateKey] = nid
|
||||
d.Cache.StoreEventStateKey(nid, eventStateKey)
|
||||
}
|
||||
}
|
||||
|
||||
// We received some nids, but are still missing some, work out which and create them
|
||||
if len(eventStateKeys) > len(result) {
|
||||
var nid types.EventStateKeyNID
|
||||
var err error
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
for _, eventStateKey := range eventStateKeys {
|
||||
if _, ok := result[eventStateKey]; ok {
|
||||
|
|
@ -262,7 +294,7 @@ func (d *Database) addState(
|
|||
|
||||
func (d *Database) EventNIDs(
|
||||
ctx context.Context, eventIDs []string,
|
||||
) (map[string]types.EventNID, error) {
|
||||
) (map[string]types.EventMetadata, error) {
|
||||
return d.eventNIDs(ctx, nil, eventIDs, NoFilter)
|
||||
}
|
||||
|
||||
|
|
@ -275,7 +307,7 @@ const (
|
|||
|
||||
func (d *Database) eventNIDs(
|
||||
ctx context.Context, txn *sql.Tx, eventIDs []string, filter UnsentFilter,
|
||||
) (map[string]types.EventNID, error) {
|
||||
) (map[string]types.EventMetadata, error) {
|
||||
switch filter {
|
||||
case FilterUnsentOnly:
|
||||
return d.EventsTable.BulkSelectUnsentEventNID(ctx, txn, eventIDs)
|
||||
|
|
@ -325,11 +357,11 @@ func (d *Database) EventIDs(
|
|||
return d.EventsTable.BulkSelectEventID(ctx, nil, eventNIDs)
|
||||
}
|
||||
|
||||
func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
||||
return d.eventsFromIDs(ctx, nil, eventIDs, NoFilter)
|
||||
func (d *Database) EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error) {
|
||||
return d.eventsFromIDs(ctx, nil, roomNID, eventIDs, NoFilter)
|
||||
}
|
||||
|
||||
func (d *Database) eventsFromIDs(ctx context.Context, txn *sql.Tx, eventIDs []string, filter UnsentFilter) ([]types.Event, error) {
|
||||
func (d *Database) eventsFromIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventIDs []string, filter UnsentFilter) ([]types.Event, error) {
|
||||
nidMap, err := d.eventNIDs(ctx, txn, eventIDs, filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -337,10 +369,16 @@ func (d *Database) eventsFromIDs(ctx context.Context, txn *sql.Tx, eventIDs []st
|
|||
|
||||
var nids []types.EventNID
|
||||
for _, nid := range nidMap {
|
||||
nids = append(nids, nid)
|
||||
nids = append(nids, nid.EventNID)
|
||||
if roomNID != 0 && roomNID != nid.RoomNID {
|
||||
logrus.Errorf("expected events from room %d, but also found %d", roomNID, nid.RoomNID)
|
||||
}
|
||||
if roomNID == 0 {
|
||||
roomNID = nid.RoomNID
|
||||
}
|
||||
}
|
||||
|
||||
return d.events(ctx, txn, nids)
|
||||
return d.events(ctx, txn, roomNID, nids)
|
||||
}
|
||||
|
||||
func (d *Database) LatestEventIDs(
|
||||
|
|
@ -480,14 +518,18 @@ func (d *Database) GetInvitesForUser(
|
|||
}
|
||||
|
||||
func (d *Database) Events(
|
||||
ctx context.Context, eventNIDs []types.EventNID,
|
||||
ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID,
|
||||
) ([]types.Event, error) {
|
||||
return d.events(ctx, nil, eventNIDs)
|
||||
return d.events(ctx, nil, roomNID, eventNIDs)
|
||||
}
|
||||
|
||||
func (d *Database) events(
|
||||
ctx context.Context, txn *sql.Tx, inputEventNIDs types.EventNIDs,
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, inputEventNIDs types.EventNIDs,
|
||||
) ([]types.Event, error) {
|
||||
if roomNID == 0 {
|
||||
// No need to go further, as we won't find any events for this room.
|
||||
return nil, nil
|
||||
}
|
||||
sort.Sort(inputEventNIDs)
|
||||
events := make(map[types.EventNID]*gomatrixserverlib.Event, len(inputEventNIDs))
|
||||
eventNIDs := make([]types.EventNID, 0, len(inputEventNIDs))
|
||||
|
|
@ -519,40 +561,34 @@ func (d *Database) events(
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eventIDs, _ := d.EventsTable.BulkSelectEventID(ctx, txn, eventNIDs)
|
||||
eventIDs, err := d.EventsTable.BulkSelectEventID(ctx, txn, eventNIDs)
|
||||
if err != nil {
|
||||
eventIDs = map[types.EventNID]string{}
|
||||
}
|
||||
var roomNIDs map[types.EventNID]types.RoomNID
|
||||
roomNIDs, err = d.EventsTable.SelectRoomNIDsForEventNIDs(ctx, txn, eventNIDs)
|
||||
|
||||
var roomVersion gomatrixserverlib.RoomVersion
|
||||
var fetchRoomVersion bool
|
||||
var ok bool
|
||||
var roomID string
|
||||
if roomID, ok = d.Cache.GetRoomServerRoomID(roomNID); ok {
|
||||
roomVersion, ok = d.Cache.GetRoomVersion(roomID)
|
||||
if !ok {
|
||||
fetchRoomVersion = true
|
||||
}
|
||||
}
|
||||
|
||||
if roomVersion == "" || fetchRoomVersion {
|
||||
var dbRoomVersions map[types.RoomNID]gomatrixserverlib.RoomVersion
|
||||
dbRoomVersions, err = d.RoomsTable.SelectRoomVersionsForRoomNIDs(ctx, txn, []types.RoomNID{roomNID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
uniqueRoomNIDs := make(map[types.RoomNID]struct{})
|
||||
for _, n := range roomNIDs {
|
||||
uniqueRoomNIDs[n] = struct{}{}
|
||||
}
|
||||
roomVersions := make(map[types.RoomNID]gomatrixserverlib.RoomVersion)
|
||||
fetchNIDList := make([]types.RoomNID, 0, len(uniqueRoomNIDs))
|
||||
for n := range uniqueRoomNIDs {
|
||||
if roomID, ok := d.Cache.GetRoomServerRoomID(n); ok {
|
||||
if roomVersion, ok := d.Cache.GetRoomVersion(roomID); ok {
|
||||
roomVersions[n] = roomVersion
|
||||
continue
|
||||
if roomVersion, ok = dbRoomVersions[roomNID]; !ok {
|
||||
return nil, fmt.Errorf("unable to find roomversion for room %d", roomNID)
|
||||
}
|
||||
}
|
||||
fetchNIDList = append(fetchNIDList, n)
|
||||
}
|
||||
dbRoomVersions, err := d.RoomsTable.SelectRoomVersionsForRoomNIDs(ctx, txn, fetchNIDList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for n, v := range dbRoomVersions {
|
||||
roomVersions[n] = v
|
||||
}
|
||||
|
||||
for _, eventJSON := range eventJSONs {
|
||||
roomNID := roomNIDs[eventJSON.EventNID]
|
||||
roomVersion := roomVersions[roomNID]
|
||||
events[eventJSON.EventNID], err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID(
|
||||
eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion,
|
||||
)
|
||||
|
|
@ -624,37 +660,8 @@ func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, e
|
|||
return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID)
|
||||
}
|
||||
|
||||
func (d *Database) StoreEvent(
|
||||
ctx context.Context, event *gomatrixserverlib.Event,
|
||||
authEventNIDs []types.EventNID, isRejected bool,
|
||||
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
||||
return d.storeEvent(ctx, nil, event, authEventNIDs, isRejected)
|
||||
}
|
||||
|
||||
func (d *Database) storeEvent(
|
||||
ctx context.Context, updater *RoomUpdater, event *gomatrixserverlib.Event,
|
||||
authEventNIDs []types.EventNID, isRejected bool,
|
||||
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
||||
var (
|
||||
roomNID types.RoomNID
|
||||
eventTypeNID types.EventTypeNID
|
||||
eventStateKeyNID types.EventStateKeyNID
|
||||
eventNID types.EventNID
|
||||
stateNID types.StateSnapshotNID
|
||||
redactionEvent *gomatrixserverlib.Event
|
||||
redactedEventID string
|
||||
err error
|
||||
)
|
||||
var txn *sql.Tx
|
||||
if updater != nil && updater.txn != nil {
|
||||
txn = updater.txn
|
||||
}
|
||||
// First writer is with a database-provided transaction, so that NIDs are assigned
|
||||
// globally outside of the updater context, to help avoid races.
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
// TODO: Here we should aim to have two different code paths for new rooms
|
||||
// vs existing ones.
|
||||
|
||||
// GetOrCreateRoomNID gets or creates a new roomNID for the given event
|
||||
func (d *Database) GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (roomNID types.RoomNID, err error) {
|
||||
// Get the default room version. If the client doesn't supply a room_version
|
||||
// then we will use our configured default to create the room.
|
||||
// https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-createroom
|
||||
|
|
@ -663,34 +670,61 @@ func (d *Database) storeEvent(
|
|||
// room.
|
||||
var roomVersion gomatrixserverlib.RoomVersion
|
||||
if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil {
|
||||
return fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err)
|
||||
return 0, fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err)
|
||||
}
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return roomNID, err
|
||||
}
|
||||
|
||||
if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil {
|
||||
return fmt.Errorf("d.assignRoomNID: %w", err)
|
||||
}
|
||||
|
||||
if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil {
|
||||
func (d *Database) GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) {
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, eventType); err != nil {
|
||||
return fmt.Errorf("d.assignEventTypeNID: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return eventTypeNID, err
|
||||
}
|
||||
|
||||
eventStateKey := event.StateKey()
|
||||
// Assigned a numeric ID for the state_key if there is one present.
|
||||
// Otherwise set the numeric ID for the state_key to 0.
|
||||
if eventStateKey != nil {
|
||||
func (d *Database) GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (eventStateKeyNID types.EventStateKeyNID, err error) {
|
||||
if eventStateKey == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil {
|
||||
return fmt.Errorf("d.assignStateKeyNID: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return eventStateKeyNID, nil
|
||||
}
|
||||
|
||||
func (d *Database) StoreEvent(
|
||||
ctx context.Context, event *gomatrixserverlib.Event,
|
||||
roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID,
|
||||
authEventNIDs []types.EventNID, isRejected bool,
|
||||
) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
||||
var (
|
||||
eventNID types.EventNID
|
||||
stateNID types.StateSnapshotNID
|
||||
redactionEvent *gomatrixserverlib.Event
|
||||
redactedEventID string
|
||||
err error
|
||||
)
|
||||
// Second writer is using the database-provided transaction, probably from the
|
||||
// room updater, for easy roll-back if required.
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if eventNID, stateNID, err = d.EventsTable.InsertEvent(
|
||||
ctx,
|
||||
txn,
|
||||
|
|
@ -718,7 +752,7 @@ func (d *Database) storeEvent(
|
|||
return fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err)
|
||||
}
|
||||
if !isRejected { // ignore rejected redaction events
|
||||
redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event)
|
||||
redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, roomNID, eventNID, event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("d.handleRedactions: %w", err)
|
||||
}
|
||||
|
|
@ -726,7 +760,7 @@ func (d *Database) storeEvent(
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
||||
}
|
||||
|
||||
// We should attempt to update the previous events table with any
|
||||
|
|
@ -741,28 +775,28 @@ func (d *Database) storeEvent(
|
|||
// any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater
|
||||
// to do writes however then this will need to go inside `Writer.Do`.
|
||||
succeeded := false
|
||||
if updater == nil {
|
||||
var roomInfo *types.RoomInfo
|
||||
roomInfo, err = d.roomInfo(ctx, txn, event.RoomID())
|
||||
roomInfo, err = d.roomInfo(ctx, nil, event.RoomID())
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
|
||||
}
|
||||
if roomInfo == nil && len(prevEvents) > 0 {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
||||
}
|
||||
var updater *RoomUpdater
|
||||
updater, err = d.GetRoomUpdater(ctx, roomInfo)
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err)
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err)
|
||||
}
|
||||
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
||||
}
|
||||
|
||||
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
|
||||
}
|
||||
succeeded = true
|
||||
}
|
||||
|
||||
return eventNID, roomNID, types.StateAtEvent{
|
||||
return eventNID, types.StateAtEvent{
|
||||
BeforeStateSnapshotNID: stateNID,
|
||||
StateEntry: types.StateEntry{
|
||||
StateKeyTuple: types.StateKeyTuple{
|
||||
|
|
@ -814,6 +848,10 @@ func (d *Database) MissingAuthPrevEvents(
|
|||
func (d *Database) assignRoomNID(
|
||||
ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion,
|
||||
) (types.RoomNID, error) {
|
||||
roomNID, ok := d.Cache.GetRoomServerRoomNID(roomID)
|
||||
if ok {
|
||||
return roomNID, nil
|
||||
}
|
||||
// Check if we already have a numeric ID in the database.
|
||||
roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -824,12 +862,20 @@ func (d *Database) assignRoomNID(
|
|||
roomNID, err = d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
|
||||
}
|
||||
}
|
||||
return roomNID, err
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
d.Cache.StoreRoomServerRoomID(roomNID, roomID)
|
||||
return roomNID, nil
|
||||
}
|
||||
|
||||
func (d *Database) assignEventTypeNID(
|
||||
ctx context.Context, txn *sql.Tx, eventType string,
|
||||
) (types.EventTypeNID, error) {
|
||||
eventTypeNID, ok := d.Cache.GetEventTypeKey(eventType)
|
||||
if ok {
|
||||
return eventTypeNID, nil
|
||||
}
|
||||
// Check if we already have a numeric ID in the database.
|
||||
eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType)
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -840,12 +886,20 @@ func (d *Database) assignEventTypeNID(
|
|||
eventTypeNID, err = d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType)
|
||||
}
|
||||
}
|
||||
return eventTypeNID, err
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
d.Cache.StoreEventTypeKey(eventTypeNID, eventType)
|
||||
return eventTypeNID, nil
|
||||
}
|
||||
|
||||
func (d *Database) assignStateKeyNID(
|
||||
ctx context.Context, txn *sql.Tx, eventStateKey string,
|
||||
) (types.EventStateKeyNID, error) {
|
||||
eventStateKeyNID, ok := d.Cache.GetEventStateKeyNID(eventStateKey)
|
||||
if ok {
|
||||
return eventStateKeyNID, nil
|
||||
}
|
||||
// Check if we already have a numeric ID in the database.
|
||||
eventStateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey)
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -856,6 +910,7 @@ func (d *Database) assignStateKeyNID(
|
|||
eventStateKeyNID, err = d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey)
|
||||
}
|
||||
}
|
||||
d.Cache.StoreEventStateKey(eventStateKeyNID, eventStateKey)
|
||||
return eventStateKeyNID, err
|
||||
}
|
||||
|
||||
|
|
@ -899,7 +954,7 @@ func extractRoomVersionFromCreateEvent(event *gomatrixserverlib.Event) (
|
|||
//
|
||||
// Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction.
|
||||
func (d *Database) handleRedactions(
|
||||
ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event *gomatrixserverlib.Event,
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNID types.EventNID, event *gomatrixserverlib.Event,
|
||||
) (*gomatrixserverlib.Event, string, error) {
|
||||
var err error
|
||||
isRedactionEvent := event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil
|
||||
|
|
@ -919,7 +974,7 @@ func (d *Database) handleRedactions(
|
|||
}
|
||||
}
|
||||
|
||||
redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, eventNID, event)
|
||||
redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, roomNID, eventNID, event)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("d.loadRedactionPair: %w", err)
|
||||
}
|
||||
|
|
@ -985,7 +1040,7 @@ func (d *Database) handleRedactions(
|
|||
|
||||
// loadRedactionPair returns both the redaction event and the redacted event, else nil.
|
||||
func (d *Database) loadRedactionPair(
|
||||
ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event *gomatrixserverlib.Event,
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNID types.EventNID, event *gomatrixserverlib.Event,
|
||||
) (*types.Event, *types.Event, bool, error) {
|
||||
var redactionEvent, redactedEvent *types.Event
|
||||
var info *tables.RedactionInfo
|
||||
|
|
@ -1017,9 +1072,9 @@ func (d *Database) loadRedactionPair(
|
|||
}
|
||||
|
||||
if isRedactionEvent {
|
||||
redactedEvent = d.loadEvent(ctx, info.RedactsEventID)
|
||||
redactedEvent = d.loadEvent(ctx, roomNID, info.RedactsEventID)
|
||||
} else {
|
||||
redactionEvent = d.loadEvent(ctx, info.RedactionEventID)
|
||||
redactionEvent = d.loadEvent(ctx, roomNID, info.RedactionEventID)
|
||||
}
|
||||
|
||||
return redactionEvent, redactedEvent, info.Validated, nil
|
||||
|
|
@ -1035,7 +1090,7 @@ func (d *Database) applyRedactions(events []types.Event) {
|
|||
}
|
||||
|
||||
// loadEvent loads a single event or returns nil on any problems/missing event
|
||||
func (d *Database) loadEvent(ctx context.Context, eventID string) *types.Event {
|
||||
func (d *Database) loadEvent(ctx context.Context, roomNID types.RoomNID, eventID string) *types.Event {
|
||||
nids, err := d.EventNIDs(ctx, []string{eventID})
|
||||
if err != nil {
|
||||
return nil
|
||||
|
|
@ -1043,7 +1098,7 @@ func (d *Database) loadEvent(ctx context.Context, eventID string) *types.Event {
|
|||
if len(nids) == 0 {
|
||||
return nil
|
||||
}
|
||||
evs, err := d.Events(ctx, []types.EventNID{nids[eventID]})
|
||||
evs, err := d.Events(ctx, roomNID, []types.EventNID{nids[eventID].EventNID})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1470,14 +1525,20 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
|
|||
func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error {
|
||||
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
published, err := d.PublishedTable.SelectPublishedFromRoomID(ctx, txn, oldRoomID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get published room: %w", err)
|
||||
}
|
||||
if published {
|
||||
// un-publish old room
|
||||
if err := d.PublishedTable.UpsertRoomPublished(ctx, txn, oldRoomID, "", "", false); err != nil {
|
||||
if err = d.PublishedTable.UpsertRoomPublished(ctx, txn, oldRoomID, "", "", false); err != nil {
|
||||
return fmt.Errorf("failed to unpublish room: %w", err)
|
||||
}
|
||||
// publish new room
|
||||
if err := d.PublishedTable.UpsertRoomPublished(ctx, txn, newRoomID, "", "", true); err != nil {
|
||||
if err = d.PublishedTable.UpsertRoomPublished(ctx, txn, newRoomID, "", "", true); err != nil {
|
||||
return fmt.Errorf("failed to publish room: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Migrate any existing room aliases
|
||||
aliases, err := d.RoomAliasesTable.SelectAliasesFromRoomID(ctx, txn, oldRoomID)
|
||||
|
|
|
|||
|
|
@ -3,7 +3,9 @@ package shared_test
|
|||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -48,11 +50,14 @@ func mustCreateRoomserverDatabase(t *testing.T, dbType test.DBType) (*shared.Dat
|
|||
}
|
||||
assert.NoError(t, err)
|
||||
|
||||
cache := caching.NewRistrettoCache(8*1024*1024, time.Hour, false)
|
||||
|
||||
return &shared.Database{
|
||||
DB: db,
|
||||
EventStateKeysTable: stateKeyTable,
|
||||
MembershipTable: membershipTable,
|
||||
Writer: sqlutil.NewExclusiveWriter(),
|
||||
Cache: cache,
|
||||
}, func() {
|
||||
err := base.Close()
|
||||
assert.NoError(t, err)
|
||||
|
|
|
|||
|
|
@ -110,10 +110,10 @@ const bulkSelectEventIDSQL = "" +
|
|||
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid IN ($1)"
|
||||
|
||||
const bulkSelectEventNIDSQL = "" +
|
||||
"SELECT event_id, event_nid FROM roomserver_events WHERE event_id IN ($1)"
|
||||
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id IN ($1)"
|
||||
|
||||
const bulkSelectUnsentEventNIDSQL = "" +
|
||||
"SELECT event_id, event_nid FROM roomserver_events WHERE sent_to_output = 0 AND event_id IN ($1)"
|
||||
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE sent_to_output = 0 AND event_id IN ($1)"
|
||||
|
||||
const selectMaxEventDepthSQL = "" +
|
||||
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)"
|
||||
|
|
@ -572,20 +572,20 @@ func (s *eventStatements) BulkSelectEventID(ctx context.Context, txn *sql.Tx, ev
|
|||
|
||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||
// If an event ID is not in the database then it is omitted from the map.
|
||||
func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) {
|
||||
func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) {
|
||||
return s.bulkSelectEventNID(ctx, txn, eventIDs, false)
|
||||
}
|
||||
|
||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID
|
||||
// only for events that haven't already been sent to the roomserver output.
|
||||
// If an event ID is not in the database then it is omitted from the map.
|
||||
func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) {
|
||||
func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) {
|
||||
return s.bulkSelectEventNID(ctx, txn, eventIDs, true)
|
||||
}
|
||||
|
||||
// bulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||
// If an event ID is not in the database then it is omitted from the map.
|
||||
func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventNID, error) {
|
||||
func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventMetadata, error) {
|
||||
///////////////
|
||||
iEventIDs := make([]interface{}, len(eventIDs))
|
||||
for k, v := range eventIDs {
|
||||
|
|
@ -609,14 +609,18 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, e
|
|||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed")
|
||||
results := make(map[string]types.EventNID, len(eventIDs))
|
||||
results := make(map[string]types.EventMetadata, len(eventIDs))
|
||||
var eventID string
|
||||
var eventNID int64
|
||||
var roomNID int64
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(&eventID, &eventNID); err != nil {
|
||||
if err = rows.Scan(&eventID, &eventNID, &roomNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
results[eventID] = types.EventNID(eventNID)
|
||||
results[eventID] = types.EventMetadata{
|
||||
EventNID: types.EventNID(eventNID),
|
||||
RoomNID: types.RoomNID(roomNID),
|
||||
}
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,8 +63,8 @@ type Events interface {
|
|||
BulkSelectEventID(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
|
||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||
// If an event ID is not in the database then it is omitted from the map.
|
||||
BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error)
|
||||
BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error)
|
||||
BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error)
|
||||
BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error)
|
||||
SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error)
|
||||
SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error)
|
||||
SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error)
|
||||
|
|
|
|||
|
|
@ -38,6 +38,11 @@ type EventNID int64
|
|||
// RoomNID is a numeric ID for a room.
|
||||
type RoomNID int64
|
||||
|
||||
type EventMetadata struct {
|
||||
EventNID EventNID
|
||||
RoomNID RoomNID
|
||||
}
|
||||
|
||||
// StateSnapshotNID is a numeric ID for the state at an event.
|
||||
type StateSnapshotNID int64
|
||||
|
||||
|
|
|
|||
42
setup/jetstream/log.go
Normal file
42
setup/jetstream/log.go
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
package jetstream
|
||||
|
||||
import (
|
||||
"github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var _ server.Logger = &LogAdapter{}
|
||||
|
||||
type LogAdapter struct {
|
||||
entry *logrus.Entry
|
||||
}
|
||||
|
||||
func NewLogAdapter() *LogAdapter {
|
||||
return &LogAdapter{
|
||||
entry: logrus.StandardLogger().WithField("component", "jetstream"),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *LogAdapter) Noticef(format string, v ...interface{}) {
|
||||
l.entry.Infof(format, v...)
|
||||
}
|
||||
|
||||
func (l *LogAdapter) Warnf(format string, v ...interface{}) {
|
||||
l.entry.Warnf(format, v...)
|
||||
}
|
||||
|
||||
func (l *LogAdapter) Fatalf(format string, v ...interface{}) {
|
||||
l.entry.Fatalf(format, v...)
|
||||
}
|
||||
|
||||
func (l *LogAdapter) Errorf(format string, v ...interface{}) {
|
||||
l.entry.Errorf(format, v...)
|
||||
}
|
||||
|
||||
func (l *LogAdapter) Debugf(format string, v ...interface{}) {
|
||||
l.entry.Debugf(format, v...)
|
||||
}
|
||||
|
||||
func (l *LogAdapter) Tracef(format string, v ...interface{}) {
|
||||
l.entry.Tracef(format, v...)
|
||||
}
|
||||
|
|
@ -40,7 +40,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
|
|||
}
|
||||
if s.Server == nil {
|
||||
var err error
|
||||
s.Server, err = natsserver.NewServer(&natsserver.Options{
|
||||
opts := &natsserver.Options{
|
||||
ServerName: "monolith",
|
||||
DontListen: true,
|
||||
JetStream: true,
|
||||
|
|
@ -49,11 +49,12 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
|
|||
MaxPayload: 16 * 1024 * 1024,
|
||||
NoSigs: true,
|
||||
NoLog: cfg.NoLog,
|
||||
})
|
||||
}
|
||||
s.Server, err = natsserver.NewServer(opts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
s.ConfigureLogger()
|
||||
s.SetLogger(NewLogAdapter(), opts.Debug, opts.Trace)
|
||||
go func() {
|
||||
process.ComponentStarted()
|
||||
s.Start()
|
||||
|
|
|
|||
Loading…
Reference in a new issue