NATS JetStream

This commit is contained in:
Neil Alexander 2022-02-10 11:45:05 +00:00
parent 87dc8c7412
commit 2582a05451
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
16 changed files with 245 additions and 190 deletions

5
go.mod
View file

@ -11,21 +11,21 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/MFAshby/stdemuxerhook v1.0.0 github.com/MFAshby/stdemuxerhook v1.0.0
github.com/Masterminds/semver/v3 v3.1.1 github.com/Masterminds/semver/v3 v3.1.1
github.com/Shopify/sarama v1.31.1
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/codeclysm/extract v2.2.0+incompatible github.com/codeclysm/extract v2.2.0+incompatible
github.com/containerd/containerd v1.5.9 // indirect github.com/containerd/containerd v1.5.9 // indirect
github.com/docker/docker v20.10.12+incompatible github.com/docker/docker v20.10.12+incompatible
github.com/docker/go-connections v0.4.0 github.com/docker/go-connections v0.4.0
github.com/frankban/quicktest v1.14.0 // indirect
github.com/getsentry/sentry-go v0.12.0 github.com/getsentry/sentry-go v0.12.0
github.com/gologme/log v1.3.0 github.com/gologme/log v1.3.0
github.com/google/go-cmp v0.5.6
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/h2non/filetype v1.1.3 // indirect github.com/h2non/filetype v1.1.3 // indirect
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/testing v0.0.0-20211215003918-77eb13d6cad2 // indirect github.com/juju/testing v0.0.0-20211215003918-77eb13d6cad2 // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/lib/pq v1.10.4 github.com/lib/pq v1.10.4
github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-circuit v0.4.0 github.com/libp2p/go-libp2p-circuit v0.4.0
@ -44,6 +44,7 @@ require (
github.com/matrix-org/gomatrixserverlib v0.0.0-20220209202448-9805ef634335 github.com/matrix-org/gomatrixserverlib v0.0.0-20220209202448-9805ef634335
github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/matryer/is v1.4.0
github.com/mattn/go-sqlite3 v1.14.10 github.com/mattn/go-sqlite3 v1.14.10
github.com/morikuni/aec v1.0.0 // indirect github.com/morikuni/aec v1.0.0 // indirect
github.com/nats-io/nats-server/v2 v2.3.2 github.com/nats-io/nats-server/v2 v2.3.2

42
go.sum
View file

@ -102,6 +102,10 @@ github.com/RyanCarrier/dijkstra v1.0.0/go.mod h1:5agGUBNEtUAGIANmbw09fuO3a2htPEk
github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30= github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ=
github.com/Shopify/sarama v1.31.1 h1:uxwJ+p4isb52RyV83MCJD8v2wJ/HBxEGMmG/8+sEzG0=
github.com/Shopify/sarama v1.31.1/go.mod h1:99E1xQ1Ql2bYcuJfwdXY3cE17W8+549Ty8PG/11BDqY=
github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ=
github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
@ -344,6 +348,12 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
@ -364,6 +374,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNp
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
github.com/frankban/quicktest v1.0.0/go.mod h1:R98jIehRai+d1/3Hv2//jOVCTJhW1VBavT6B6CuGq2k= github.com/frankban/quicktest v1.0.0/go.mod h1:R98jIehRai+d1/3Hv2//jOVCTJhW1VBavT6B6CuGq2k=
github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
@ -481,6 +493,8 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gologme/log v1.2.0/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/gologme/log v1.2.0/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U=
github.com/gologme/log v1.3.0 h1:l781G4dE+pbigClDSDzSaaYKtiueHCILUa/qSDsmHAo= github.com/gologme/log v1.3.0 h1:l781G4dE+pbigClDSDzSaaYKtiueHCILUa/qSDsmHAo=
github.com/gologme/log v1.3.0/go.mod h1:yKT+DvIPdDdDoPtqFrFxheooyVmoqi0BAsw+erN3wA4= github.com/gologme/log v1.3.0/go.mod h1:yKT+DvIPdDdDoPtqFrFxheooyVmoqi0BAsw+erN3wA4=
@ -533,6 +547,8 @@ github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
@ -559,6 +575,8 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
@ -644,6 +662,18 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsj
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA=
github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
@ -1214,6 +1244,8 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9
github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -1266,6 +1298,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@ -1391,6 +1425,7 @@ github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w= github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=
@ -1421,6 +1456,9 @@ github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPyS
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.0/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
@ -1503,6 +1541,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
@ -1511,6 +1550,7 @@ golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220128200615-198e4374d7ed/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220209195652-db638375bc3a h1:atOEWVSedO4ksXBe/UrlbSLVxQQ9RxM/tT2Jy10IaHo= golang.org/x/crypto v0.0.0-20220209195652-db638375bc3a h1:atOEWVSedO4ksXBe/UrlbSLVxQQ9RxM/tT2Jy10IaHo=
golang.org/x/crypto v0.0.0-20220209195652-db638375bc3a/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220209195652-db638375bc3a/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -1748,6 +1788,7 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7-0.20210503195748-5c7c50ebbd4f/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.7-0.20210503195748-5c7c50ebbd4f/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
@ -1983,6 +2024,7 @@ gopkg.in/yaml.v2 v2.0.0-20170712054546-1be3d31502d6/go.mod h1:JAlM8MvJe8wmxCU4Bl
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View file

@ -4,25 +4,29 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/pushserver/producers" "github.com/matrix-org/dendrite/pushserver/producers"
"github.com/matrix-org/dendrite/pushserver/storage" "github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/pushserver/util" "github.com/matrix-org/dendrite/pushserver/util"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
uapi "github.com/matrix-org/dendrite/userapi/api" uapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type OutputClientDataConsumer struct { type OutputClientDataConsumer struct {
ctx context.Context
cfg *config.PushServer cfg *config.PushServer
rsConsumer *internal.ContinualConsumer jetstream nats.JetStreamContext
durable string
db storage.Database db storage.Database
pgClient pushgateway.Client pgClient pushgateway.Client
ServerName gomatrixserverlib.ServerName
topic string
userAPI uapi.UserInternalAPI userAPI uapi.UserInternalAPI
syncProducer *producers.SyncAPI syncProducer *producers.SyncAPI
} }
@ -30,49 +34,48 @@ type OutputClientDataConsumer struct {
func NewOutputClientDataConsumer( func NewOutputClientDataConsumer(
process *process.ProcessContext, process *process.ProcessContext,
cfg *config.PushServer, cfg *config.PushServer,
kafkaConsumer sarama.Consumer, js nats.JetStreamContext,
store storage.Database, store storage.Database,
pgClient pushgateway.Client, pgClient pushgateway.Client,
userAPI uapi.UserInternalAPI, userAPI uapi.UserInternalAPI,
syncProducer *producers.SyncAPI, syncProducer *producers.SyncAPI,
) *OutputClientDataConsumer { ) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{ return &OutputClientDataConsumer{
Process: process, ctx: process.Context(),
ComponentName: "pushserver/clientapi",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputClientDataConsumer{
cfg: cfg, cfg: cfg,
rsConsumer: &consumer, jetstream: js,
db: store, db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("PushServerClientAPIConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
pgClient: pgClient, pgClient: pgClient,
userAPI: userAPI, userAPI: userAPI,
syncProducer: syncProducer, syncProducer: syncProducer,
} }
consumer.ProcessMessage = s.onMessage
return s
} }
func (s *OutputClientDataConsumer) Start() error { func (s *OutputClientDataConsumer) Start() error {
return s.rsConsumer.Start() if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
} }
func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
ctx := context.Background()
var event eventutil.AccountData var event eventutil.AccountData
if err := json.Unmarshal(msg.Value, &event); err != nil { if err := json.Unmarshal(msg.Data, &event); err != nil {
log.WithError(err).Error("pushserver clientapi consumer: message parse failure") log.WithError(err).Error("pushserver clientapi consumer: message parse failure")
return nil return true
} }
if event.Type != mFullyRead { if event.Type != mFullyRead {
return nil return true
} }
userID := string(msg.Key) userID := string(msg.Header.Get("user_id"))
localpart, domain, err := gomatrixserverlib.SplitID('@', userID) localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -80,16 +83,16 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": event.RoomID, "room_id": event.RoomID,
"event_type": event.Type, "event_type": event.Type,
}).WithError(err).Error("pushserver clientapi consumer: SplitID failure") }).WithError(err).Error("pushserver clientapi consumer: SplitID failure")
return nil return true
} }
if domain != s.cfg.Matrix.ServerName { if domain != s.ServerName {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"user_id": userID, "user_id": userID,
"room_id": event.RoomID, "room_id": event.RoomID,
"event_type": event.Type, "event_type": event.Type,
}).Error("pushserver clientapi consumer: not a local user") }).Error("pushserver clientapi consumer: not a local user")
return nil return true
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -110,7 +113,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": event.RoomID, "room_id": event.RoomID,
"event_type": event.Type, "event_type": event.Type,
}).WithError(err).Error("pushserver clientapi consumer: failed to query account data") }).WithError(err).Error("pushserver clientapi consumer: failed to query account data")
return nil return false
} }
ad, ok := userRes.RoomAccountData[event.RoomID] ad, ok := userRes.RoomAccountData[event.RoomID]
if !ok { if !ok {
@ -118,7 +121,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"localpart": localpart, "localpart": localpart,
"room_id": event.RoomID, "room_id": event.RoomID,
}).Errorf("pushserver clientapi consumer: room not found in account data response: %#v", userRes.RoomAccountData) }).Errorf("pushserver clientapi consumer: room not found in account data response: %#v", userRes.RoomAccountData)
return nil return true
} }
bs, ok := ad[mFullyRead] bs, ok := ad[mFullyRead]
if !ok { if !ok {
@ -126,7 +129,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"localpart": localpart, "localpart": localpart,
"room_id": event.RoomID, "room_id": event.RoomID,
}).Errorf("pushserver clientapi consumer: m.fully_read not found in account data: %#v", ad) }).Errorf("pushserver clientapi consumer: m.fully_read not found in account data: %#v", ad)
return nil return true
} }
var data fullyReadAccountData var data fullyReadAccountData
if err = json.Unmarshal([]byte(bs), &data); err != nil { if err = json.Unmarshal([]byte(bs), &data); err != nil {
@ -134,7 +137,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"localpart": localpart, "localpart": localpart,
"room_id": event.RoomID, "room_id": event.RoomID,
}).WithError(err).Error("pushserver clientapi consumer: json.Unmarshal of m.fully_read failed") }).WithError(err).Error("pushserver clientapi consumer: json.Unmarshal of m.fully_read failed")
return nil return true
} }
// TODO: we cannot know if this EventID caused a notification, so // TODO: we cannot know if this EventID caused a notification, so
@ -147,7 +150,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": event.RoomID, "room_id": event.RoomID,
"event_id": data.EventID, "event_id": data.EventID,
}).WithError(err).Errorf("pushserver clientapi consumer: DeleteNotificationsUpTo failed") }).WithError(err).Errorf("pushserver clientapi consumer: DeleteNotificationsUpTo failed")
return nil return false
} }
if deleted { if deleted {
@ -157,7 +160,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": event.RoomID, "room_id": event.RoomID,
"event_id": data.EventID, "event_id": data.EventID,
}).WithError(err).Error("pushserver clientapi consumer: NotifyUserCounts failed") }).WithError(err).Error("pushserver clientapi consumer: NotifyUserCounts failed")
return nil return false
} }
if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, event.RoomID); err != nil { if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, event.RoomID); err != nil {
@ -166,11 +169,11 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": event.RoomID, "room_id": event.RoomID,
"event_id": data.EventID, "event_id": data.EventID,
}).WithError(err).Errorf("pushserver clientapi consumer: GetAndSendNotificationData failed") }).WithError(err).Errorf("pushserver clientapi consumer: GetAndSendNotificationData failed")
return nil return false
} }
} }
return nil return true
} }
// mFullyRead is the account data type for the marker for the event up // mFullyRead is the account data type for the marker for the event up

