diff --git a/appservice/appservice.go b/appservice/appservice.go index 782e090a1..4ff42360b 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -32,7 +32,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" ) @@ -58,7 +58,7 @@ func NewInternalAPI( }, }, } - consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) + consumer, _ := jetstream.SetupConsumerProducer(&base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 2ad7f68fe..f9790a0b1 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" @@ -52,7 +53,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "appservice/roomserver", - Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), + Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: appserviceDB, } diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index e30057ed4..000039cc0 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -264,8 +264,7 @@ func (m *DendriteMonolith) Start() { cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) cfg.Global.PrivateKey = sk cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Global.Kafka.UseNaffka = true - cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-naffka.db", m.StorageDirectory, prefix)) + cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("file:%s/%s", m.StorageDirectory, prefix)) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-account.db", m.StorageDirectory, prefix)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-device.db", m.StorageDirectory, prefix)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-mediaapi.db", m.CacheDirectory, prefix)) diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index d5aedeac6..441cc26c8 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -85,8 +85,7 @@ func (m *DendriteMonolith) Start() { cfg.Global.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) cfg.Global.PrivateKey = ygg.PrivateKey() cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Global.Kafka.UseNaffka = true - cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-naffka.db", m.StorageDirectory)) + cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("file:%s/", m.StorageDirectory)) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-account.db", m.StorageDirectory)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-device.db", m.StorageDirectory)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-mediaapi.db", m.StorageDirectory)) diff --git a/clientapi/auth/user_interactive.go b/clientapi/auth/user_interactive.go index 839637351..9e0db68e0 100644 --- a/clientapi/auth/user_interactive.go +++ b/clientapi/auth/user_interactive.go @@ -78,7 +78,7 @@ func (r *Login) Username() string { if r.Identifier.Type == "m.id.user" { return r.Identifier.User } - // deprecated but without it Riot iOS won't log in + // deprecated but without it Element iOS won't log in return r.User } diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 562d89d28..ffab1337d 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -26,7 +26,7 @@ import ( keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/gomatrixserverlib" @@ -49,11 +49,11 @@ func AddPublicRoutes( extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, ) { - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ Producer: producer, - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), } routing.Setup( diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index bc679631a..b85cfde03 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -47,6 +47,37 @@ func SendBan( if reqErr != nil { return *reqErr } + + errRes := checkMemberInRoom(req.Context(), rsAPI, device.UserID, roomID) + if errRes != nil { + return *errRes + } + + plEvent := roomserverAPI.GetStateEvent(req.Context(), rsAPI, roomID, gomatrixserverlib.StateKeyTuple{ + EventType: gomatrixserverlib.MRoomPowerLevels, + StateKey: "", + }) + if plEvent == nil { + return util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden("You don't have permission to ban this user, no power_levels event in this room."), + } + } + pl, err := plEvent.PowerLevels() + if err != nil { + return util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden("You don't have permission to ban this user, the power_levels event for this room is malformed so auth checks cannot be performed."), + } + } + allowedToBan := pl.UserLevel(device.UserID) >= pl.Ban + if !allowedToBan { + return util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden("You don't have permission to ban this user, power level too low."), + } + } + return sendMembership(req.Context(), accountDB, device, roomID, "ban", body.Reason, cfg, body.UserID, evTime, roomVer, rsAPI, asAPI) } diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 6b0e57d8b..8ebe9541c 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -122,8 +122,8 @@ func main() { cfg.Global.ServerName = "p2p" cfg.Global.PrivateKey = privKey cfg.Global.KeyID = gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", *instanceName)) - cfg.Global.Kafka.UseNaffka = true cfg.FederationSender.FederationMaxRetries = 6 + cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", *instanceName)) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) @@ -132,7 +132,6 @@ func main() { cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-signingkeyserver.db", *instanceName)) cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) - cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName)) cfg.MSCs.MSCs = []string{"msc2836"} cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", *instanceName)) diff --git a/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go b/cmd/dendrite-demo-pinecone/embed/embed_elementweb.go similarity index 90% rename from cmd/dendrite-demo-pinecone/embed/embed_riotweb.go rename to cmd/dendrite-demo-pinecone/embed/embed_elementweb.go index d25745ca6..4d2da55cb 100644 --- a/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go +++ b/cmd/dendrite-demo-pinecone/embed/embed_elementweb.go @@ -1,4 +1,4 @@ -// +build riotweb +// +build elementweb package embed @@ -12,8 +12,8 @@ import ( "github.com/tidwall/sjson" ) -// From within the Riot Web directory: -// go run github.com/mjibson/esc -o /path/to/dendrite/internal/embed/fs_riotweb.go -private -pkg embed . +// From within the Element Web directory: +// go run github.com/mjibson/esc -o /path/to/dendrite/internal/embed/fs_elementweb.go -private -pkg embed . var cssFile = regexp.MustCompile("\\.css$") var jsFile = regexp.MustCompile("\\.js$") @@ -68,7 +68,7 @@ func Embed(rootMux *mux.Router, listenPort int, serverName string) { } js, _ := sjson.SetBytes(buf, "default_server_config.m\\.homeserver.base_url", url) js, _ = sjson.SetBytes(js, "default_server_config.m\\.homeserver.server_name", serverName) - js, _ = sjson.SetBytes(js, "brand", fmt.Sprintf("Riot %s", serverName)) + js, _ = sjson.SetBytes(js, "brand", fmt.Sprintf("Element %s", serverName)) js, _ = sjson.SetBytes(js, "disable_guests", true) js, _ = sjson.SetBytes(js, "disable_3pid_login", true) js, _ = sjson.DeleteBytes(js, "welcomeUserId") @@ -76,7 +76,7 @@ func Embed(rootMux *mux.Router, listenPort int, serverName string) { }) fmt.Println("*-------------------------------*") - fmt.Println("| This build includes Riot Web! |") + fmt.Println("| This build includes Element Web! |") fmt.Println("*-------------------------------*") fmt.Println("Point your browser to:", url) fmt.Println() diff --git a/cmd/dendrite-demo-pinecone/embed/embed_other.go b/cmd/dendrite-demo-pinecone/embed/embed_other.go index 598881148..04c2188c3 100644 --- a/cmd/dendrite-demo-pinecone/embed/embed_other.go +++ b/cmd/dendrite-demo-pinecone/embed/embed_other.go @@ -1,4 +1,4 @@ -// +build !riotweb +// +build !elementweb package embed diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 2712ed4a1..e6a88fcc9 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -144,7 +144,7 @@ func main() { cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) cfg.Global.PrivateKey = sk cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Global.Kafka.UseNaffka = true + cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", *instanceName)) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) @@ -154,7 +154,6 @@ func main() { cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName)) cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) - cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} if err := cfg.Derive(); err != nil { panic(err) diff --git a/cmd/dendrite-demo-yggdrasil/embed/embed_riotweb.go b/cmd/dendrite-demo-yggdrasil/embed/embed_elementweb.go similarity index 85% rename from cmd/dendrite-demo-yggdrasil/embed/embed_riotweb.go rename to cmd/dendrite-demo-yggdrasil/embed/embed_elementweb.go index 9ee4e626b..8d49c553a 100644 --- a/cmd/dendrite-demo-yggdrasil/embed/embed_riotweb.go +++ b/cmd/dendrite-demo-yggdrasil/embed/embed_elementweb.go @@ -1,4 +1,4 @@ -// +build riotweb +// +build elementweb package embed @@ -12,8 +12,8 @@ import ( "github.com/tidwall/sjson" ) -// From within the Riot Web directory: -// go run github.com/mjibson/esc -o /path/to/dendrite/internal/embed/fs_riotweb.go -private -pkg embed . +// From within the Element Web directory: +// go run github.com/mjibson/esc -o /path/to/dendrite/internal/embed/fs_elementweb.go -private -pkg embed . var cssFile = regexp.MustCompile("\\.css$") var jsFile = regexp.MustCompile("\\.js$") @@ -68,16 +68,16 @@ func Embed(rootMux *mux.Router, listenPort int, serverName string) { } js, _ := sjson.SetBytes(buf, "default_server_config.m\\.homeserver.base_url", url) js, _ = sjson.SetBytes(js, "default_server_config.m\\.homeserver.server_name", serverName) - js, _ = sjson.SetBytes(js, "brand", fmt.Sprintf("Riot %s", serverName)) + js, _ = sjson.SetBytes(js, "brand", fmt.Sprintf("Element %s", serverName)) js, _ = sjson.SetBytes(js, "disable_guests", true) js, _ = sjson.SetBytes(js, "disable_3pid_login", true) js, _ = sjson.DeleteBytes(js, "welcomeUserId") _, _ = w.Write(js) }) - fmt.Println("*-------------------------------*") - fmt.Println("| This build includes Riot Web! |") - fmt.Println("*-------------------------------*") + fmt.Println("*----------------------------------*") + fmt.Println("| This build includes Element Web! |") + fmt.Println("*----------------------------------*") fmt.Println("Point your browser to:", url) fmt.Println() } diff --git a/cmd/dendrite-demo-yggdrasil/embed/embed_other.go b/cmd/dendrite-demo-yggdrasil/embed/embed_other.go index 598881148..04c2188c3 100644 --- a/cmd/dendrite-demo-yggdrasil/embed/embed_other.go +++ b/cmd/dendrite-demo-yggdrasil/embed/embed_other.go @@ -1,4 +1,4 @@ -// +build !riotweb +// +build !elementweb package embed diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 95207a601..3b8ceca24 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -74,7 +74,7 @@ func main() { cfg.Global.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) cfg.Global.PrivateKey = ygg.PrivateKey() cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Global.Kafka.UseNaffka = true + cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", *instanceName)) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) @@ -84,7 +84,6 @@ func main() { cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName)) cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) - cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) cfg.MSCs.MSCs = []string{"msc2836"} cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", *instanceName)) if err = cfg.Derive(); err != nil { diff --git a/cmd/dendritejs-pinecone/main.go b/cmd/dendritejs-pinecone/main.go index 25e496909..1a97e7784 100644 --- a/cmd/dendritejs-pinecone/main.go +++ b/cmd/dendritejs-pinecone/main.go @@ -162,8 +162,7 @@ func main() { cfg.SigningKeyServer.Database.ConnectionString = "file:/idb/dendritejs_signingkeyserver.db" cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db" cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db" - cfg.Global.Kafka.UseNaffka = true - cfg.Global.Kafka.Database.ConnectionString = "file:/idb/dendritejs_naffka.db" + cfg.Global.JetStream.StoragePath = "file:/idb/dendritejs/" cfg.Global.TrustedIDServers = []string{} cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) cfg.Global.PrivateKey = sk diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index d5a845ae0..f7faf55e8 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -173,8 +173,7 @@ func main() { cfg.SigningKeyServer.Database.ConnectionString = "file:/idb/dendritejs_signingkeyserver.db" cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db" cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db" - cfg.Global.Kafka.UseNaffka = true - cfg.Global.Kafka.Database.ConnectionString = "file:/idb/dendritejs_naffka.db" + cfg.Global.JetStream.StoragePath = "file:/idb/dendritejs/" cfg.Global.TrustedIDServers = []string{ "matrix.org", "vector.im", } diff --git a/cmd/generate-config/main.go b/cmd/generate-config/main.go index bd70cbc5d..b8eeb8fd4 100644 --- a/cmd/generate-config/main.go +++ b/cmd/generate-config/main.go @@ -70,6 +70,7 @@ func main() { // don't hit matrix.org when running tests!!! cfg.SigningKeyServer.KeyPerspectives = config.KeyPerspectives{} cfg.UserAPI.BCryptCost = bcrypt.MinCost + cfg.Global.JetStream.InMemory = true } j, err := yaml.Marshal(cfg) diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 7cc405108..2e8ef1893 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -23,8 +23,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/input" "github.com/matrix-org/dendrite/eduserver/inthttp" "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" ) @@ -43,15 +42,15 @@ func NewInternalAPI( ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) return &input.EDUServerInputAPI{ Cache: eduCache, UserAPI: userAPI, Producer: producer, - OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), - OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), - OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), ServerName: cfg.Matrix.ServerName, } } diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index 505a11dae..3d22eb07b 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -23,8 +23,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) { cfg.Global.KeyID = gomatrixserverlib.KeyID("ed25519:auto") cfg.Global.ServerName = gomatrixserverlib.ServerName("localhost") cfg.Global.PrivateKey = privKey - cfg.Global.Kafka.UseNaffka = true - cfg.Global.Kafka.Database.ConnectionString = config.DataSource("file::memory:") + cfg.Global.JetStream.InMemory = true cfg.FederationSender.Database.ConnectionString = config.DataSource("file::memory:") base := setup.NewBaseDendrite(cfg, "Monolith", false) keyRing := &test.NopJSONVerifier{} diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index a8f850fb0..f0e1ae0d6 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // MakeJoin implements the /make_join API @@ -228,6 +229,21 @@ func SendJoin( } } + // Check that this is in fact a join event + membership, err := event.Membership() + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("missing content.membership key"), + } + } + if membership != gomatrixserverlib.Join { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("membership must be 'join'"), + } + } + // Check that the event is signed by the server sending the request. redacted := event.Redact() verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{ @@ -296,16 +312,26 @@ func SendJoin( // We are responsible for notifying other servers that the user has joined // the room, so set SendAsServer to cfg.Matrix.ServerName if !alreadyJoined { - if err = api.SendEvents( - httpReq.Context(), rsAPI, - api.KindNew, - []*gomatrixserverlib.HeaderedEvent{ - event.Headered(stateAndAuthChainResponse.RoomVersion), + var response api.InputRoomEventsResponse + rsAPI.InputRoomEvents(httpReq.Context(), &api.InputRoomEventsRequest{ + InputRoomEvents: []api.InputRoomEvent{ + { + Kind: api.KindNew, + Event: event.Headered(stateAndAuthChainResponse.RoomVersion), + AuthEventIDs: event.AuthEventIDs(), + SendAsServer: string(cfg.Matrix.ServerName), + TransactionID: nil, + }, }, - cfg.Matrix.ServerName, - nil, - ); err != nil { - util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed") + }, &response) + if response.ErrMsg != "" { + util.GetLogger(httpReq.Context()).WithField(logrus.ErrorKey, response.ErrMsg).Error("SendEvents failed") + if response.NotAllowed { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.Forbidden(response.ErrMsg), + } + } return jsonerror.InternalServerError() } } diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index a5231004c..38f4ca76f 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // MakeLeave implements the /make_leave API @@ -174,6 +175,13 @@ func SendLeave( } } + if event.StateKey() == nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("missing state_key"), + } + } + // Check if the user has already left. If so, no-op! queryReq := &api.QueryLatestEventsAndStateRequest{ RoomID: roomID, @@ -240,7 +248,10 @@ func SendLeave( mem, err := event.Membership() if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("event.Membership failed") - return jsonerror.InternalServerError() + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("missing content.membership key"), + } } if mem != gomatrixserverlib.Leave { return util.JSONResponse{ @@ -252,16 +263,27 @@ func SendLeave( // Send the events to the room server. // We are responsible for notifying other servers that the user has left // the room, so set SendAsServer to cfg.Matrix.ServerName - if err = api.SendEvents( - httpReq.Context(), rsAPI, - api.KindNew, - []*gomatrixserverlib.HeaderedEvent{ - event.Headered(verRes.RoomVersion), + var response api.InputRoomEventsResponse + rsAPI.InputRoomEvents(httpReq.Context(), &api.InputRoomEventsRequest{ + InputRoomEvents: []api.InputRoomEvent{ + { + Kind: api.KindNew, + Event: event.Headered(verRes.RoomVersion), + AuthEventIDs: event.AuthEventIDs(), + SendAsServer: string(cfg.Matrix.ServerName), + TransactionID: nil, + }, }, - cfg.Matrix.ServerName, - nil, - ); err != nil { - util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") + }, &response) + + if response.ErrMsg != "" { + util.GetLogger(httpReq.Context()).WithField(logrus.ErrorKey, response.ErrMsg).WithField("not_allowed", response.NotAllowed).Error("producer.SendEvents failed") + if response.NotAllowed { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.Forbidden(response.ErrMsg), + } + } return jsonerror.InternalServerError() } diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index 9a1ec1e26..63a367f52 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -55,29 +56,29 @@ func NewOutputEDUConsumer( typingConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/typing", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), Consumer: kafkaConsumer, PartitionStore: store, }, sendToDeviceConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/sendtodevice", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), Consumer: kafkaConsumer, PartitionStore: store, }, receiptConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/receipt", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), Consumer: kafkaConsumer, PartitionStore: store, }, queues: queues, db: store, ServerName: cfg.Matrix.ServerName, - TypingTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), - SendToDeviceTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), + TypingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + SendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), } c.typingConsumer.ProcessMessage = c.onTypingEvent c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index 9e146390a..4226d4922 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -53,7 +54,7 @@ func NewKeyChangeConsumer( consumer: &internal.ContinualConsumer{ Process: process, ComponentName: "federationsender/keychange", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Consumer: kafkaConsumer, PartitionStore: store, }, diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index f9c4a5c27..b145bc506 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -52,7 +53,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "federationsender/roomserver", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 0732c5d38..ee89d7822 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -25,7 +25,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/storage" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -61,7 +61,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + consumer, _ := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) queues := queue.NewOutgoingQueues( federationSenderDB, base.ProcessContext, diff --git a/go.mod b/go.mod index a0816012f..b55da5476 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,21 @@ module github.com/matrix-org/dendrite +replace github.com/nats-io/nats-server/v2 => github.com/neilalexander/nats-server/v2 v2.3.3-0.20210714094623-648cf26af922 + +replace github.com/nats-io/nats.go => github.com/neilalexander/nats.go v1.11.1-0.20210715085246-cd5b4d5a89fe + require ( github.com/Arceliar/ironwood v0.0.0-20210619124114-6ad55cae5031 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/Masterminds/semver/v3 v3.1.1 - github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86 + github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32 github.com/Shopify/sarama v1.29.0 github.com/codeclysm/extract v2.2.0+incompatible github.com/containerd/containerd v1.5.2 // indirect github.com/docker/docker v20.10.7+incompatible github.com/docker/go-connections v0.4.0 - github.com/getsentry/sentry-go v0.10.0 + github.com/getsentry/sentry-go v0.7.0 github.com/gologme/log v1.2.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 @@ -19,7 +23,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/juju/testing v0.0.0-20210324180055-18c50b0c2098 // indirect github.com/klauspost/compress v1.13.0 // indirect - github.com/lib/pq v1.9.0 + github.com/lib/pq v1.10.2 github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p-circuit v0.4.0 github.com/libp2p/go-libp2p-core v0.8.3 @@ -33,13 +37,13 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20210625141222-bd2b7124cee8 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20210714141824-52d282133140 - github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 + github.com/matrix-org/gomatrixserverlib v0.0.0-20210719140634-f44a103bb12e github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb github.com/morikuni/aec v1.0.0 // indirect - github.com/nats-io/nats.go v1.11.0 + github.com/nats-io/nats-server/v2 v2.3.2 + github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 github.com/neilalexander/utp v0.1.1-0.20210705212447-691f29ad692b github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6 @@ -57,9 +61,10 @@ require ( github.com/uber/jaeger-lib v2.4.0+incompatible github.com/yggdrasil-network/yggdrasil-go v0.4.1-0.20210715083903-52309d094c00 go.uber.org/atomic v1.7.0 - golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a + golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 golang.org/x/mobile v0.0.0-20210220033013-bdb1ca9a1e08 golang.org/x/net v0.0.0-20210610132358-84b48f89b13b + golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 gopkg.in/h2non/bimg.v1 v1.1.5 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 6cb58d930..65b03c5f7 100644 --- a/go.sum +++ b/go.sum @@ -51,13 +51,14 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno= -github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo= +github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EFZQ978U7x8IRnstaskI3IysnWY5Ao3QgZUKOXlsAdw= +github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw= github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= +github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= @@ -82,18 +83,16 @@ github.com/Microsoft/hcsshim/test v0.0.0-20201218223536-d3e5debf77da/go.mod h1:5 github.com/Microsoft/hcsshim/test v0.0.0-20210227013316-43a75bb4edd3/go.mod h1:mw7qgWloBUl75W/gVH3cQszUg1+gUITj7D6NY7ywVnY= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= github.com/RyanCarrier/dijkstra v1.0.0/go.mod h1:5agGUBNEtUAGIANmbw09fuO3a2htPEkc1jNH01qxCWA= github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30= -github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86 h1:ZFbfRbhZDohUiouv361CC0XWTESwUlVicz/zgGSO964= -github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc= +github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32 h1:i3fOph9Hjleo6LbuqN9ODFxnwt7mOtYMpCGeC8qJN50= +github.com/S7evinK/saramajetstream v0.0.0-20210709110708-de6efc8c4a32/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= -github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= github.com/Shopify/sarama v1.29.0 h1:ARid8o8oieau9XrHI55f/L3EoRAhm9px6sonbD7yuUE= github.com/Shopify/sarama v1.29.0/go.mod h1:2QpgD79wpdAESqNQMxNc0KYMkycd4slxGdV3TWSVqrU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= @@ -126,7 +125,6 @@ github.com/anacrolix/sync v0.2.0 h1:oRe22/ZB+v7v/5Mbc4d2zE0AXEZy0trKyKLjqYOt6tY= github.com/anacrolix/sync v0.2.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g= github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/utp v0.1.0/go.mod h1:MDwc+vsGEq7RMw6lr2GKOEqjWny5hO5OZXRVNaBJ2Dk= -github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -375,6 +373,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= @@ -394,8 +393,8 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4 github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA= github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc= -github.com/getsentry/sentry-go v0.10.0 h1:6gwY+66NHKqyZrdi6O2jGdo7wGdo9b3B69E01NFgT5g= -github.com/getsentry/sentry-go v0.10.0/go.mod h1:kELm/9iCblqUYh+ZRML7PNdCvEuw24wBvJPYyi86cws= +github.com/getsentry/sentry-go v0.7.0 h1:MR2yfR4vFfv/2+iBuSnkdQwVg7N9cJzihZ6KJu7srwQ= +github.com/getsentry/sentry-go v0.7.0/go.mod h1:pLFpD2Y5RHIKF9Bw3KH6/68DeN2K/XBJd8awjdPnUwg= github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= @@ -496,7 +495,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gologme/log v1.2.0 h1:Ya5Ip/KD6FX7uH0S31QO87nCCSucKtF44TLbTtO7V4c= @@ -662,8 +660,7 @@ github.com/ipfs/go-log/v2 v2.1.1 h1:G4TtqN+V9y9HY9TA6BwbCVyyBZ2B9MbCjR2MtGx8FR0= github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbkg6SAB4att1aAwTmVIx/5gCVqeyUdI= github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0= -github.com/iris-contrib/jade v1.1.3/go.mod h1:H/geBymxJhShH5kecoiOCSssPX7QWYH7UaeZTSWddIk= -github.com/iris-contrib/pongo2 v0.0.1/go.mod h1:Ssh+00+3GAZqSQb30AvBRNxBx7rf0GqwkjqxNd0u65g= +github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62A0xJL6I+umB2YTlFRwWXaDFA0jy+5HzGiJjqI= github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw= github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= @@ -716,11 +713,13 @@ github.com/juju/clock v0.0.0-20190205081909-9c5c9712527c/go.mod h1:nD0vlnrUjcjJh github.com/juju/cmd v0.0.0-20171107070456-e74f39857ca0/go.mod h1:yWJQHl73rdSX4DHVKGqkAip+huBslxRwS8m9CrOLq18= github.com/juju/collections v0.0.0-20200605021417-0d0ec82b7271/go.mod h1:5XgO71dV1JClcOJE+4dzdn4HrI5LiyKd7PlVG6eZYhY= github.com/juju/errors v0.0.0-20150916125642-1b5e39b83d18/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= +github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= github.com/juju/errors v0.0.0-20200330140219-3fe23663418f h1:MCOvExGLpaSIzLYB4iQXEHP4jYVU6vmzLNQPdMVrxnM= github.com/juju/errors v0.0.0-20200330140219-3fe23663418f/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/juju/httpprof v0.0.0-20141217160036-14bf14c30767/go.mod h1:+MaLYz4PumRkkyHYeXJ2G5g5cIW0sli2bOfpmbaMV/g= github.com/juju/loggo v0.0.0-20170605014607-8232ab8918d9/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= +github.com/juju/loggo v0.0.0-20180524022052-584905176618/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= github.com/juju/loggo v0.0.0-20200526014432-9ce3a2e09b5e h1:FdDd7bdI6cjq5vaoYlK1mfQYfF9sF2VZw8VEZMsl5t8= github.com/juju/loggo v0.0.0-20200526014432-9ce3a2e09b5e/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= github.com/juju/mgo/v2 v2.0.0-20210302023703-70d5d206e208 h1:/WiCm+Vpj87e4QWuWwPD/bNE9kDrWCLvPBHOQNcG2+A= @@ -729,6 +728,7 @@ github.com/juju/mutex v0.0.0-20171110020013-1fe2a4bf0a3a/go.mod h1:Y3oOzHH8CQ0Pp github.com/juju/retry v0.0.0-20151029024821-62c620325291/go.mod h1:OohPQGsr4pnxwD5YljhQ+TZnuVRYpa5irjugL1Yuif4= github.com/juju/retry v0.0.0-20180821225755-9058e192b216/go.mod h1:OohPQGsr4pnxwD5YljhQ+TZnuVRYpa5irjugL1Yuif4= github.com/juju/testing v0.0.0-20180402130637-44801989f0f7/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= +github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= github.com/juju/testing v0.0.0-20190723135506-ce30eb24acd2/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= github.com/juju/testing v0.0.0-20210324180055-18c50b0c2098 h1:yrhek184cGp0IRyHg0uV1khLaorNg6GtDLkry4oNNjE= github.com/juju/testing v0.0.0-20210324180055-18c50b0c2098/go.mod h1:7lxZW0B50+xdGFkvhAb8bwAGt6IU87JB1H9w4t8MNVM= @@ -744,19 +744,17 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0= github.com/kardianos/minwinsvc v1.0.0/go.mod h1:Bgd0oc+D0Qo3bBytmNtyRKVlp85dAloLKhfxanPFFRc= -github.com/kataras/golog v0.0.10/go.mod h1:yJ8YKCmyL+nWjERB90Qwn+bdyBZsaQwU3bTVFgkFIp8= -github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYbq3UhfoFmE= -github.com/kataras/neffos v0.0.14/go.mod h1:8lqADm8PnbeFfL7CLXh1WHw53dG27MC3pgi2R1rmoTE= -github.com/kataras/pio v0.0.2/go.mod h1:hAoW0t9UmXi4R5Oyq5Z4irTbaTsOemSrDGUtaTl7Dro= -github.com/kataras/sitemap v0.0.5/go.mod h1:KY2eugMKiPwsJgx7+U103YZehfvNGOXURubcGyk0Bz8= +github.com/kataras/golog v0.0.9/go.mod h1:12HJgwBIZFNGL0EJnMRhmvGA0PQGx8VFwrZtM4CqbAk= +github.com/kataras/iris/v12 v12.0.1/go.mod h1:udK4vLQKkdDqMGJJVd/msuMtN6hpYJhg/lSzuxjhO+U= +github.com/kataras/neffos v0.0.10/go.mod h1:ZYmJC07hQPW67eKuzlfY7SO3bC0mw83A3j6im82hfqw= +github.com/kataras/pio v0.0.0-20190103105442-ea782b38602d/go.mod h1:NV88laa9UiiDuX9AhMbDPkGYSPugBOV6yTZB1l2K9Z0= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.9.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= @@ -786,8 +784,8 @@ github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvf github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= -github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8= -github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= +github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2 h1:7cWK5cdA5x72jX0g8iLrQWm5TRJZ6CzGdPEhWj7plWU= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= @@ -1031,10 +1029,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210625141222-bd2b7124cee8/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20210714141824-52d282133140 h1:EWOcw9M3zoXz45aLgaOCay31Xa2QzzMX6vRLh0xNbzY= -github.com/matrix-org/gomatrixserverlib v0.0.0-20210714141824-52d282133140/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= -github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU= -github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE= +github.com/matrix-org/gomatrixserverlib v0.0.0-20210719140634-f44a103bb12e h1:dYPsAU1363AEbcohNzBJHV0CkbfVa8QiMNFib/i5pS8= +github.com/matrix-org/gomatrixserverlib v0.0.0-20210719140634-f44a103bb12e/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b h1:5X5vdWQ13xrNkJVqaJHPsrt7rKkMJH5iac0EtfOuxSg= github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b/go.mod h1:CVlrvs1R5iz7Omy2GqAjJJKbACn07GZgUq1Gli18FYE= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= @@ -1059,7 +1055,6 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= -github.com/mattn/go-sqlite3 v1.14.2/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus= github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb h1:ax2vG2unlxsjwS7PMRo4FECIfAdQLowd6ejWYwPQhBo= github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= @@ -1067,7 +1062,8 @@ github.com/mattomatic/dijkstra v0.0.0-20130617153013-6f6d134eb237/go.mod h1:UOnL github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= -github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= +github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg= +github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= @@ -1177,20 +1173,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= -github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI= github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= -github.com/nats-io/nats-server/v2 v2.2.3 h1:tCuswUogVn/+SV5qfgOVnuHGBvAW7C1hMdsXghi/5ds= -github.com/nats-io/nats-server/v2 v2.2.3/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI= -github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= -github.com/nats-io/nats.go v1.11.0 h1:L263PZkrmkRJRJT2YHU8GwWWvEvmr9/LUKuJTXsF32k= -github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= @@ -1201,6 +1187,10 @@ github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uY github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= +github.com/neilalexander/nats-server/v2 v2.3.3-0.20210714094623-648cf26af922 h1:dCWEi9ipR6plTXfj3G8ar0+Ukq6gAp7xvYWfuQQZg5A= +github.com/neilalexander/nats-server/v2 v2.3.3-0.20210714094623-648cf26af922/go.mod h1:dUf7Cm5z5LbciFVwWx54owyCKm8x4/hL6p7rrljhLFY= +github.com/neilalexander/nats.go v1.11.1-0.20210715085246-cd5b4d5a89fe h1:0lmDLHkYtQOyRpX8CkB3h6aQ71soPvoUD/A/7WmkuLw= +github.com/neilalexander/nats.go v1.11.1-0.20210715085246-cd5b4d5a89fe/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/neilalexander/utp v0.1.1-0.20210622132614-ee9a34a30488/go.mod h1:NPHGhPc0/wudcaCqL/H5AOddkRf8GPRhzOujuUKGQu8= github.com/neilalexander/utp v0.1.1-0.20210705212447-691f29ad692b h1:XNm+Ks3bVziRJxcMaIbzumWEw7l52z9Rek6cMHgln1g= github.com/neilalexander/utp v0.1.1-0.20210705212447-691f29ad692b/go.mod h1:ylsx0342RjGHjOoVKhR/wz/7Lhiusonihfj4QLxEMcU= @@ -1284,7 +1274,6 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -1348,7 +1337,6 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -1363,7 +1351,6 @@ github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5P github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= -github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= @@ -1515,9 +1502,7 @@ github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPyS github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= -github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= -github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= @@ -1595,8 +1580,6 @@ golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -1610,8 +1593,10 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= -golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 h1:/UOmuWzQfxxo9UtlXMwuQU8CMgg1eZXqTRwkSQJWKOI= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -1649,7 +1634,6 @@ golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hM golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180406214816-61147c48b25b/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1806,8 +1790,10 @@ golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210611083646-a4fc73990273 h1:faDu4veV+8pcThn4fewv6TVlNCezafGoC1gM/mxQLbQ= golang.org/x/sys v0.0.0-20210611083646-a4fc73990273/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -1995,12 +1981,6 @@ gopkg.in/h2non/gock.v1 v1.0.14 h1:fTeu9fcUvSnLNacYvYI54h+1/XEteDyHvrVCZEEEYNM= gopkg.in/h2non/gock.v1 v1.0.14/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE= gopkg.in/httprequest.v1 v1.1.1/go.mod h1:/CkavNL+g3qLOrpFHVrEx4NKepeqR4XTZWNj4sGGjz0= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/ini.v1 v1.51.1/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= -gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= -gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= -gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= -gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/macaroon.v2 v2.1.0 h1:HZcsjBCzq9t0eBPMKqTN/uSN6JOm78ZJ2INbqcBQOUI= gopkg.in/macaroon.v2 v2.1.0/go.mod h1:OUb+TQP/OP0WOerC2Jp/3CwhIKyIa9kQjuc7H24e6/o= gopkg.in/mgo.v2 v2.0.0-20160818015218-f2b6f6c918c4/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= @@ -2027,7 +2007,6 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= @@ -2086,4 +2065,4 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck= -sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0= +sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0= \ No newline at end of file diff --git a/internal/test/config.go b/internal/test/config.go index 7e68d6d2e..b023426e2 100644 --- a/internal/test/config.go +++ b/internal/test/config.go @@ -81,7 +81,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.MediaAPI.BasePath = config.Path(mediaBasePath) - cfg.Global.Kafka.Addresses = []string{kafkaURI} + cfg.Global.JetStream.Addresses = []string{kafkaURI} // TODO: Use different databases for the different schemas. // Using the same database for every schema currently works because diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 7e8fc2e0d..ed345dcb8 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -23,7 +23,7 @@ import ( "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/sirupsen/logrus" ) @@ -38,14 +38,14 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) db, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key server database") } keyChangeProducer := &producers.KeyChange{ - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Producer: producer, DB: db, } diff --git a/mediaapi/routing/upload.go b/mediaapi/routing/upload.go index bc0d206b9..ecdab2195 100644 --- a/mediaapi/routing/upload.go +++ b/mediaapi/routing/upload.go @@ -147,20 +147,17 @@ func (r *uploadRequest) doUpload( // r.storeFileAndMetadata(ctx, tmpDir, ...) // before you return from doUpload else we will leak a temp file. We could make this nicer with a `WithTransaction` style of // nested function to guarantee either storage or cleanup. - - // should not happen, but prevents any int overflows - if cfg.MaxFileSizeBytes != nil && *cfg.MaxFileSizeBytes+1 <= 0 { - r.Logger.WithFields(log.Fields{ - "MaxFileSizeBytes": *cfg.MaxFileSizeBytes + 1, - }).Error("Error while transferring file, configured max_file_size_bytes overflows int64") - return &util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.Unknown("Failed to upload"), + if *cfg.MaxFileSizeBytes > 0 { + if *cfg.MaxFileSizeBytes+1 <= 0 { + r.Logger.WithFields(log.Fields{ + "MaxFileSizeBytes": *cfg.MaxFileSizeBytes, + }).Warnf("Configured MaxFileSizeBytes overflows int64, defaulting to %d bytes", config.DefaultMaxFileSizeBytes) + cfg.MaxFileSizeBytes = &config.DefaultMaxFileSizeBytes } + reqReader = io.LimitReader(reqReader, int64(*cfg.MaxFileSizeBytes)+1) } - lr := io.LimitReader(reqReader, int64(*cfg.MaxFileSizeBytes)+1) - hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, lr, cfg.AbsBasePath) + hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, reqReader, cfg.AbsBasePath) if err != nil { r.Logger.WithError(err).WithFields(log.Fields{ "MaxFileSizeBytes": *cfg.MaxFileSizeBytes, diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 396a1defa..192a056b8 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -23,8 +23,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/sirupsen/logrus" ) @@ -42,7 +41,7 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) var perspectiveServerNames []gomatrixserverlib.ServerName for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives { @@ -55,7 +54,7 @@ func NewInternalAPI( } return internal.NewRoomserverAPI( - cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + cfg, roomserverDB, producer, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), base.Caches, keyRing, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 5c9540071..335d2a4ca 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -19,6 +19,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -161,12 +162,11 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro cfg := &config.Dendrite{} cfg.Defaults() cfg.Global.ServerName = testOrigin - cfg.Global.Kafka.UseNaffka = true cfg.RoomServer.Database = config.DatabaseOptions{ ConnectionString: roomserverDBFileURI, } dp := &dummyProducer{ - topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), + topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), } cache, err := caching.NewInMemoryLRUCache(false) if err != nil { @@ -181,7 +181,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro logrus.WithError(err).Panicf("failed to connect to room server db") } return internal.NewRoomserverAPI( - &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)), + &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent)), base.Caches, &test.NopJSONVerifier{}, nil, ), dp } diff --git a/roomserver/state/state.go b/roomserver/state/state.go index 0d9511acf..3d71dbb6f 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -424,12 +424,17 @@ func (v *StateResolution) loadStateAfterEventsForNumericTuples( return result, nil } -var calculateStateDurations = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ +var calculateStateDurations = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Namespace: "dendrite", Subsystem: "roomserver", - Name: "calculate_state_duration_microseconds", + Name: "calculate_state_duration_milliseconds", Help: "How long it takes to calculate the state after a list of events", + Buckets: []float64{ // milliseconds + 5, 10, 25, 50, 75, 100, 200, 300, 400, 500, + 1000, 2000, 3000, 4000, 5000, 6000, + 7000, 8000, 9000, 10000, 15000, 20000, 30000, + }, }, // Takes two labels: // algorithm: @@ -496,9 +501,8 @@ func (c *calculateStateMetrics) stop(stateNID types.StateSnapshotNID, err error) } else { outcome = "failure" } - endTime := time.Now() calculateStateDurations.WithLabelValues(c.algorithm, outcome).Observe( - float64(endTime.Sub(c.startTime).Nanoseconds()) / 1000., + float64(time.Since(c.startTime).Milliseconds()), ) calculateStatePrevEventLength.WithLabelValues(c.algorithm, outcome).Observe( float64(c.prevEventLength), diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 9d9434cbb..8e787851b 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -866,6 +866,10 @@ func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey s return nil, err } stateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, stateKey) + if err == sql.ErrNoRows { + // No rooms have a state event with this state key, otherwise we'd have an state key NID + return nil, nil + } if err != nil { return nil, err } diff --git a/setup/base.go b/setup/base.go index 1a52d1c26..8f41f73fb 100644 --- a/setup/base.go +++ b/setup/base.go @@ -84,8 +84,6 @@ type BaseDendrite struct { Cfg *config.Dendrite Caches *caching.Caches DNSCache *gomatrixserverlib.DNSCache - // KafkaConsumer sarama.Consumer - // KafkaProducer sarama.SyncProducer } const HTTPServerTimeout = time.Minute * 5 diff --git a/setup/config/config.go b/setup/config/config.go index b91144078..0589b5a30 100644 --- a/setup/config/config.go +++ b/setup/config/config.go @@ -296,6 +296,8 @@ func (config *Dendrite) Derive() error { func (c *Dendrite) Defaults() { c.Version = 1 + c.Wiring() + c.Global.Defaults() c.ClientAPI.Defaults() c.EDUServer.Defaults() @@ -309,8 +311,6 @@ func (c *Dendrite) Defaults() { c.UserAPI.Defaults() c.AppServiceAPI.Defaults() c.MSCs.Defaults() - - c.Wiring() } func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) { @@ -329,6 +329,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) { } func (c *Dendrite) Wiring() { + c.Global.JetStream.Matrix = &c.Global c.ClientAPI.Matrix = &c.Global c.EDUServer.Matrix = &c.Global c.FederationAPI.Matrix = &c.Global diff --git a/setup/config/config_global.go b/setup/config/config_global.go index 90a92f2bc..7b3d583bd 100644 --- a/setup/config/config_global.go +++ b/setup/config/config_global.go @@ -43,8 +43,8 @@ type Global struct { // Defaults to an empty array. TrustedIDServers []string `yaml:"trusted_third_party_id_servers"` - // Kafka/Naffka configuration - Kafka Kafka `yaml:"kafka"` + // JetStream configuration + JetStream JetStream `yaml:"jetstream"` // Metrics configuration Metrics Metrics `yaml:"metrics"` @@ -63,7 +63,7 @@ func (c *Global) Defaults() { c.KeyID = "ed25519:auto" c.KeyValidityPeriod = time.Hour * 24 * 7 - c.Kafka.Defaults() + c.JetStream.Defaults() c.Metrics.Defaults() c.DNSCache.Defaults() c.Sentry.Defaults() @@ -73,7 +73,7 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) { checkNotEmpty(configErrs, "global.server_name", string(c.ServerName)) checkNotEmpty(configErrs, "global.private_key", string(c.PrivateKeyPath)) - c.Kafka.Verify(configErrs, isMonolith) + c.JetStream.Verify(configErrs, isMonolith) c.Metrics.Verify(configErrs, isMonolith) c.Sentry.Verify(configErrs, isMonolith) c.DNSCache.Verify(configErrs, isMonolith) diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go new file mode 100644 index 000000000..871f6958e --- /dev/null +++ b/setup/config/config_jetstream.go @@ -0,0 +1,38 @@ +package config + +import ( + "fmt" +) + +type JetStream struct { + Matrix *Global `yaml:"-"` + + // Persistent directory to store JetStream streams in. + StoragePath Path `yaml:"storage_path"` + // A list of NATS addresses to connect to. If none are specified, an + // internal NATS server will be used when running in monolith mode only. + Addresses []string `yaml:"addresses"` + // The prefix to use for stream names for this homeserver - really only + // useful if running more than one Dendrite on the same NATS deployment. + TopicPrefix string `yaml:"topic_prefix"` + // Keep all storage in memory. This is mostly useful for unit tests. + InMemory bool `yaml:"in_memory"` +} + +func (c *JetStream) TopicFor(name string) string { + return fmt.Sprintf("%s%s", c.TopicPrefix, name) +} + +func (c *JetStream) Defaults() { + c.Addresses = []string{} + c.TopicPrefix = "Dendrite" + c.StoragePath = Path("./") +} + +func (c *JetStream) Verify(configErrs *ConfigErrors, isMonolith bool) { + // If we are running in a polylith deployment then we need at least + // one NATS JetStream server to talk to. + if !isMonolith { + checkNotZero(configErrs, "global.jetstream.addresses", int64(len(c.Addresses))) + } +} diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go deleted file mode 100644 index b6ad88c75..000000000 --- a/setup/config/config_kafka.go +++ /dev/null @@ -1,73 +0,0 @@ -package config - -import "fmt" - -// Defined Kafka topics. -const ( - TopicOutputTypingEvent = "OutputTypingEvent" - TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" - TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" - TopicOutputRoomEvent = "OutputRoomEvent" - TopicOutputClientData = "OutputClientData" - TopicOutputReceiptEvent = "OutputReceiptEvent" -) - -// KafkaTopics is a convenience slice to access all defined Kafka topics. -var KafkaTopics = []string{ - TopicOutputTypingEvent, - TopicOutputSendToDeviceEvent, - TopicOutputKeyChangeEvent, - TopicOutputRoomEvent, - TopicOutputClientData, - TopicOutputReceiptEvent, -} - -type Kafka struct { - // A list of kafka/NATS addresses to connect to. - Addresses []string `yaml:"addresses"` - // The prefix to use for Kafka topic names for this homeserver - really only - // useful if running more than one Dendrite on the same Kafka deployment. - TopicPrefix string `yaml:"topic_prefix"` - // Whether to use naffka instead of kafka. - // Naffka can only be used when running dendrite as a single monolithic server. - // Kafka can be used both with a monolithic server and when running the - // components as separate servers. - UseNaffka bool `yaml:"use_naffka"` - // The Naffka database is used internally by the naffka library, if used. - Database DatabaseOptions `yaml:"naffka_database"` - // The max size a Kafka message passed between consumer/producer can have - // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka - MaxMessageBytes *int `yaml:"max_message_bytes"` - // Whether to use NATS instead of kafka/naffka - UseNATS bool `yaml:"use_nats"` -} - -func (k *Kafka) TopicFor(name string) string { - return fmt.Sprintf("%s%s", k.TopicPrefix, name) -} - -func (c *Kafka) Defaults() { - c.UseNaffka = true - c.Database.Defaults(10) - c.Addresses = []string{"localhost:2181"} - c.Database.ConnectionString = DataSource("file:naffka.db") - c.TopicPrefix = "Dendrite" - - maxBytes := 1024 * 1024 * 8 // about 8MB - c.MaxMessageBytes = &maxBytes -} - -func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { - if c.UseNaffka { - if !isMonolith { - configErrs.Add("naffka can only be used in a monolithic server") - } - checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString)) - } else { - // If we aren't using naffka then we need to have at least one kafka/nats - // server to talk to. - checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) - } - checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) - checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes)) -} diff --git a/setup/config/config_mediaapi.go b/setup/config/config_mediaapi.go index 0943a39e2..c55978e11 100644 --- a/setup/config/config_mediaapi.go +++ b/setup/config/config_mediaapi.go @@ -2,7 +2,6 @@ package config import ( "fmt" - "math" ) type MediaAPI struct { @@ -36,6 +35,9 @@ type MediaAPI struct { ThumbnailSizes []ThumbnailSize `yaml:"thumbnail_sizes"` } +// DefaultMaxFileSizeBytes defines the default file size allowed in transfers +var DefaultMaxFileSizeBytes = FileSizeBytes(10485760) + func (c *MediaAPI) Defaults() { c.InternalAPI.Listen = "http://localhost:7774" c.InternalAPI.Connect = "http://localhost:7774" @@ -43,8 +45,7 @@ func (c *MediaAPI) Defaults() { c.Database.Defaults(5) c.Database.ConnectionString = "file:mediaapi.db" - defaultMaxFileSizeBytes := FileSizeBytes(10485760) - c.MaxFileSizeBytes = &defaultMaxFileSizeBytes + c.MaxFileSizeBytes = &DefaultMaxFileSizeBytes c.MaxThumbnailGenerators = 10 c.BasePath = "./media_store" } @@ -58,11 +59,6 @@ func (c *MediaAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { checkNotEmpty(configErrs, "media_api.database.connection_string", string(c.Database.ConnectionString)) checkNotEmpty(configErrs, "media_api.base_path", string(c.BasePath)) - // allow "unlimited" file size - if c.MaxFileSizeBytes != nil && *c.MaxFileSizeBytes <= 0 { - unlimitedSize := FileSizeBytes(math.MaxInt64 - 1) - c.MaxFileSizeBytes = &unlimitedSize - } checkPositive(configErrs, "media_api.max_file_size_bytes", int64(*c.MaxFileSizeBytes)) checkPositive(configErrs, "media_api.max_thumbnail_generators", int64(c.MaxThumbnailGenerators)) diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go new file mode 100644 index 000000000..1f5a8bf90 --- /dev/null +++ b/setup/jetstream/nats.go @@ -0,0 +1,90 @@ +package jetstream + +import ( + "strings" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/setup/config" + "github.com/sirupsen/logrus" + + saramajs "github.com/S7evinK/saramajetstream" + natsserver "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + natsclient "github.com/nats-io/nats.go" +) + +var natsServer *natsserver.Server +var natsServerMutex sync.Mutex + +func SetupConsumerProducer(cfg *config.JetStream) (sarama.Consumer, sarama.SyncProducer) { + natsServerMutex.Lock() + if natsServer == nil { + var err error + natsServer, err = natsserver.NewServer(&natsserver.Options{ + ServerName: "monolith", + DontListen: true, + JetStream: true, + StoreDir: string(cfg.StoragePath), + NoSystemAccount: true, + AllowNewAccounts: false, + }) + if err != nil { + panic(err) + } + natsServer.ConfigureLogger() + go natsServer.Start() + } + natsServerMutex.Unlock() + if !natsServer.ReadyForConnections(time.Second * 10) { + logrus.Fatalln("NATS did not start in time") + } + nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer)) + if err != nil { + logrus.Fatalln("Failed to create NATS client") + } + return setupNATS(cfg, nc) +} + +func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (sarama.Consumer, sarama.SyncProducer) { + if nc == nil { + var err error + nc, err = nats.Connect(strings.Join(cfg.Addresses, ",")) + if err != nil { + logrus.WithError(err).Panic("Unable to connect to NATS") + return nil, nil + } + } + + s, err := nc.JetStream() + if err != nil { + logrus.WithError(err).Panic("Unable to get JetStream context") + return nil, nil + } + + for _, stream := range streams { + info, err := s.StreamInfo(stream.Name) + if err != nil && err != natsclient.ErrStreamNotFound { + logrus.WithError(err).Fatal("Unable to get stream info") + } + if info == nil { + stream.Name = cfg.TopicFor(stream.Name) + stream.Subjects = []string{stream.Name} + + // If we're trying to keep everything in memory (e.g. unit tests) + // then overwrite the storage policy. + if cfg.InMemory { + stream.Storage = nats.MemoryStorage + } + + if _, err = s.AddStream(stream); err != nil { + logrus.WithError(err).WithField("stream", stream.Name).Fatal("Unable to add stream") + } + } + } + + consumer := saramajs.NewJetStreamConsumer(nc, s, "") + producer := saramajs.NewJetStreamProducer(nc, s, "") + return consumer, producer +} diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go new file mode 100644 index 000000000..326e62a93 --- /dev/null +++ b/setup/jetstream/streams.go @@ -0,0 +1,50 @@ +package jetstream + +import ( + "time" + + "github.com/nats-io/nats.go" +) + +var ( + OutputRoomEvent = "OutputRoomEvent" + OutputSendToDeviceEvent = "OutputSendToDeviceEvent" + OutputKeyChangeEvent = "OutputKeyChangeEvent" + OutputTypingEvent = "OutputTypingEvent" + OutputClientData = "OutputClientData" + OutputReceiptEvent = "OutputReceiptEvent" +) + +var streams = []*nats.StreamConfig{ + { + Name: OutputRoomEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputSendToDeviceEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputKeyChangeEvent, + Retention: nats.LimitsPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputTypingEvent, + Retention: nats.InterestPolicy, + Storage: nats.MemoryStorage, + MaxAge: time.Second * 60, + }, + { + Name: OutputClientData, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputReceiptEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, +} diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go deleted file mode 100644 index e3ef50c5f..000000000 --- a/setup/kafka/kafka.go +++ /dev/null @@ -1,114 +0,0 @@ -package kafka - -import ( - "strings" - "time" - - js "github.com/S7evinK/saramajetstream" - "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/naffka" - naffkaStorage "github.com/matrix-org/naffka/storage" - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" -) - -func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - if cfg.UseNaffka { - return setupNaffka(cfg) - } - if cfg.UseNATS { - return setupNATS(cfg) - } - return setupKafka(cfg) -} - -// setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - sCfg := sarama.NewConfig() - sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes - sCfg.Producer.Return.Successes = true - sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes) - - consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg) - if err != nil { - logrus.WithError(err).Panic("failed to start kafka consumer") - } - - producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg) - if err != nil { - logrus.WithError(err).Panic("failed to setup kafka producers") - } - - return consumer, producer -} - -// In monolith mode with Naffka, we don't have the same constraints about -// consuming the same topic from more than one place like we do with Kafka. -// Therefore, we will only open one Naffka connection in case Naffka is -// running on SQLite. -var naffkaInstance *naffka.Naffka - -// setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - if naffkaInstance != nil { - return naffkaInstance, naffkaInstance - } - naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - naffkaInstance, err = naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - return naffkaInstance, naffkaInstance -} - -func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - logrus.WithField("servers", cfg.Addresses).Debug("connecting to nats") - nc, err := nats.Connect(strings.Join(cfg.Addresses, ",")) - if err != nil { - logrus.WithError(err).Panic("failed to connect to nats") - return nil, nil - } - - s, err := nc.JetStream() - if err != nil { - logrus.WithError(err).Panic("unable to get jetstream context") - return nil, nil - } - - // create a stream for every topic - for _, topic := range config.KafkaTopics { - sn := cfg.TopicFor(topic) - stream, err := s.StreamInfo(sn) - if err != nil { - logrus.WithError(err).Warn("unable to get stream info") - } - - if stream == nil { - maxLifeTime := time.Second * 0 - - // Typing events can be removed from the stream, as they are only relevant for a short time - if topic == config.TopicOutputTypingEvent { - maxLifeTime = time.Second * 60 - } - _, err = s.AddStream(&nats.StreamConfig{ - Name: sn, - Subjects: []string{topic}, - MaxBytes: int64(*cfg.MaxMessageBytes), - MaxMsgSize: int32(*cfg.MaxMessageBytes), - MaxAge: maxLifeTime, - Duplicates: maxLifeTime / 2, - }) - if err != nil { - logrus.WithError(err).WithField("stream", sn).Fatal("unable to add nats stream") - } - } - } - - consumer := js.NewJetStreamConsumer(nc, s, cfg.TopicPrefix) - producer := js.NewJetStreamProducer(nc, s, cfg.TopicPrefix) - return consumer, producer -} diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index a166ae14d..a50e9c8f9 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -50,7 +51,7 @@ func NewOutputClientDataConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/clientapi", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 668f945bc..2b25985f9 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -52,7 +53,7 @@ func NewOutputReceiptEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/receipt", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 5e626aefe..4de8eeb77 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -55,7 +56,7 @@ func NewOutputSendToDeviceEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/sendtodevice", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index c10d1e94e..343f777e1 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -53,7 +54,7 @@ func NewOutputTypingEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/typing", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index a6aeee3a8..68a80952d 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -58,7 +59,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/roomserver", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 2ef25e032..9bb8c6d24 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -393,7 +393,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st start = *r.from if events[len(events)-1].Type() == gomatrixserverlib.MRoomCreate { // NOTSPEC: We've hit the beginning of the room so there's really nowhere - // else to go. This seems to fix Riot iOS from looping on /messages endlessly. + // else to go. This seems to fix Element iOS from looping on /messages endlessly. end = types.TopologyToken{} } else { end, err = r.db.EventPositionInTopology( diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 84c7140ca..16e222cb0 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -24,7 +24,7 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -48,7 +48,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + consumer, _ := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { @@ -65,7 +65,7 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), + process, cfg.Matrix.ServerName, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) if err = keyChangeConsumer.Start(); err != nil { diff --git a/sytest-whitelist b/sytest-whitelist index f6a051bda..4d0b9fcf5 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -520,6 +520,8 @@ Inviting an AS-hosted user asks the AS server Can generate a openid access_token that can be exchanged for information about a user Invalid openid access tokens are rejected Requests to userinfo without access tokens are rejected +'ban' event respects room powerlevel +Non-present room members cannot ban others POST /_synapse/admin/v1/register with shared secret POST /_synapse/admin/v1/register admin with shared secret POST /_synapse/admin/v1/register with shared secret downcases capitals