From 2582a054513b805da07c01ee37d19dc7bde4cfb0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 10 Feb 2022 11:45:05 +0000 Subject: [PATCH] NATS JetStream --- go.mod | 5 +- go.sum | 42 +++++++++++++++ pushserver/consumers/clientapi.go | 71 +++++++++++++------------ pushserver/consumers/eduserver.go | 58 ++++++++++---------- pushserver/consumers/roomserver.go | 55 ++++++++++--------- pushserver/consumers/roomserver_test.go | 22 ++++---- pushserver/producers/syncapi.go | 45 ++++++++++------ pushserver/pushserver.go | 20 ++++--- pushserver/storage/interface.go | 2 - setup/jetstream/streams.go | 6 +++ syncapi/consumers/pushserver.go | 54 +++++++++---------- syncapi/storage/interface.go | 6 +-- syncapi/streams/streams.go | 12 +---- syncapi/syncapi.go | 2 +- syncapi/types/types.go | 27 ++++++---- syncapi/types/types_test.go | 8 +-- 16 files changed, 245 insertions(+), 190 deletions(-) diff --git a/go.mod b/go.mod index a1dc04084..40496c074 100644 --- a/go.mod +++ b/go.mod @@ -11,21 +11,21 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/MFAshby/stdemuxerhook v1.0.0 github.com/Masterminds/semver/v3 v3.1.1 + github.com/Shopify/sarama v1.31.1 github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/codeclysm/extract v2.2.0+incompatible github.com/containerd/containerd v1.5.9 // indirect github.com/docker/docker v20.10.12+incompatible github.com/docker/go-connections v0.4.0 - github.com/frankban/quicktest v1.14.0 // indirect github.com/getsentry/sentry-go v0.12.0 github.com/gologme/log v1.3.0 + github.com/google/go-cmp v0.5.6 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 github.com/h2non/filetype v1.1.3 // indirect github.com/hashicorp/golang-lru v0.5.4 github.com/json-iterator/go v1.1.12 // indirect github.com/juju/testing v0.0.0-20211215003918-77eb13d6cad2 // indirect - github.com/klauspost/compress v1.14.2 // indirect github.com/lib/pq v1.10.4 github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p-circuit v0.4.0 @@ -44,6 +44,7 @@ require ( github.com/matrix-org/gomatrixserverlib v0.0.0-20220209202448-9805ef634335 github.com/matrix-org/pinecone v0.0.0-20220121094951-351265543ddf github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 + github.com/matryer/is v1.4.0 github.com/mattn/go-sqlite3 v1.14.10 github.com/morikuni/aec v1.0.0 // indirect github.com/nats-io/nats-server/v2 v2.3.2 diff --git a/go.sum b/go.sum index 9063cf633..79c0a8eb5 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,10 @@ github.com/RyanCarrier/dijkstra v1.0.0/go.mod h1:5agGUBNEtUAGIANmbw09fuO3a2htPEk github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= +github.com/Shopify/sarama v1.31.1 h1:uxwJ+p4isb52RyV83MCJD8v2wJ/HBxEGMmG/8+sEzG0= +github.com/Shopify/sarama v1.31.1/go.mod h1:99E1xQ1Ql2bYcuJfwdXY3cE17W8+549Ty8PG/11BDqY= +github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ= +github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= @@ -344,6 +348,12 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= @@ -364,6 +374,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNp github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= github.com/frankban/quicktest v1.0.0/go.mod h1:R98jIehRai+d1/3Hv2//jOVCTJhW1VBavT6B6CuGq2k= github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= @@ -481,6 +493,8 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gologme/log v1.2.0/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/gologme/log v1.3.0 h1:l781G4dE+pbigClDSDzSaaYKtiueHCILUa/qSDsmHAo= github.com/gologme/log v1.3.0/go.mod h1:yKT+DvIPdDdDoPtqFrFxheooyVmoqi0BAsw+erN3wA4= @@ -533,6 +547,8 @@ github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -559,6 +575,8 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -644,6 +662,18 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsj github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -1214,6 +1244,8 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9 github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1266,6 +1298,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -1391,6 +1425,7 @@ github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w= @@ -1421,6 +1456,9 @@ github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPyS github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.0/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= +github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= @@ -1503,6 +1541,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= @@ -1511,6 +1550,7 @@ golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5 golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220128200615-198e4374d7ed/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220209195652-db638375bc3a h1:atOEWVSedO4ksXBe/UrlbSLVxQQ9RxM/tT2Jy10IaHo= golang.org/x/crypto v0.0.0-20220209195652-db638375bc3a/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1748,6 +1788,7 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7-0.20210503195748-5c7c50ebbd4f/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= @@ -1983,6 +2024,7 @@ gopkg.in/yaml.v2 v2.0.0-20170712054546-1be3d31502d6/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pushserver/consumers/clientapi.go b/pushserver/consumers/clientapi.go index f27cc6a11..898d3bd45 100644 --- a/pushserver/consumers/clientapi.go +++ b/pushserver/consumers/clientapi.go @@ -4,25 +4,29 @@ import ( "context" "encoding/json" - "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/pushserver/producers" "github.com/matrix-org/dendrite/pushserver/storage" "github.com/matrix-org/dendrite/pushserver/util" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) type OutputClientDataConsumer struct { + ctx context.Context cfg *config.PushServer - rsConsumer *internal.ContinualConsumer + jetstream nats.JetStreamContext + durable string db storage.Database pgClient pushgateway.Client + ServerName gomatrixserverlib.ServerName + topic string userAPI uapi.UserInternalAPI syncProducer *producers.SyncAPI } @@ -30,49 +34,48 @@ type OutputClientDataConsumer struct { func NewOutputClientDataConsumer( process *process.ProcessContext, cfg *config.PushServer, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, pgClient pushgateway.Client, userAPI uapi.UserInternalAPI, syncProducer *producers.SyncAPI, ) *OutputClientDataConsumer { - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "pushserver/clientapi", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)), - Consumer: kafkaConsumer, - PartitionStore: store, - } - s := &OutputClientDataConsumer{ + return &OutputClientDataConsumer{ + ctx: process.Context(), cfg: cfg, - rsConsumer: &consumer, + jetstream: js, db: store, + ServerName: cfg.Matrix.ServerName, + durable: cfg.Matrix.JetStream.Durable("PushServerClientAPIConsumer"), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), pgClient: pgClient, userAPI: userAPI, syncProducer: syncProducer, } - consumer.ProcessMessage = s.onMessage - return s } func (s *OutputClientDataConsumer) Start() error { - return s.rsConsumer.Start() + if err := jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ); err != nil { + return err + } + return nil } -func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { - ctx := context.Background() - +func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { var event eventutil.AccountData - if err := json.Unmarshal(msg.Value, &event); err != nil { + if err := json.Unmarshal(msg.Data, &event); err != nil { log.WithError(err).Error("pushserver clientapi consumer: message parse failure") - return nil + return true } if event.Type != mFullyRead { - return nil + return true } - userID := string(msg.Key) + userID := string(msg.Header.Get("user_id")) localpart, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { log.WithFields(log.Fields{ @@ -80,16 +83,16 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": event.RoomID, "event_type": event.Type, }).WithError(err).Error("pushserver clientapi consumer: SplitID failure") - return nil + return true } - if domain != s.cfg.Matrix.ServerName { + if domain != s.ServerName { log.WithFields(log.Fields{ "user_id": userID, "room_id": event.RoomID, "event_type": event.Type, }).Error("pushserver clientapi consumer: not a local user") - return nil + return true } log.WithFields(log.Fields{ @@ -110,7 +113,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": event.RoomID, "event_type": event.Type, }).WithError(err).Error("pushserver clientapi consumer: failed to query account data") - return nil + return false } ad, ok := userRes.RoomAccountData[event.RoomID] if !ok { @@ -118,7 +121,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "localpart": localpart, "room_id": event.RoomID, }).Errorf("pushserver clientapi consumer: room not found in account data response: %#v", userRes.RoomAccountData) - return nil + return true } bs, ok := ad[mFullyRead] if !ok { @@ -126,7 +129,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "localpart": localpart, "room_id": event.RoomID, }).Errorf("pushserver clientapi consumer: m.fully_read not found in account data: %#v", ad) - return nil + return true } var data fullyReadAccountData if err = json.Unmarshal([]byte(bs), &data); err != nil { @@ -134,7 +137,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "localpart": localpart, "room_id": event.RoomID, }).WithError(err).Error("pushserver clientapi consumer: json.Unmarshal of m.fully_read failed") - return nil + return true } // TODO: we cannot know if this EventID caused a notification, so @@ -147,7 +150,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": event.RoomID, "event_id": data.EventID, }).WithError(err).Errorf("pushserver clientapi consumer: DeleteNotificationsUpTo failed") - return nil + return false } if deleted { @@ -157,7 +160,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": event.RoomID, "event_id": data.EventID, }).WithError(err).Error("pushserver clientapi consumer: NotifyUserCounts failed") - return nil + return false } if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, event.RoomID); err != nil { @@ -166,11 +169,11 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": event.RoomID, "event_id": data.EventID, }).WithError(err).Errorf("pushserver clientapi consumer: GetAndSendNotificationData failed") - return nil + return false } } - return nil + return true } // mFullyRead is the account data type for the marker for the event up diff --git a/pushserver/consumers/eduserver.go b/pushserver/consumers/eduserver.go index b6e66897d..2beae5783 100644 --- a/pushserver/consumers/eduserver.go +++ b/pushserver/consumers/eduserver.go @@ -3,75 +3,75 @@ package consumers import ( "context" "encoding/json" - "fmt" - "github.com/Shopify/sarama" eduapi "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/pushserver/producers" "github.com/matrix-org/dendrite/pushserver/storage" "github.com/matrix-org/dendrite/pushserver/util" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) type OutputReceiptEventConsumer struct { + ctx context.Context cfg *config.PushServer - rsConsumer *internal.ContinualConsumer + jetstream nats.JetStreamContext + durable string db storage.Database pgClient pushgateway.Client + receiptTopic string syncProducer *producers.SyncAPI } +// NewOutputReceiptEventConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers. func NewOutputReceiptEventConsumer( process *process.ProcessContext, cfg *config.PushServer, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, pgClient pushgateway.Client, syncProducer *producers.SyncAPI, ) *OutputReceiptEventConsumer { - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "pushserver/eduserver", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent)), - Consumer: kafkaConsumer, - PartitionStore: store, - } - s := &OutputReceiptEventConsumer{ + return &OutputReceiptEventConsumer{ + ctx: process.Context(), cfg: cfg, - rsConsumer: &consumer, + jetstream: js, db: store, + durable: cfg.Matrix.JetStream.Durable("PushServerEDUServerConsumer"), + receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), pgClient: pgClient, syncProducer: syncProducer, } - consumer.ProcessMessage = s.onMessage - return s } func (s *OutputReceiptEventConsumer) Start() error { - return s.rsConsumer.Start() + if err := jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.receiptTopic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ); err != nil { + return err + } + return nil } -func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - ctx := context.Background() - +func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { var event eduapi.OutputReceiptEvent - if err := json.Unmarshal(msg.Value, &event); err != nil { + if err := json.Unmarshal(msg.Data, &event); err != nil { log.WithError(err).Errorf("pushserver EDU consumer: message parse failure") - return nil + return true } localpart, domain, err := gomatrixserverlib.SplitID('@', event.UserID) if err != nil { - return err + return true } - if domain != s.cfg.Matrix.ServerName { - return fmt.Errorf("pushserver EDU consumer: not a local user: %v", event.UserID) + return true } log.WithFields(log.Fields{ @@ -91,7 +91,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro "room_id": event.RoomID, "event_id": event.EventID, }).WithError(err).Error("pushserver EDU consumer") - return nil + return false } if updated { @@ -101,7 +101,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro "room_id": event.RoomID, "event_id": event.EventID, }).WithError(err).Error("pushserver EDU consumer: GetAndSendNotificationData failed") - return nil + return false } if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { @@ -110,10 +110,10 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro "room_id": event.RoomID, "event_id": event.EventID, }).WithError(err).Error("pushserver EDU consumer: NotifyUserCounts failed") - return nil + return false } } - return nil + return true } diff --git a/pushserver/consumers/roomserver.go b/pushserver/consumers/roomserver.go index c0271e683..857bd0916 100644 --- a/pushserver/consumers/roomserver.go +++ b/pushserver/consumers/roomserver.go @@ -7,8 +7,7 @@ import ( "strings" "time" - "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushrules" @@ -19,62 +18,66 @@ import ( "github.com/matrix-org/dendrite/pushserver/util" rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) type OutputRoomEventConsumer struct { + ctx context.Context cfg *config.PushServer - rsAPI rsapi.RoomserverInternalAPI psAPI api.PushserverInternalAPI - pgClient pushgateway.Client - rsConsumer *internal.ContinualConsumer + rsAPI rsapi.RoomserverInternalAPI + jetstream nats.JetStreamContext + durable string db storage.Database + queues *queue.OutgoingQueues + topic string + pgClient pushgateway.Client syncProducer *producers.SyncAPI } func NewOutputRoomEventConsumer( process *process.ProcessContext, cfg *config.PushServer, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, pgClient pushgateway.Client, psAPI api.PushserverInternalAPI, rsAPI rsapi.RoomserverInternalAPI, syncProducer *producers.SyncAPI, ) *OutputRoomEventConsumer { - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "pushserver/roomserver", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), - Consumer: kafkaConsumer, - PartitionStore: store, - } - s := &OutputRoomEventConsumer{ + return &OutputRoomEventConsumer{ + ctx: process.Context(), cfg: cfg, - rsConsumer: &consumer, + jetstream: js, db: store, - rsAPI: rsAPI, - psAPI: psAPI, + durable: cfg.Matrix.JetStream.Durable("PushServerClientAPIConsumer"), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), pgClient: pgClient, + psAPI: psAPI, + rsAPI: rsAPI, syncProducer: syncProducer, } - consumer.ProcessMessage = s.onMessage - return s } func (s *OutputRoomEventConsumer) Start() error { - return s.rsConsumer.Start() + if err := jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ); err != nil { + return err + } + return nil } -func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - ctx := context.Background() - +func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { var output rsapi.OutputEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { + if err := json.Unmarshal(msg.Data, &output); err != nil { log.WithError(err).Errorf("pushserver consumer: message parse failure") - return nil + return true } log.WithFields(log.Fields{ @@ -104,7 +107,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Ignore old events, peeks, so on. } - return nil + return true } func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { diff --git a/pushserver/consumers/roomserver_test.go b/pushserver/consumers/roomserver_test.go index f6ed1eced..3512573e1 100644 --- a/pushserver/consumers/roomserver_test.go +++ b/pushserver/consumers/roomserver_test.go @@ -6,7 +6,6 @@ import ( "sync" "testing" - "github.com/Shopify/sarama" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/matrix-org/dendrite/internal/pushgateway" @@ -17,6 +16,7 @@ import ( rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" ) const serverName = gomatrixserverlib.ServerName("example.org") @@ -124,11 +124,13 @@ func TestOutputRoomEventConsumer(t *testing.T) { }}, pgClient.Reqs); diff != "" { t.Errorf("pgClient.NotifyHTTP Reqs: +got -want:\n%s", diff) } - if diff := cmp.Diff([]sarama.ProducerMessage{{ - Topic: "notificationDataTopic", - Key: sarama.StringEncoder("@alice:example.org"), - Value: sarama.ByteEncoder([]byte(`{"room_id":"!jEsUZKDJdhlrceRyVU:example.org","unread_highlight_count":0,"unread_notification_count":1}`)), - }}, messageSender.Messages, cmpopts.IgnoreUnexported(sarama.ProducerMessage{})); diff != "" { + msg := &nats.Msg{ + Subject: "notificationDataTopic", + Header: nats.Header{}, + Data: []byte(`{"room_id":"!jEsUZKDJdhlrceRyVU:example.org","unread_highlight_count":0,"unread_notification_count":1}`), + } + msg.Header.Set("user_id", "@alice:example.org") + if diff := cmp.Diff([]*nats.Msg{msg}, messageSender.Messages, cmpopts.IgnoreUnexported(nats.Msg{})); diff != "" { t.Errorf("SendMessage Messages: +got -want:\n%s", diff) } } @@ -243,10 +245,10 @@ func mustParseHeaderedEventJSON(s string) *gomatrixserverlib.HeaderedEvent { } type fakeMessageSender struct { - Messages []sarama.ProducerMessage + Messages []*nats.Msg } -func (s *fakeMessageSender) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { - s.Messages = append(s.Messages, *msg) - return 0, 0, nil +func (s *fakeMessageSender) PublishMsg(msg *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) { + s.Messages = append(s.Messages, msg) + return nil, nil } diff --git a/pushserver/producers/syncapi.go b/pushserver/producers/syncapi.go index 61ad89527..0ef5f3289 100644 --- a/pushserver/producers/syncapi.go +++ b/pushserver/producers/syncapi.go @@ -7,22 +7,28 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/pushserver/storage" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) +type JetStreamPublisher interface { + PublishMsg(*nats.Msg, ...nats.PubOpt) (*nats.PubAck, error) +} + // SyncAPI produces messages for the Sync API server to consume. type SyncAPI struct { db storage.Database - producer MessageSender + producer JetStreamPublisher clientDataTopic string notificationDataTopic string } -func NewSyncAPI(db storage.Database, producer MessageSender, clientDataTopic string, notificationDataTopic string) *SyncAPI { +func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic string, notificationDataTopic string) *SyncAPI { return &SyncAPI{ db: db, - producer: producer, + producer: js, clientDataTopic: clientDataTopic, notificationDataTopic: notificationDataTopic, } @@ -30,27 +36,29 @@ func NewSyncAPI(db storage.Database, producer MessageSender, clientDataTopic str // SendAccountData sends account data to the Sync API server. func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error { - var m sarama.ProducerMessage + m := &nats.Msg{ + Subject: p.clientDataTopic, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, userID) data := eventutil.AccountData{ RoomID: roomID, Type: dataType, } - value, err := json.Marshal(data) + var err error + m.Data, err = json.Marshal(data) if err != nil { return err } - m.Topic = string(p.clientDataTopic) - m.Key = sarama.StringEncoder(userID) - m.Value = sarama.ByteEncoder(value) log.WithFields(log.Fields{ "user_id": userID, "room_id": roomID, "data_type": dataType, - }).Infof("Producing to topic %q", m.Topic) + }).Tracef("Producing to topic '%s'", p.clientDataTopic) - _, _, err = p.producer.SendMessage(&m) + _, err = p.producer.PublishMsg(m) return err } @@ -76,21 +84,24 @@ func (p *SyncAPI) GetAndSendNotificationData(ctx context.Context, userID, roomID // sendNotificationData sends data about unread notifications to the Sync API server. func (p *SyncAPI) sendNotificationData(userID string, data *eventutil.NotificationData) error { - value, err := json.Marshal(data) + m := &nats.Msg{ + Subject: p.notificationDataTopic, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, userID) + + var err error + m.Data, err = json.Marshal(data) if err != nil { return err } - var m sarama.ProducerMessage - m.Topic = string(p.notificationDataTopic) - m.Key = sarama.StringEncoder(userID) - m.Value = sarama.ByteEncoder(value) log.WithFields(log.Fields{ "user_id": userID, "room_id": data.RoomID, - }).Infof("Producing to topic %q", m.Topic) + }).Tracef("Producing to topic '%s'", p.clientDataTopic) - _, _, err = p.producer.SendMessage(&m) + _, err = p.producer.PublishMsg(m) return err } diff --git a/pushserver/pushserver.go b/pushserver/pushserver.go index 14c3a5ae4..f09fa2169 100644 --- a/pushserver/pushserver.go +++ b/pushserver/pushserver.go @@ -11,7 +11,7 @@ import ( "github.com/matrix-org/dendrite/pushserver/storage" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" @@ -32,23 +32,21 @@ func NewInternalAPI( rsAPI roomserverAPI.RoomserverInternalAPI, userAPI uapi.UserInternalAPI, ) api.PushserverInternalAPI { - consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) - db, err := storage.Open(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to push server db") } - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + js := jetstream.Prepare(&cfg.Matrix.JetStream) + syncProducer := producers.NewSyncAPI( - db, - producer, + db, js, // TODO: user API should handle syncs for account data. Right now, // it's handled by clientapi, and hence uses its topic. When user // API handles it for all account data, we can remove it from // here. - cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), - cfg.Matrix.Kafka.TopicFor(config.TopicOutputNotificationData), + cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), ) psAPI := internal.NewPushserverAPI( @@ -56,21 +54,21 @@ func NewInternalAPI( ) caConsumer := consumers.NewOutputClientDataConsumer( - process, cfg, consumer, db, pgClient, userAPI, syncProducer, + process, cfg, js, db, pgClient, userAPI, syncProducer, ) if err := caConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start push server clientapi consumer") } eduConsumer := consumers.NewOutputReceiptEventConsumer( - process, cfg, consumer, db, pgClient, syncProducer, + process, cfg, js, db, pgClient, syncProducer, ) if err := eduConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start push server EDU consumer") } rsConsumer := consumers.NewOutputRoomEventConsumer( - process, cfg, consumer, db, pgClient, psAPI, rsAPI, syncProducer, + process, cfg, js, db, pgClient, psAPI, rsAPI, syncProducer, ) if err := rsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start push server room server consumer") diff --git a/pushserver/storage/interface.go b/pushserver/storage/interface.go index 6263db435..492e9aed7 100644 --- a/pushserver/storage/interface.go +++ b/pushserver/storage/interface.go @@ -3,13 +3,11 @@ package storage import ( "context" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/pushserver/api" "github.com/matrix-org/dendrite/pushserver/storage/tables" ) type Database interface { - internal.PartitionStorer UpsertPusher(ctx context.Context, pusher api.Pusher, localpart string) error GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error) RemovePusher(ctx context.Context, appId, pushkey, localpart string) error diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 5810a2a91..418e019f4 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -18,6 +18,7 @@ var ( OutputKeyChangeEvent = "OutputKeyChangeEvent" OutputTypingEvent = "OutputTypingEvent" OutputClientData = "OutputClientData" + OutputNotificationData = "OutputNotificationData" OutputReceiptEvent = "OutputReceiptEvent" ) @@ -58,4 +59,9 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: OutputNotificationData, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, } diff --git a/syncapi/consumers/pushserver.go b/syncapi/consumers/pushserver.go index 3b35dd4d7..38be7f0d1 100644 --- a/syncapi/consumers/pushserver.go +++ b/syncapi/consumers/pushserver.go @@ -18,25 +18,28 @@ import ( "context" "encoding/json" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // OutputNotificationDataConsumer consumes events that originated in // the Push server. type OutputNotificationDataConsumer struct { - consumer *internal.ContinualConsumer - db storage.Database - stream types.StreamProvider - notifier *notifier.Notifier + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + db storage.Database + notifier *notifier.Notifier + stream types.StreamProvider } // NewOutputNotificationDataConsumer creates a new consumer. Call @@ -44,49 +47,44 @@ type OutputNotificationDataConsumer struct { func NewOutputNotificationDataConsumer( process *process.ProcessContext, cfg *config.SyncAPI, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, ) *OutputNotificationDataConsumer { - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/pushserver", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputNotificationData)), - Consumer: kafkaConsumer, - PartitionStore: store, - } s := &OutputNotificationDataConsumer{ - consumer: &consumer, - db: store, - notifier: notifier, - stream: stream, + ctx: process.Context(), + jetstream: js, + durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), + db: store, + notifier: notifier, + stream: stream, } - consumer.ProcessMessage = s.onMessage - return s } // Start starts consumption. func (s *OutputNotificationDataConsumer) Start() error { - return s.consumer.Start() + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) } // onMessage is called when the Sync server receives a new event from // the push server. It is not safe for this function to be called from // multiple goroutines, or else the sync stream position may race and // be incorrectly calculated. -func (s *OutputNotificationDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { - ctx := context.Background() - - userID := string(msg.Key) +func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + userID := string(msg.Header.Get(jetstream.UserID)) // Parse out the event JSON var data eventutil.NotificationData - if err := json.Unmarshal(msg.Value, &data); err != nil { + if err := json.Unmarshal(msg.Data, &data); err != nil { sentry.CaptureException(err) log.WithField("user_id", userID).WithError(err).Error("push server consumer: message parse failure") - return nil + return true } streamPos, err := s.db.UpsertRoomUnreadNotificationCounts(ctx, userID, data.RoomID, data.UnreadNotificationCount, data.UnreadHighlightCount) @@ -107,5 +105,5 @@ func (s *OutputNotificationDataConsumer) onMessage(msg *sarama.ConsumerMessage) "streamPos": streamPos, }).Info("Received data from Push server") - return nil + return true } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 4c8251da5..4dd2bbc45 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -18,12 +18,8 @@ import ( "context" eduAPI "github.com/matrix-org/dendrite/eduserver/api" - -<<<<<<< HEAD - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/eventutil" -======= ->>>>>>> main + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index 6882ec3cb..893909872 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -12,24 +12,14 @@ import ( ) type Streams struct { -<<<<<<< HEAD PDUStreamProvider types.StreamProvider TypingStreamProvider types.StreamProvider ReceiptStreamProvider types.StreamProvider InviteStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider AccountDataStreamProvider types.StreamProvider + DeviceListStreamProvider types.StreamProvider NotificationDataStreamProvider types.StreamProvider - DeviceListStreamProvider types.PartitionedStreamProvider -======= - PDUStreamProvider types.StreamProvider - TypingStreamProvider types.StreamProvider - ReceiptStreamProvider types.StreamProvider - InviteStreamProvider types.StreamProvider - SendToDeviceStreamProvider types.StreamProvider - AccountDataStreamProvider types.StreamProvider - DeviceListStreamProvider types.StreamProvider ->>>>>>> main } func NewSyncStreamProviders( diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index ae36d0cd9..3462fed04 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -89,7 +89,7 @@ func AddPublicRoutes( } notificationConsumer := consumers.NewOutputNotificationDataConsumer( - process, cfg, consumer, syncDB, notifier, streams.NotificationDataStreamProvider, + process, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider, ) if err = notificationConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start notification data consumer") diff --git a/syncapi/types/types.go b/syncapi/types/types.go index cacd3eba4..1a304d8df 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -95,13 +95,14 @@ const ( ) type StreamingToken struct { - PDUPosition StreamPosition - TypingPosition StreamPosition - ReceiptPosition StreamPosition - SendToDevicePosition StreamPosition - InvitePosition StreamPosition - AccountDataPosition StreamPosition - DeviceListPosition StreamPosition + PDUPosition StreamPosition + TypingPosition StreamPosition + ReceiptPosition StreamPosition + SendToDevicePosition StreamPosition + InvitePosition StreamPosition + AccountDataPosition StreamPosition + DeviceListPosition StreamPosition + NotificationDataPosition StreamPosition } // This will be used as a fallback by json.Marshal. @@ -117,10 +118,11 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) { func (t StreamingToken) String() string { posStr := fmt.Sprintf( - "s%d_%d_%d_%d_%d_%d_%d", + "s%d_%d_%d_%d_%d_%d_%d_%d", t.PDUPosition, t.TypingPosition, t.ReceiptPosition, t.SendToDevicePosition, - t.InvitePosition, t.AccountDataPosition, t.DeviceListPosition, + t.InvitePosition, t.AccountDataPosition, + t.DeviceListPosition, t.NotificationDataPosition, ) return posStr } @@ -142,12 +144,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true case t.DeviceListPosition > other.DeviceListPosition: return true + case t.NotificationDataPosition > other.NotificationDataPosition: + return true } return false } func (t *StreamingToken) IsEmpty() bool { - return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition == 0 + return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition == 0 } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. @@ -185,6 +189,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) { if other.DeviceListPosition > t.DeviceListPosition { t.DeviceListPosition = other.DeviceListPosition } + if other.NotificationDataPosition > t.NotificationDataPosition { + t.NotificationDataPosition = other.NotificationDataPosition + } } type TopologyToken struct { diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index cda178b37..ff78bfb9d 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -9,10 +9,10 @@ import ( func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0}.String(), - "s3_1_0_0_0_0_2": StreamingToken{3, 1, 0, 0, 0, 0, 2}.String(), - "s3_1_2_3_5_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0}.String(), - "t3_1": TopologyToken{3, 1}.String(), + "s4_0_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0}.String(), + "s3_1_0_0_0_0_2_0": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0}.String(), + "s3_1_2_3_5_0_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0}.String(), + "t3_1": TopologyToken{3, 1}.String(), } for a, b := range shouldPass {