View file

@ -3,75 +3,75 @@ package consumers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/Shopify/sarama"
eduapi "github.com/matrix-org/dendrite/eduserver/api" eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/pushserver/producers" "github.com/matrix-org/dendrite/pushserver/producers"
"github.com/matrix-org/dendrite/pushserver/storage" "github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/pushserver/util" "github.com/matrix-org/dendrite/pushserver/util"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type OutputReceiptEventConsumer struct { type OutputReceiptEventConsumer struct {
ctx context.Context
cfg *config.PushServer cfg *config.PushServer
rsConsumer *internal.ContinualConsumer jetstream nats.JetStreamContext
durable string
db storage.Database db storage.Database
pgClient pushgateway.Client pgClient pushgateway.Client
receiptTopic string
syncProducer *producers.SyncAPI syncProducer *producers.SyncAPI
} }
// NewOutputReceiptEventConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputReceiptEventConsumer( func NewOutputReceiptEventConsumer(
process *process.ProcessContext, process *process.ProcessContext,
cfg *config.PushServer, cfg *config.PushServer,
kafkaConsumer sarama.Consumer, js nats.JetStreamContext,
store storage.Database, store storage.Database,
pgClient pushgateway.Client, pgClient pushgateway.Client,
syncProducer *producers.SyncAPI, syncProducer *producers.SyncAPI,
) *OutputReceiptEventConsumer { ) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{ return &OutputReceiptEventConsumer{
Process: process, ctx: process.Context(),
ComponentName: "pushserver/eduserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputReceiptEventConsumer{
cfg: cfg, cfg: cfg,
rsConsumer: &consumer, jetstream: js,
db: store, db: store,
durable: cfg.Matrix.JetStream.Durable("PushServerEDUServerConsumer"),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
pgClient: pgClient, pgClient: pgClient,
syncProducer: syncProducer, syncProducer: syncProducer,
} }
consumer.ProcessMessage = s.onMessage
return s
} }
func (s *OutputReceiptEventConsumer) Start() error { func (s *OutputReceiptEventConsumer) Start() error {
return s.rsConsumer.Start() if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.receiptTopic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
} }
func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
ctx := context.Background()
var event eduapi.OutputReceiptEvent var event eduapi.OutputReceiptEvent
if err := json.Unmarshal(msg.Value, &event); err != nil { if err := json.Unmarshal(msg.Data, &event); err != nil {
log.WithError(err).Errorf("pushserver EDU consumer: message parse failure") log.WithError(err).Errorf("pushserver EDU consumer: message parse failure")
return nil return true
} }
localpart, domain, err := gomatrixserverlib.SplitID('@', event.UserID) localpart, domain, err := gomatrixserverlib.SplitID('@', event.UserID)
if err != nil { if err != nil {
return err return true
} }
if domain != s.cfg.Matrix.ServerName { if domain != s.cfg.Matrix.ServerName {
return fmt.Errorf("pushserver EDU consumer: not a local user: %v", event.UserID) return true
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -91,7 +91,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
"room_id": event.RoomID, "room_id": event.RoomID,
"event_id": event.EventID, "event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer") }).WithError(err).Error("pushserver EDU consumer")
return nil return false
} }
if updated { if updated {
@ -101,7 +101,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
"room_id": event.RoomID, "room_id": event.RoomID,
"event_id": event.EventID, "event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer: GetAndSendNotificationData failed") }).WithError(err).Error("pushserver EDU consumer: GetAndSendNotificationData failed")
return nil return false
} }
if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
@ -110,10 +110,10 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
"room_id": event.RoomID, "room_id": event.RoomID,
"event_id": event.EventID, "event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer: NotifyUserCounts failed") }).WithError(err).Error("pushserver EDU consumer: NotifyUserCounts failed")
return nil return false
} }
} }
return nil return true
} }

View file

@ -7,8 +7,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/Shopify/sarama" "github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/pushrules"
@ -19,62 +18,66 @@ import (
"github.com/matrix-org/dendrite/pushserver/util" "github.com/matrix-org/dendrite/pushserver/util"
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
ctx context.Context
cfg *config.PushServer cfg *config.PushServer
rsAPI rsapi.RoomserverInternalAPI
psAPI api.PushserverInternalAPI psAPI api.PushserverInternalAPI
pgClient pushgateway.Client rsAPI rsapi.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer jetstream nats.JetStreamContext
durable string
db storage.Database db storage.Database
queues *queue.OutgoingQueues
topic string
pgClient pushgateway.Client
syncProducer *producers.SyncAPI syncProducer *producers.SyncAPI
} }
func NewOutputRoomEventConsumer( func NewOutputRoomEventConsumer(
process *process.ProcessContext, process *process.ProcessContext,
cfg *config.PushServer, cfg *config.PushServer,
kafkaConsumer sarama.Consumer, js nats.JetStreamContext,
store storage.Database, store storage.Database,
pgClient pushgateway.Client, pgClient pushgateway.Client,
psAPI api.PushserverInternalAPI, psAPI api.PushserverInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, rsAPI rsapi.RoomserverInternalAPI,
syncProducer *producers.SyncAPI, syncProducer *producers.SyncAPI,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{ return &OutputRoomEventConsumer{
Process: process, ctx: process.Context(),
ComponentName: "pushserver/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
cfg: cfg, cfg: cfg,
rsConsumer: &consumer, jetstream: js,
db: store, db: store,
rsAPI: rsAPI, durable: cfg.Matrix.JetStream.Durable("PushServerClientAPIConsumer"),
psAPI: psAPI, topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
pgClient: pgClient, pgClient: pgClient,
psAPI: psAPI,
rsAPI: rsAPI,
syncProducer: syncProducer, syncProducer: syncProducer,
} }
consumer.ProcessMessage = s.onMessage
return s
} }
func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) Start() error {
return s.rsConsumer.Start() if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
} }
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
ctx := context.Background()
var output rsapi.OutputEvent var output rsapi.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {
log.WithError(err).Errorf("pushserver consumer: message parse failure") log.WithError(err).Errorf("pushserver consumer: message parse failure")
return nil return true
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -104,7 +107,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Ignore old events, peeks, so on. // Ignore old events, peeks, so on.
} }
return nil return true
} }
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {

View file

@ -6,7 +6,6 @@ import (
"sync" "sync"
"testing" "testing"
"github.com/Shopify/sarama"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
@ -17,6 +16,7 @@ import (
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
) )
const serverName = gomatrixserverlib.ServerName("example.org") const serverName = gomatrixserverlib.ServerName("example.org")
@ -124,11 +124,13 @@ func TestOutputRoomEventConsumer(t *testing.T) {
}}, pgClient.Reqs); diff != "" { }}, pgClient.Reqs); diff != "" {
t.Errorf("pgClient.NotifyHTTP Reqs: +got -want:\n%s", diff) t.Errorf("pgClient.NotifyHTTP Reqs: +got -want:\n%s", diff)
} }
if diff := cmp.Diff([]sarama.ProducerMessage{{ msg := &nats.Msg{
Topic: "notificationDataTopic", Subject: "notificationDataTopic",
Key: sarama.StringEncoder("@alice:example.org"), Header: nats.Header{},
Value: sarama.ByteEncoder([]byte(`{"room_id":"!jEsUZKDJdhlrceRyVU:example.org","unread_highlight_count":0,"unread_notification_count":1}`)), Data: []byte(`{"room_id":"!jEsUZKDJdhlrceRyVU:example.org","unread_highlight_count":0,"unread_notification_count":1}`),
}}, messageSender.Messages, cmpopts.IgnoreUnexported(sarama.ProducerMessage{})); diff != "" { }
msg.Header.Set("user_id", "@alice:example.org")
if diff := cmp.Diff([]*nats.Msg{msg}, messageSender.Messages, cmpopts.IgnoreUnexported(nats.Msg{})); diff != "" {
t.Errorf("SendMessage Messages: +got -want:\n%s", diff) t.Errorf("SendMessage Messages: +got -want:\n%s", diff)
} }
} }
@ -243,10 +245,10 @@ func mustParseHeaderedEventJSON(s string) *gomatrixserverlib.HeaderedEvent {
} }
type fakeMessageSender struct { type fakeMessageSender struct {
Messages []sarama.ProducerMessage Messages []*nats.Msg
} }
func (s *fakeMessageSender) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { func (s *fakeMessageSender) PublishMsg(msg *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) {
s.Messages = append(s.Messages, *msg) s.Messages = append(s.Messages, msg)
return 0, 0, nil return nil, nil
} }

View file

@ -7,22 +7,28 @@ import (
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/pushserver/storage" "github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type JetStreamPublisher interface {
PublishMsg(*nats.Msg, ...nats.PubOpt) (*nats.PubAck, error)
}
// SyncAPI produces messages for the Sync API server to consume. // SyncAPI produces messages for the Sync API server to consume.
type SyncAPI struct { type SyncAPI struct {
db storage.Database db storage.Database
producer MessageSender producer JetStreamPublisher
clientDataTopic string clientDataTopic string
notificationDataTopic string notificationDataTopic string
} }
func NewSyncAPI(db storage.Database, producer MessageSender, clientDataTopic string, notificationDataTopic string) *SyncAPI { func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic string, notificationDataTopic string) *SyncAPI {
return &SyncAPI{ return &SyncAPI{
db: db, db: db,
producer: producer, producer: js,
clientDataTopic: clientDataTopic, clientDataTopic: clientDataTopic,
notificationDataTopic: notificationDataTopic, notificationDataTopic: notificationDataTopic,
} }
@ -30,27 +36,29 @@ func NewSyncAPI(db storage.Database, producer MessageSender, clientDataTopic str
// SendAccountData sends account data to the Sync API server. // SendAccountData sends account data to the Sync API server.
func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error { func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error {
var m sarama.ProducerMessage m := &nats.Msg{
Subject: p.clientDataTopic,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)
data := eventutil.AccountData{ data := eventutil.AccountData{
RoomID: roomID, RoomID: roomID,
Type: dataType, Type: dataType,
} }
value, err := json.Marshal(data) var err error
m.Data, err = json.Marshal(data)
if err != nil { if err != nil {
return err return err
} }
m.Topic = string(p.clientDataTopic)
m.Key = sarama.StringEncoder(userID)
m.Value = sarama.ByteEncoder(value)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"user_id": userID, "user_id": userID,
"room_id": roomID, "room_id": roomID,
"data_type": dataType, "data_type": dataType,
}).Infof("Producing to topic %q", m.Topic) }).Tracef("Producing to topic '%s'", p.clientDataTopic)
_, _, err = p.producer.SendMessage(&m) _, err = p.producer.PublishMsg(m)
return err return err
} }
@ -76,21 +84,24 @@ func (p *SyncAPI) GetAndSendNotificationData(ctx context.Context, userID, roomID
// sendNotificationData sends data about unread notifications to the Sync API server. // sendNotificationData sends data about unread notifications to the Sync API server.
func (p *SyncAPI) sendNotificationData(userID string, data *eventutil.NotificationData) error { func (p *SyncAPI) sendNotificationData(userID string, data *eventutil.NotificationData) error {
value, err := json.Marshal(data) m := &nats.Msg{
Subject: p.notificationDataTopic,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)
var err error
m.Data, err = json.Marshal(data)
if err != nil { if err != nil {
return err return err
} }
var m sarama.ProducerMessage
m.Topic = string(p.notificationDataTopic)
m.Key = sarama.StringEncoder(userID)
m.Value = sarama.ByteEncoder(value)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"user_id": userID, "user_id": userID,
"room_id": data.RoomID, "room_id": data.RoomID,
}).Infof("Producing to topic %q", m.Topic) }).Tracef("Producing to topic '%s'", p.clientDataTopic)
_, _, err = p.producer.SendMessage(&m) _, err = p.producer.PublishMsg(m)
return err return err
} }

View file

@ -11,7 +11,7 @@ import (
"github.com/matrix-org/dendrite/pushserver/storage" "github.com/matrix-org/dendrite/pushserver/storage"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/kafka" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
uapi "github.com/matrix-org/dendrite/userapi/api" uapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -32,23 +32,21 @@ func NewInternalAPI(
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
userAPI uapi.UserInternalAPI, userAPI uapi.UserInternalAPI,
) api.PushserverInternalAPI { ) api.PushserverInternalAPI {
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
db, err := storage.Open(&cfg.Database) db, err := storage.Open(&cfg.Database)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to push server db") logrus.WithError(err).Panicf("failed to connect to push server db")
} }
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) js := jetstream.Prepare(&cfg.Matrix.JetStream)
syncProducer := producers.NewSyncAPI( syncProducer := producers.NewSyncAPI(
db, db, js,
producer,
// TODO: user API should handle syncs for account data. Right now, // TODO: user API should handle syncs for account data. Right now,
// it's handled by clientapi, and hence uses its topic. When user // it's handled by clientapi, and hence uses its topic. When user
// API handles it for all account data, we can remove it from // API handles it for all account data, we can remove it from
// here. // here.
cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
cfg.Matrix.Kafka.TopicFor(config.TopicOutputNotificationData), cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData),
) )
psAPI := internal.NewPushserverAPI( psAPI := internal.NewPushserverAPI(
@ -56,21 +54,21 @@ func NewInternalAPI(
) )
caConsumer := consumers.NewOutputClientDataConsumer( caConsumer := consumers.NewOutputClientDataConsumer(
process, cfg, consumer, db, pgClient, userAPI, syncProducer, process, cfg, js, db, pgClient, userAPI, syncProducer,
) )
if err := caConsumer.Start(); err != nil { if err := caConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start push server clientapi consumer") logrus.WithError(err).Panic("failed to start push server clientapi consumer")
} }
eduConsumer := consumers.NewOutputReceiptEventConsumer( eduConsumer := consumers.NewOutputReceiptEventConsumer(
process, cfg, consumer, db, pgClient, syncProducer, process, cfg, js, db, pgClient, syncProducer,
) )
if err := eduConsumer.Start(); err != nil { if err := eduConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start push server EDU consumer") logrus.WithError(err).Panic("failed to start push server EDU consumer")
} }
rsConsumer := consumers.NewOutputRoomEventConsumer( rsConsumer := consumers.NewOutputRoomEventConsumer(
process, cfg, consumer, db, pgClient, psAPI, rsAPI, syncProducer, process, cfg, js, db, pgClient, psAPI, rsAPI, syncProducer,
) )
if err := rsConsumer.Start(); err != nil { if err := rsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start push server room server consumer") logrus.WithError(err).Panic("failed to start push server room server consumer")

View file

@ -3,13 +3,11 @@ package storage
import ( import (
"context" "context"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/pushserver/api" "github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/storage/tables" "github.com/matrix-org/dendrite/pushserver/storage/tables"
) )
type Database interface { type Database interface {
internal.PartitionStorer
UpsertPusher(ctx context.Context, pusher api.Pusher, localpart string) error UpsertPusher(ctx context.Context, pusher api.Pusher, localpart string) error
GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error) GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error)
RemovePusher(ctx context.Context, appId, pushkey, localpart string) error RemovePusher(ctx context.Context, appId, pushkey, localpart string) error

View file

@ -18,6 +18,7 @@ var (
OutputKeyChangeEvent = "OutputKeyChangeEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent"
OutputTypingEvent = "OutputTypingEvent" OutputTypingEvent = "OutputTypingEvent"
OutputClientData = "OutputClientData" OutputClientData = "OutputClientData"
OutputNotificationData = "OutputNotificationData"
OutputReceiptEvent = "OutputReceiptEvent" OutputReceiptEvent = "OutputReceiptEvent"
) )
@ -58,4 +59,9 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy, Retention: nats.InterestPolicy,
Storage: nats.FileStorage, Storage: nats.FileStorage,
}, },
{
Name: OutputNotificationData,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
} }

View file

@ -18,25 +18,28 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// OutputNotificationDataConsumer consumes events that originated in // OutputNotificationDataConsumer consumes events that originated in
// the Push server. // the Push server.
type OutputNotificationDataConsumer struct { type OutputNotificationDataConsumer struct {
consumer *internal.ContinualConsumer ctx context.Context
db storage.Database jetstream nats.JetStreamContext
stream types.StreamProvider durable string
notifier *notifier.Notifier topic string
db storage.Database
notifier *notifier.Notifier
stream types.StreamProvider
} }
// NewOutputNotificationDataConsumer creates a new consumer. Call // NewOutputNotificationDataConsumer creates a new consumer. Call
@ -44,49 +47,44 @@ type OutputNotificationDataConsumer struct {
func NewOutputNotificationDataConsumer( func NewOutputNotificationDataConsumer(
process *process.ProcessContext, process *process.ProcessContext,
cfg *config.SyncAPI, cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer, js nats.JetStreamContext,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, stream types.StreamProvider,
) *OutputNotificationDataConsumer { ) *OutputNotificationDataConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/pushserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputNotificationData)),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputNotificationDataConsumer{ s := &OutputNotificationDataConsumer{
consumer: &consumer, ctx: process.Context(),
db: store, jetstream: js,
notifier: notifier, durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"),
stream: stream, topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData),
db: store,
notifier: notifier,
stream: stream,
} }
consumer.ProcessMessage = s.onMessage
return s return s
} }
// Start starts consumption. // Start starts consumption.
func (s *OutputNotificationDataConsumer) Start() error { func (s *OutputNotificationDataConsumer) Start() error {
return s.consumer.Start() return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
} }
// onMessage is called when the Sync server receives a new event from // onMessage is called when the Sync server receives a new event from
// the push server. It is not safe for this function to be called from // the push server. It is not safe for this function to be called from
// multiple goroutines, or else the sync stream position may race and // multiple goroutines, or else the sync stream position may race and
// be incorrectly calculated. // be incorrectly calculated.
func (s *OutputNotificationDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
ctx := context.Background() userID := string(msg.Header.Get(jetstream.UserID))
userID := string(msg.Key)
// Parse out the event JSON // Parse out the event JSON
var data eventutil.NotificationData var data eventutil.NotificationData
if err := json.Unmarshal(msg.Value, &data); err != nil { if err := json.Unmarshal(msg.Data, &data); err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
log.WithField("user_id", userID).WithError(err).Error("push server consumer: message parse failure") log.WithField("user_id", userID).WithError(err).Error("push server consumer: message parse failure")
return nil return true
} }
streamPos, err := s.db.UpsertRoomUnreadNotificationCounts(ctx, userID, data.RoomID, data.UnreadNotificationCount, data.UnreadHighlightCount) streamPos, err := s.db.UpsertRoomUnreadNotificationCounts(ctx, userID, data.RoomID, data.UnreadNotificationCount, data.UnreadHighlightCount)
@ -107,5 +105,5 @@ func (s *OutputNotificationDataConsumer) onMessage(msg *sarama.ConsumerMessage)
"streamPos": streamPos, "streamPos": streamPos,
}).Info("Received data from Push server") }).Info("Received data from Push server")
return nil return true
} }

View file

@ -18,12 +18,8 @@ import (
"context" "context"
eduAPI "github.com/matrix-org/dendrite/eduserver/api" eduAPI "github.com/matrix-org/dendrite/eduserver/api"
<<<<<<< HEAD
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
=======
>>>>>>> main
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"

View file

@ -12,24 +12,14 @@ import (
) )
type Streams struct { type Streams struct {
<<<<<<< HEAD
PDUStreamProvider types.StreamProvider PDUStreamProvider types.StreamProvider
TypingStreamProvider types.StreamProvider TypingStreamProvider types.StreamProvider
ReceiptStreamProvider types.StreamProvider ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.StreamProvider
NotificationDataStreamProvider types.StreamProvider NotificationDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.PartitionedStreamProvider
=======
PDUStreamProvider types.StreamProvider
TypingStreamProvider types.StreamProvider
ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.StreamProvider
>>>>>>> main
} }
func NewSyncStreamProviders( func NewSyncStreamProviders(

View file

@ -89,7 +89,7 @@ func AddPublicRoutes(
} }
notificationConsumer := consumers.NewOutputNotificationDataConsumer( notificationConsumer := consumers.NewOutputNotificationDataConsumer(
process, cfg, consumer, syncDB, notifier, streams.NotificationDataStreamProvider, process, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider,
) )
if err = notificationConsumer.Start(); err != nil { if err = notificationConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start notification data consumer") logrus.WithError(err).Panicf("failed to start notification data consumer")

View file

@ -95,13 +95,14 @@ const (
) )
type StreamingToken struct { type StreamingToken struct {
PDUPosition StreamPosition PDUPosition StreamPosition
TypingPosition StreamPosition TypingPosition StreamPosition
ReceiptPosition StreamPosition ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition SendToDevicePosition StreamPosition
InvitePosition StreamPosition InvitePosition StreamPosition
AccountDataPosition StreamPosition AccountDataPosition StreamPosition
DeviceListPosition StreamPosition DeviceListPosition StreamPosition
NotificationDataPosition StreamPosition
} }
// This will be used as a fallback by json.Marshal. // This will be used as a fallback by json.Marshal.
@ -117,10 +118,11 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string { func (t StreamingToken) String() string {
posStr := fmt.Sprintf( posStr := fmt.Sprintf(
"s%d_%d_%d_%d_%d_%d_%d", "s%d_%d_%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition, t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition, t.ReceiptPosition, t.SendToDevicePosition,
t.InvitePosition, t.AccountDataPosition, t.DeviceListPosition, t.InvitePosition, t.AccountDataPosition,
t.DeviceListPosition, t.NotificationDataPosition,
) )
return posStr return posStr
} }
@ -142,12 +144,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true return true
case t.DeviceListPosition > other.DeviceListPosition: case t.DeviceListPosition > other.DeviceListPosition:
return true return true
case t.NotificationDataPosition > other.NotificationDataPosition:
return true
} }
return false return false
} }
func (t *StreamingToken) IsEmpty() bool { func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition == 0 return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition == 0
} }
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -185,6 +189,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.DeviceListPosition > t.DeviceListPosition { if other.DeviceListPosition > t.DeviceListPosition {
t.DeviceListPosition = other.DeviceListPosition t.DeviceListPosition = other.DeviceListPosition
} }
if other.NotificationDataPosition > t.NotificationDataPosition {
t.NotificationDataPosition = other.NotificationDataPosition
}
} }
type TopologyToken struct { type TopologyToken struct {

View file

@ -9,10 +9,10 @@ import (
func TestSyncTokens(t *testing.T) { func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{ shouldPass := map[string]string{
"s4_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0}.String(), "s4_0_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0}.String(),
"s3_1_0_0_0_0_2": StreamingToken{3, 1, 0, 0, 0, 0, 2}.String(), "s3_1_0_0_0_0_2_0": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0}.String(),
"s3_1_2_3_5_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0}.String(), "s3_1_2_3_5_0_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0}.String(),
"t3_1": TopologyToken{3, 1}.String(), "t3_1": TopologyToken{3, 1}.String(),
} }
for a, b := range shouldPass { for a, b := range shouldPass {