Merge branch 'master' of https://github.com/matrix-org/dendrite into add-receipts-2
This commit is contained in:
commit
0b5801af3e
|
@ -30,6 +30,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/appservice/workers"
|
"github.com/matrix-org/dendrite/appservice/workers"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/internal/setup"
|
"github.com/matrix-org/dendrite/internal/setup"
|
||||||
|
"github.com/matrix-org/dendrite/internal/setup/kafka"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -47,6 +48,8 @@ func NewInternalAPI(
|
||||||
userAPI userapi.UserInternalAPI,
|
userAPI userapi.UserInternalAPI,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
) appserviceAPI.AppServiceQueryAPI {
|
) appserviceAPI.AppServiceQueryAPI {
|
||||||
|
consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka)
|
||||||
|
|
||||||
// Create a connection to the appservice postgres DB
|
// Create a connection to the appservice postgres DB
|
||||||
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)
|
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -86,7 +89,7 @@ func NewInternalAPI(
|
||||||
// We can't add ASes at runtime so this is safe to do.
|
// We can't add ASes at runtime so this is safe to do.
|
||||||
if len(workerStates) > 0 {
|
if len(workerStates) > 0 {
|
||||||
consumer := consumers.NewOutputRoomEventConsumer(
|
consumer := consumers.NewOutputRoomEventConsumer(
|
||||||
base.Cfg, base.KafkaConsumer, appserviceDB,
|
base.Cfg, consumer, appserviceDB,
|
||||||
rsAPI, workerStates,
|
rsAPI, workerStates,
|
||||||
)
|
)
|
||||||
if err := consumer.Start(); err != nil {
|
if err := consumer.Start(); err != nil {
|
||||||
|
|
|
@ -29,6 +29,8 @@ services:
|
||||||
KAFKA_ADVERTISED_HOST_NAME: "kafka"
|
KAFKA_ADVERTISED_HOST_NAME: "kafka"
|
||||||
KAFKA_DELETE_TOPIC_ENABLE: "true"
|
KAFKA_DELETE_TOPIC_ENABLE: "true"
|
||||||
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
|
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
|
||||||
|
ports:
|
||||||
|
- 9092:9092
|
||||||
depends_on:
|
depends_on:
|
||||||
- zookeeper
|
- zookeeper
|
||||||
networks:
|
networks:
|
||||||
|
|
|
@ -112,7 +112,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
|
|
||||||
serverKeyAPI := &signing.YggdrasilKeys{}
|
serverKeyAPI := &signing.YggdrasilKeys{}
|
||||||
keyRing := serverKeyAPI.KeyRing()
|
keyRing := serverKeyAPI.KeyRing()
|
||||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
|
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
|
||||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
|
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
|
||||||
keyAPI.SetUserAPI(userAPI)
|
keyAPI.SetUserAPI(userAPI)
|
||||||
|
|
||||||
|
@ -151,8 +151,6 @@ func (m *DendriteMonolith) Start() {
|
||||||
Client: ygg.CreateClient(base),
|
Client: ygg.CreateClient(base),
|
||||||
FedClient: federation,
|
FedClient: federation,
|
||||||
KeyRing: keyRing,
|
KeyRing: keyRing,
|
||||||
KafkaConsumer: base.KafkaConsumer,
|
|
||||||
KafkaProducer: base.KafkaProducer,
|
|
||||||
|
|
||||||
AppserviceAPI: asAPI,
|
AppserviceAPI: asAPI,
|
||||||
EDUInternalAPI: eduInputAPI,
|
EDUInternalAPI: eduInputAPI,
|
||||||
|
|
|
@ -15,7 +15,6 @@
|
||||||
package clientapi
|
package clientapi
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/clientapi/api"
|
"github.com/matrix-org/dendrite/clientapi/api"
|
||||||
|
@ -24,6 +23,7 @@ import (
|
||||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
|
"github.com/matrix-org/dendrite/internal/setup/kafka"
|
||||||
"github.com/matrix-org/dendrite/internal/transactions"
|
"github.com/matrix-org/dendrite/internal/transactions"
|
||||||
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
|
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -36,7 +36,6 @@ import (
|
||||||
func AddPublicRoutes(
|
func AddPublicRoutes(
|
||||||
router *mux.Router,
|
router *mux.Router,
|
||||||
cfg *config.ClientAPI,
|
cfg *config.ClientAPI,
|
||||||
producer sarama.SyncProducer,
|
|
||||||
accountsDB accounts.Database,
|
accountsDB accounts.Database,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
|
@ -48,6 +47,8 @@ func AddPublicRoutes(
|
||||||
keyAPI keyserverAPI.KeyInternalAPI,
|
keyAPI keyserverAPI.KeyInternalAPI,
|
||||||
extRoomsProvider api.ExtraPublicRoomsProvider,
|
extRoomsProvider api.ExtraPublicRoomsProvider,
|
||||||
) {
|
) {
|
||||||
|
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||||
|
|
||||||
syncProducer := &producers.SyncAPIProducer{
|
syncProducer := &producers.SyncAPIProducer{
|
||||||
Producer: producer,
|
Producer: producer,
|
||||||
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData),
|
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData),
|
||||||
|
|
|
@ -37,7 +37,7 @@ func main() {
|
||||||
keyAPI := base.KeyServerHTTPClient()
|
keyAPI := base.KeyServerHTTPClient()
|
||||||
|
|
||||||
clientapi.AddPublicRoutes(
|
clientapi.AddPublicRoutes(
|
||||||
base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation,
|
base.PublicClientAPIMux, &base.Cfg.ClientAPI, accountDB, federation,
|
||||||
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
|
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ func main() {
|
||||||
|
|
||||||
accountDB := base.Base.CreateAccountsDB()
|
accountDB := base.Base.CreateAccountsDB()
|
||||||
federation := createFederationClient(base)
|
federation := createFederationClient(base)
|
||||||
keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer)
|
keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation)
|
||||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
||||||
keyAPI.SetUserAPI(userAPI)
|
keyAPI.SetUserAPI(userAPI)
|
||||||
|
|
||||||
|
@ -174,8 +174,6 @@ func main() {
|
||||||
Client: createClient(base),
|
Client: createClient(base),
|
||||||
FedClient: federation,
|
FedClient: federation,
|
||||||
KeyRing: keyRing,
|
KeyRing: keyRing,
|
||||||
KafkaConsumer: base.Base.KafkaConsumer,
|
|
||||||
KafkaProducer: base.Base.KafkaProducer,
|
|
||||||
|
|
||||||
AppserviceAPI: asAPI,
|
AppserviceAPI: asAPI,
|
||||||
EDUInternalAPI: eduInputAPI,
|
EDUInternalAPI: eduInputAPI,
|
||||||
|
|
|
@ -96,7 +96,7 @@ func main() {
|
||||||
serverKeyAPI := &signing.YggdrasilKeys{}
|
serverKeyAPI := &signing.YggdrasilKeys{}
|
||||||
keyRing := serverKeyAPI.KeyRing()
|
keyRing := serverKeyAPI.KeyRing()
|
||||||
|
|
||||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
|
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
|
||||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
||||||
keyAPI.SetUserAPI(userAPI)
|
keyAPI.SetUserAPI(userAPI)
|
||||||
|
|
||||||
|
@ -134,8 +134,6 @@ func main() {
|
||||||
Client: ygg.CreateClient(base),
|
Client: ygg.CreateClient(base),
|
||||||
FedClient: federation,
|
FedClient: federation,
|
||||||
KeyRing: keyRing,
|
KeyRing: keyRing,
|
||||||
KafkaConsumer: base.KafkaConsumer,
|
|
||||||
KafkaProducer: base.KafkaProducer,
|
|
||||||
|
|
||||||
AppserviceAPI: asAPI,
|
AppserviceAPI: asAPI,
|
||||||
EDUInternalAPI: eduInputAPI,
|
EDUInternalAPI: eduInputAPI,
|
||||||
|
|
|
@ -24,7 +24,7 @@ func main() {
|
||||||
base := setup.NewBaseDendrite(cfg, "KeyServer", true)
|
base := setup.NewBaseDendrite(cfg, "KeyServer", true)
|
||||||
defer base.Close() // nolint: errcheck
|
defer base.Close() // nolint: errcheck
|
||||||
|
|
||||||
intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient(), base.KafkaProducer)
|
intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient())
|
||||||
intAPI.SetUserAPI(base.UserAPIClient())
|
intAPI.SetUserAPI(base.UserAPIClient())
|
||||||
|
|
||||||
keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
||||||
|
|
|
@ -108,7 +108,7 @@ func main() {
|
||||||
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
||||||
rsImpl.SetFederationSenderAPI(fsAPI)
|
rsImpl.SetFederationSenderAPI(fsAPI)
|
||||||
|
|
||||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer)
|
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
|
||||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
|
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
|
||||||
keyAPI.SetUserAPI(userAPI)
|
keyAPI.SetUserAPI(userAPI)
|
||||||
|
|
||||||
|
@ -132,8 +132,6 @@ func main() {
|
||||||
Client: base.CreateClient(),
|
Client: base.CreateClient(),
|
||||||
FedClient: federation,
|
FedClient: federation,
|
||||||
KeyRing: keyRing,
|
KeyRing: keyRing,
|
||||||
KafkaConsumer: base.KafkaConsumer,
|
|
||||||
KafkaProducer: base.KafkaProducer,
|
|
||||||
|
|
||||||
AppserviceAPI: asAPI,
|
AppserviceAPI: asAPI,
|
||||||
EDUInternalAPI: eduInputAPI,
|
EDUInternalAPI: eduInputAPI,
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
cfg := setup.ParseFlags(false)
|
cfg := setup.ParseFlags(false)
|
||||||
|
|
||||||
base := setup.NewBaseDendrite(cfg, "SyncAPI", true)
|
base := setup.NewBaseDendrite(cfg, "SyncAPI", true)
|
||||||
defer base.Close() // nolint: errcheck
|
defer base.Close() // nolint: errcheck
|
||||||
|
|
||||||
|
@ -30,7 +31,7 @@ func main() {
|
||||||
rsAPI := base.RoomserverHTTPClient()
|
rsAPI := base.RoomserverHTTPClient()
|
||||||
|
|
||||||
syncapi.AddPublicRoutes(
|
syncapi.AddPublicRoutes(
|
||||||
base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI,
|
base.PublicClientAPIMux, userAPI, rsAPI,
|
||||||
base.KeyServerHTTPClient(),
|
base.KeyServerHTTPClient(),
|
||||||
federation, &cfg.SyncAPI,
|
federation, &cfg.SyncAPI,
|
||||||
)
|
)
|
||||||
|
|
|
@ -190,7 +190,7 @@ func main() {
|
||||||
|
|
||||||
accountDB := base.CreateAccountsDB()
|
accountDB := base.CreateAccountsDB()
|
||||||
federation := createFederationClient(cfg, node)
|
federation := createFederationClient(cfg, node)
|
||||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
|
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
|
||||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
||||||
keyAPI.SetUserAPI(userAPI)
|
keyAPI.SetUserAPI(userAPI)
|
||||||
|
|
||||||
|
@ -217,8 +217,6 @@ func main() {
|
||||||
Client: createClient(node),
|
Client: createClient(node),
|
||||||
FedClient: federation,
|
FedClient: federation,
|
||||||
KeyRing: &keyRing,
|
KeyRing: &keyRing,
|
||||||
KafkaConsumer: base.KafkaConsumer,
|
|
||||||
KafkaProducer: base.KafkaProducer,
|
|
||||||
|
|
||||||
AppserviceAPI: asQuery,
|
AppserviceAPI: asQuery,
|
||||||
EDUInternalAPI: eduInputAPI,
|
EDUInternalAPI: eduInputAPI,
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/eduserver/inthttp"
|
"github.com/matrix-org/dendrite/eduserver/inthttp"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/internal/setup"
|
"github.com/matrix-org/dendrite/internal/setup"
|
||||||
|
"github.com/matrix-org/dendrite/internal/setup/kafka"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -41,10 +42,13 @@ func NewInternalAPI(
|
||||||
userAPI userapi.UserInternalAPI,
|
userAPI userapi.UserInternalAPI,
|
||||||
) api.EDUServerInputAPI {
|
) api.EDUServerInputAPI {
|
||||||
cfg := &base.Cfg.EDUServer
|
cfg := &base.Cfg.EDUServer
|
||||||
|
|
||||||
|
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||||
|
|
||||||
return &input.EDUServerInputAPI{
|
return &input.EDUServerInputAPI{
|
||||||
Cache: eduCache,
|
Cache: eduCache,
|
||||||
UserAPI: userAPI,
|
UserAPI: userAPI,
|
||||||
Producer: base.KafkaProducer,
|
Producer: producer,
|
||||||
OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
|
OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
|
||||||
OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
|
OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
|
||||||
OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
|
OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
|
||||||
|
|
|
@ -289,6 +289,15 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
||||||
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal typing event")
|
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal typing event")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
_, domain, err := gomatrixserverlib.SplitID('@', typingPayload.UserID)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(ctx).WithError(err).Error("Failed to split domain from typing event sender")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if domain != t.Origin {
|
||||||
|
util.GetLogger(ctx).Warnf("Dropping typing event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
|
||||||
|
continue
|
||||||
|
}
|
||||||
if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
|
if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
|
||||||
util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server")
|
util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server")
|
||||||
}
|
}
|
||||||
|
@ -467,6 +476,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
|
||||||
return gomatrixserverlib.Allowed(e, &authUsingState)
|
return gomatrixserverlib.Allowed(e, &authUsingState)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nolint:gocyclo
|
||||||
func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
|
func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
|
||||||
// Do this with a fresh context, so that we keep working even if the
|
// Do this with a fresh context, so that we keep working even if the
|
||||||
// original request times out. With any luck, by the time the remote
|
// original request times out. With any luck, by the time the remote
|
||||||
|
@ -504,36 +514,71 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
|
||||||
backwardsExtremity := &newEvents[0]
|
backwardsExtremity := &newEvents[0]
|
||||||
newEvents = newEvents[1:]
|
newEvents = newEvents[1:]
|
||||||
|
|
||||||
|
type respState struct {
|
||||||
|
// A snapshot is considered trustworthy if it came from our own roomserver.
|
||||||
|
// That's because the state will have been through state resolution once
|
||||||
|
// already in QueryStateAfterEvent.
|
||||||
|
trustworthy bool
|
||||||
|
*gomatrixserverlib.RespState
|
||||||
|
}
|
||||||
|
|
||||||
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
|
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
|
||||||
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
|
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
|
||||||
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
|
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
|
||||||
var states []*gomatrixserverlib.RespState
|
var states []*respState
|
||||||
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
|
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
|
||||||
// Look up what the state is after the backward extremity. This will either
|
// Look up what the state is after the backward extremity. This will either
|
||||||
// come from the roomserver, if we know all the required events, or it will
|
// come from the roomserver, if we know all the required events, or it will
|
||||||
// come from a remote server via /state_ids if not.
|
// come from a remote server via /state_ids if not.
|
||||||
var prevState *gomatrixserverlib.RespState
|
prevState, trustworthy, lerr := t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
|
||||||
prevState, err = t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
|
if lerr != nil {
|
||||||
if err != nil {
|
util.GetLogger(ctx).WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
|
||||||
util.GetLogger(ctx).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
|
return lerr
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
// Append the state onto the collected state. We'll run this through the
|
// Append the state onto the collected state. We'll run this through the
|
||||||
// state resolution next.
|
// state resolution next.
|
||||||
states = append(states, prevState)
|
states = append(states, &respState{trustworthy, prevState})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now that we have collected all of the state from the prev_events, we'll
|
// Now that we have collected all of the state from the prev_events, we'll
|
||||||
// run the state through the appropriate state resolution algorithm for the
|
// run the state through the appropriate state resolution algorithm for the
|
||||||
// room. This does a couple of things:
|
// room if needed. This does a couple of things:
|
||||||
// 1. Ensures that the state is deduplicated fully for each state-key tuple
|
// 1. Ensures that the state is deduplicated fully for each state-key tuple
|
||||||
// 2. Ensures that we pick the latest events from both sets, in the case that
|
// 2. Ensures that we pick the latest events from both sets, in the case that
|
||||||
// one of the prev_events is quite a bit older than the others
|
// one of the prev_events is quite a bit older than the others
|
||||||
resolvedState, err := t.resolveStatesAndCheck(gmectx, roomVersion, states, backwardsExtremity)
|
resolvedState := &gomatrixserverlib.RespState{}
|
||||||
|
switch len(states) {
|
||||||
|
case 0:
|
||||||
|
extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("")
|
||||||
|
if !extremityIsCreate {
|
||||||
|
// There are no previous states and this isn't the beginning of the
|
||||||
|
// room - this is an error condition!
|
||||||
|
util.GetLogger(ctx).Errorf("Failed to lookup any state after prev_events")
|
||||||
|
return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states))
|
||||||
|
}
|
||||||
|
case 1:
|
||||||
|
// There's only one previous state - if it's trustworthy (came from a
|
||||||
|
// local state snapshot which will already have been through state res),
|
||||||
|
// use it as-is. There's no point in resolving it again.
|
||||||
|
if states[0].trustworthy {
|
||||||
|
resolvedState = states[0].RespState
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Otherwise, if it isn't trustworthy (came from federation), run it through
|
||||||
|
// state resolution anyway for safety, in case there are duplicates.
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
respStates := make([]*gomatrixserverlib.RespState, len(states))
|
||||||
|
for i := range states {
|
||||||
|
respStates[i] = states[i].RespState
|
||||||
|
}
|
||||||
|
// There's more than one previous state - run them all through state res
|
||||||
|
resolvedState, err = t.resolveStatesAndCheck(gmectx, roomVersion, respStates, backwardsExtremity)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
|
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// First of all, send the backward extremity into the roomserver with the
|
// First of all, send the backward extremity into the roomserver with the
|
||||||
// newly resolved state. This marks the "oldest" point in the backfill and
|
// newly resolved state. This marks the "oldest" point in the backfill and
|
||||||
|
@ -572,16 +617,16 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
|
||||||
|
|
||||||
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
|
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
|
||||||
// added into the mix.
|
// added into the mix.
|
||||||
func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, error) {
|
func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, bool, error) {
|
||||||
// try doing all this locally before we resort to querying federation
|
// try doing all this locally before we resort to querying federation
|
||||||
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID)
|
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID)
|
||||||
if respState != nil {
|
if respState != nil {
|
||||||
return respState, nil
|
return respState, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID)
|
respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
|
return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
servers := t.getServers(ctx, roomID)
|
servers := t.getServers(ctx, roomID)
|
||||||
|
@ -593,11 +638,11 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
|
||||||
h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers)
|
h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers)
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
case verifySigError:
|
case verifySigError:
|
||||||
return respState, nil
|
return respState, false, nil
|
||||||
case nil:
|
case nil:
|
||||||
// do nothing
|
// do nothing
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("t.lookupEvent: %w", err)
|
return nil, false, fmt.Errorf("t.lookupEvent: %w", err)
|
||||||
}
|
}
|
||||||
t.haveEvents[h.EventID()] = h
|
t.haveEvents[h.EventID()] = h
|
||||||
if h.StateKey() != nil {
|
if h.StateKey() != nil {
|
||||||
|
@ -615,7 +660,7 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return respState, nil
|
return respState, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState {
|
func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/federationsender/statistics"
|
"github.com/matrix-org/dendrite/federationsender/statistics"
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||||
"github.com/matrix-org/dendrite/internal/setup"
|
"github.com/matrix-org/dendrite/internal/setup"
|
||||||
|
"github.com/matrix-org/dendrite/internal/setup/kafka"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -55,6 +56,8 @@ func NewInternalAPI(
|
||||||
FailuresUntilBlacklist: cfg.FederationMaxRetries,
|
FailuresUntilBlacklist: cfg.FederationMaxRetries,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||||
|
|
||||||
queues := queue.NewOutgoingQueues(
|
queues := queue.NewOutgoingQueues(
|
||||||
federationSenderDB, cfg.Matrix.ServerName, federation,
|
federationSenderDB, cfg.Matrix.ServerName, federation,
|
||||||
rsAPI, stats,
|
rsAPI, stats,
|
||||||
|
@ -66,7 +69,7 @@ func NewInternalAPI(
|
||||||
)
|
)
|
||||||
|
|
||||||
rsConsumer := consumers.NewOutputRoomEventConsumer(
|
rsConsumer := consumers.NewOutputRoomEventConsumer(
|
||||||
cfg, base.KafkaConsumer, queues,
|
cfg, consumer, queues,
|
||||||
federationSenderDB, rsAPI,
|
federationSenderDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err = rsConsumer.Start(); err != nil {
|
if err = rsConsumer.Start(); err != nil {
|
||||||
|
@ -74,13 +77,13 @@ func NewInternalAPI(
|
||||||
}
|
}
|
||||||
|
|
||||||
tsConsumer := consumers.NewOutputEDUConsumer(
|
tsConsumer := consumers.NewOutputEDUConsumer(
|
||||||
cfg, base.KafkaConsumer, queues, federationSenderDB,
|
cfg, consumer, queues, federationSenderDB,
|
||||||
)
|
)
|
||||||
if err := tsConsumer.Start(); err != nil {
|
if err := tsConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start typing server consumer")
|
logrus.WithError(err).Panic("failed to start typing server consumer")
|
||||||
}
|
}
|
||||||
keyConsumer := consumers.NewKeyChangeConsumer(
|
keyConsumer := consumers.NewKeyChangeConsumer(
|
||||||
&base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI,
|
&base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err := keyConsumer.Start(); err != nil {
|
if err := keyConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start key server consumer")
|
logrus.WithError(err).Panic("failed to start key server consumer")
|
||||||
|
|
|
@ -26,13 +26,9 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
|
||||||
"github.com/matrix-org/naffka"
|
|
||||||
naffkaStorage "github.com/matrix-org/naffka/storage"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
|
@ -73,8 +69,8 @@ type BaseDendrite struct {
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
Cfg *config.Dendrite
|
Cfg *config.Dendrite
|
||||||
Caches *caching.Caches
|
Caches *caching.Caches
|
||||||
KafkaConsumer sarama.Consumer
|
// KafkaConsumer sarama.Consumer
|
||||||
KafkaProducer sarama.SyncProducer
|
// KafkaProducer sarama.SyncProducer
|
||||||
}
|
}
|
||||||
|
|
||||||
const HTTPServerTimeout = time.Minute * 5
|
const HTTPServerTimeout = time.Minute * 5
|
||||||
|
@ -106,14 +102,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
|
||||||
logrus.WithError(err).Panicf("failed to start opentracing")
|
logrus.WithError(err).Panicf("failed to start opentracing")
|
||||||
}
|
}
|
||||||
|
|
||||||
var kafkaConsumer sarama.Consumer
|
|
||||||
var kafkaProducer sarama.SyncProducer
|
|
||||||
if cfg.Global.Kafka.UseNaffka {
|
|
||||||
kafkaConsumer, kafkaProducer = setupNaffka(cfg)
|
|
||||||
} else {
|
|
||||||
kafkaConsumer, kafkaProducer = setupKafka(cfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
cache, err := caching.NewInMemoryLRUCache(true)
|
cache, err := caching.NewInMemoryLRUCache(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Warnf("Failed to create cache")
|
logrus.WithError(err).Warnf("Failed to create cache")
|
||||||
|
@ -152,8 +140,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
|
||||||
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
|
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
|
||||||
apiHttpClient: &apiClient,
|
apiHttpClient: &apiClient,
|
||||||
httpClient: &client,
|
httpClient: &client,
|
||||||
KafkaConsumer: kafkaConsumer,
|
|
||||||
KafkaProducer: kafkaProducer,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -334,31 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP(
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setupKafka creates kafka consumer/producer pair from the config.
|
|
||||||
func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
|
||||||
consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Panic("failed to start kafka consumer")
|
|
||||||
}
|
|
||||||
|
|
||||||
producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Panic("failed to setup kafka producers")
|
|
||||||
}
|
|
||||||
|
|
||||||
return consumer, producer
|
|
||||||
}
|
|
||||||
|
|
||||||
// setupNaffka creates kafka consumer/producer pair from the config.
|
|
||||||
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
|
||||||
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString))
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Panic("Failed to setup naffka database")
|
|
||||||
}
|
|
||||||
naff, err := naffka.New(naffkaDB)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Panic("Failed to setup naffka")
|
|
||||||
}
|
|
||||||
return naff, naff
|
|
||||||
}
|
|
||||||
|
|
53
internal/setup/kafka/kafka.go
Normal file
53
internal/setup/kafka/kafka.go
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/Shopify/sarama"
|
||||||
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
|
"github.com/matrix-org/naffka"
|
||||||
|
naffkaStorage "github.com/matrix-org/naffka/storage"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
|
if cfg.UseNaffka {
|
||||||
|
return setupNaffka(cfg)
|
||||||
|
}
|
||||||
|
return setupKafka(cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// setupKafka creates kafka consumer/producer pair from the config.
|
||||||
|
func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
|
consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Panic("failed to start kafka consumer")
|
||||||
|
}
|
||||||
|
|
||||||
|
producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Panic("failed to setup kafka producers")
|
||||||
|
}
|
||||||
|
|
||||||
|
return consumer, producer
|
||||||
|
}
|
||||||
|
|
||||||
|
// In monolith mode with Naffka, we don't have the same constraints about
|
||||||
|
// consuming the same topic from more than one place like we do with Kafka.
|
||||||
|
// Therefore, we will only open one Naffka connection in case Naffka is
|
||||||
|
// running on SQLite.
|
||||||
|
var naffkaInstance *naffka.Naffka
|
||||||
|
|
||||||
|
// setupNaffka creates kafka consumer/producer pair from the config.
|
||||||
|
func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
|
if naffkaInstance != nil {
|
||||||
|
return naffkaInstance, naffkaInstance
|
||||||
|
}
|
||||||
|
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Panic("Failed to setup naffka database")
|
||||||
|
}
|
||||||
|
naffkaInstance, err = naffka.New(naffkaDB)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Panic("Failed to setup naffka")
|
||||||
|
}
|
||||||
|
return naffkaInstance, naffkaInstance
|
||||||
|
}
|
|
@ -15,7 +15,6 @@
|
||||||
package setup
|
package setup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/clientapi"
|
"github.com/matrix-org/dendrite/clientapi"
|
||||||
|
@ -43,8 +42,6 @@ type Monolith struct {
|
||||||
KeyRing *gomatrixserverlib.KeyRing
|
KeyRing *gomatrixserverlib.KeyRing
|
||||||
Client *gomatrixserverlib.Client
|
Client *gomatrixserverlib.Client
|
||||||
FedClient *gomatrixserverlib.FederationClient
|
FedClient *gomatrixserverlib.FederationClient
|
||||||
KafkaConsumer sarama.Consumer
|
|
||||||
KafkaProducer sarama.SyncProducer
|
|
||||||
|
|
||||||
AppserviceAPI appserviceAPI.AppServiceQueryAPI
|
AppserviceAPI appserviceAPI.AppServiceQueryAPI
|
||||||
EDUInternalAPI eduServerAPI.EDUServerInputAPI
|
EDUInternalAPI eduServerAPI.EDUServerInputAPI
|
||||||
|
@ -61,7 +58,7 @@ type Monolith struct {
|
||||||
// AddAllPublicRoutes attaches all public paths to the given router
|
// AddAllPublicRoutes attaches all public paths to the given router
|
||||||
func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
|
func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
|
||||||
clientapi.AddPublicRoutes(
|
clientapi.AddPublicRoutes(
|
||||||
csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB,
|
csMux, &m.Config.ClientAPI, m.AccountDB,
|
||||||
m.FedClient, m.RoomserverAPI,
|
m.FedClient, m.RoomserverAPI,
|
||||||
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
|
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
|
||||||
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
|
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
|
||||||
|
@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router
|
||||||
)
|
)
|
||||||
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
|
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
|
||||||
syncapi.AddPublicRoutes(
|
syncapi.AddPublicRoutes(
|
||||||
csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
|
csMux, m.UserAPI, m.RoomserverAPI,
|
||||||
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
|
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,10 +15,10 @@
|
||||||
package keyserver
|
package keyserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
fedsenderapi "github.com/matrix-org/dendrite/federationsender/api"
|
fedsenderapi "github.com/matrix-org/dendrite/federationsender/api"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
|
"github.com/matrix-org/dendrite/internal/setup/kafka"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/internal"
|
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||||
"github.com/matrix-org/dendrite/keyserver/inthttp"
|
"github.com/matrix-org/dendrite/keyserver/inthttp"
|
||||||
|
@ -36,8 +36,10 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
|
||||||
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
||||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||||
func NewInternalAPI(
|
func NewInternalAPI(
|
||||||
cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer,
|
cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
|
||||||
) api.KeyInternalAPI {
|
) api.KeyInternalAPI {
|
||||||
|
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||||
|
|
||||||
db, err := storage.NewDatabase(&cfg.Database)
|
db, err := storage.NewDatabase(&cfg.Database)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to connect to key server database")
|
logrus.WithError(err).Panicf("failed to connect to key server database")
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/internal/setup"
|
"github.com/matrix-org/dendrite/internal/setup"
|
||||||
|
"github.com/matrix-org/dendrite/internal/setup/kafka"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal"
|
"github.com/matrix-org/dendrite/roomserver/internal"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -41,6 +42,8 @@ func NewInternalAPI(
|
||||||
) api.RoomserverInternalAPI {
|
) api.RoomserverInternalAPI {
|
||||||
cfg := &base.Cfg.RoomServer
|
cfg := &base.Cfg.RoomServer
|
||||||
|
|
||||||
|
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||||
|
|
||||||
var perspectiveServerNames []gomatrixserverlib.ServerName
|
var perspectiveServerNames []gomatrixserverlib.ServerName
|
||||||
for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives {
|
for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives {
|
||||||
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
|
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
|
||||||
|
@ -52,7 +55,7 @@ func NewInternalAPI(
|
||||||
}
|
}
|
||||||
|
|
||||||
return internal.NewRoomserverAPI(
|
return internal.NewRoomserverAPI(
|
||||||
cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
|
cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
|
||||||
base.Caches, keyRing, perspectiveServerNames,
|
base.Caches, keyRing, perspectiveServerNames,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,10 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/setup"
|
"github.com/matrix-org/dendrite/internal/setup"
|
||||||
"github.com/matrix-org/dendrite/internal/test"
|
"github.com/matrix-org/dendrite/internal/test"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/internal"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -160,7 +163,9 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
|
||||||
cfg.Defaults()
|
cfg.Defaults()
|
||||||
cfg.Global.ServerName = testOrigin
|
cfg.Global.ServerName = testOrigin
|
||||||
cfg.Global.Kafka.UseNaffka = true
|
cfg.Global.Kafka.UseNaffka = true
|
||||||
cfg.RoomServer.Database.ConnectionString = config.DataSource(roomserverDBFileURI)
|
cfg.RoomServer.Database = config.DatabaseOptions{
|
||||||
|
ConnectionString: roomserverDBFileURI,
|
||||||
|
}
|
||||||
dp := &dummyProducer{
|
dp := &dummyProducer{
|
||||||
topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
|
topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
|
||||||
}
|
}
|
||||||
|
@ -169,12 +174,17 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
|
||||||
t.Fatalf("failed to make caches: %s", err)
|
t.Fatalf("failed to make caches: %s", err)
|
||||||
}
|
}
|
||||||
base := &setup.BaseDendrite{
|
base := &setup.BaseDendrite{
|
||||||
KafkaProducer: dp,
|
|
||||||
Caches: cache,
|
Caches: cache,
|
||||||
Cfg: cfg,
|
Cfg: cfg,
|
||||||
}
|
}
|
||||||
|
roomserverDB, err := storage.Open(&cfg.RoomServer.Database, base.Caches)
|
||||||
return NewInternalAPI(base, &test.NopJSONVerifier{}), dp
|
if err != nil {
|
||||||
|
logrus.WithError(err).Panicf("failed to connect to room server db")
|
||||||
|
}
|
||||||
|
return internal.NewRoomserverAPI(
|
||||||
|
&cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)),
|
||||||
|
base.Caches, &test.NopJSONVerifier{}, nil,
|
||||||
|
), dp
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []gomatrixserverlib.HeaderedEvent) {
|
func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []gomatrixserverlib.HeaderedEvent) {
|
||||||
|
|
|
@ -39,7 +39,8 @@ CREATE INDEX IF NOT EXISTS roomserver_redactions_redacts_event_id ON roomserver_
|
||||||
|
|
||||||
const insertRedactionSQL = "" +
|
const insertRedactionSQL = "" +
|
||||||
"INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" +
|
"INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" +
|
||||||
" VALUES ($1, $2, $3)"
|
" VALUES ($1, $2, $3)" +
|
||||||
|
" ON CONFLICT DO NOTHING"
|
||||||
|
|
||||||
const selectRedactionInfoByRedactionEventIDSQL = "" +
|
const selectRedactionInfoByRedactionEventIDSQL = "" +
|
||||||
"SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions" +
|
"SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions" +
|
||||||
|
|
|
@ -18,23 +18,30 @@ type LatestEventsUpdater struct {
|
||||||
currentStateSnapshotNID types.StateSnapshotNID
|
currentStateSnapshotNID types.StateSnapshotNID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func rollback(txn *sql.Tx) {
|
||||||
|
if txn == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
txn.Rollback() // nolint: errcheck
|
||||||
|
}
|
||||||
|
|
||||||
func NewLatestEventsUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo types.RoomInfo) (*LatestEventsUpdater, error) {
|
func NewLatestEventsUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo types.RoomInfo) (*LatestEventsUpdater, error) {
|
||||||
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
|
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
|
||||||
d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomInfo.RoomNID)
|
d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomInfo.RoomNID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
txn.Rollback() // nolint: errcheck
|
rollback(txn)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
stateAndRefs, err := d.EventsTable.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs)
|
stateAndRefs, err := d.EventsTable.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
txn.Rollback() // nolint: errcheck
|
rollback(txn)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var lastEventIDSent string
|
var lastEventIDSent string
|
||||||
if lastEventNIDSent != 0 {
|
if lastEventNIDSent != 0 {
|
||||||
lastEventIDSent, err = d.EventsTable.SelectEventID(ctx, txn, lastEventNIDSent)
|
lastEventIDSent, err = d.EventsTable.SelectEventID(ctx, txn, lastEventNIDSent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
txn.Rollback() // nolint: errcheck
|
rollback(txn)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -458,7 +458,7 @@ func (d *Database) StoreEvent(
|
||||||
eventNID, stateNID, err = d.EventsTable.SelectEvent(ctx, txn, event.EventID())
|
eventNID, stateNID, err = d.EventsTable.SelectEvent(ctx, txn, event.EventID())
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("d.EventsTable.SelectEvent: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -467,6 +467,9 @@ func (d *Database) StoreEvent(
|
||||||
}
|
}
|
||||||
if !isRejected { // ignore rejected redaction events
|
if !isRejected { // ignore rejected redaction events
|
||||||
redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event)
|
redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.handleRedactions: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -627,6 +630,7 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.Event) (
|
||||||
// to cross-reference with other tables when loading.
|
// to cross-reference with other tables when loading.
|
||||||
//
|
//
|
||||||
// Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction.
|
// Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction.
|
||||||
|
// nolint:gocyclo
|
||||||
func (d *Database) handleRedactions(
|
func (d *Database) handleRedactions(
|
||||||
ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event gomatrixserverlib.Event,
|
ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event gomatrixserverlib.Event,
|
||||||
) (*gomatrixserverlib.Event, string, error) {
|
) (*gomatrixserverlib.Event, string, error) {
|
||||||
|
@ -644,13 +648,13 @@ func (d *Database) handleRedactions(
|
||||||
RedactsEventID: event.Redacts(),
|
RedactsEventID: event.Redacts(),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", fmt.Errorf("d.RedactionsTable.InsertRedaction: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, eventNID, event)
|
redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, eventNID, event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", fmt.Errorf("d.loadRedactionPair: %w", err)
|
||||||
}
|
}
|
||||||
if validated || redactedEvent == nil || redactionEvent == nil {
|
if validated || redactedEvent == nil || redactionEvent == nil {
|
||||||
// we've seen this redaction before or there is nothing to redact
|
// we've seen this redaction before or there is nothing to redact
|
||||||
|
@ -664,7 +668,7 @@ func (d *Database) handleRedactions(
|
||||||
// mark the event as redacted
|
// mark the event as redacted
|
||||||
err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent)
|
err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err)
|
||||||
}
|
}
|
||||||
if redactionsArePermanent {
|
if redactionsArePermanent {
|
||||||
redactedEvent.Event = redactedEvent.Redact()
|
redactedEvent.Event = redactedEvent.Redact()
|
||||||
|
@ -672,10 +676,15 @@ func (d *Database) handleRedactions(
|
||||||
// overwrite the eventJSON table
|
// overwrite the eventJSON table
|
||||||
err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON())
|
err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &redactionEvent.Event, redactedEvent.EventID(), d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true)
|
err = d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("d.RedactionsTable.MarkRedactionValidated: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &redactionEvent.Event, redactedEvent.EventID(), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadRedactionPair returns both the redaction event and the redacted event, else nil.
|
// loadRedactionPair returns both the redaction event and the redacted event, else nil.
|
||||||
|
|
|
@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS roomserver_redactions (
|
||||||
`
|
`
|
||||||
|
|
||||||
const insertRedactionSQL = "" +
|
const insertRedactionSQL = "" +
|
||||||
"INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" +
|
"INSERT OR IGNORE INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" +
|
||||||
" VALUES ($1, $2, $3)"
|
" VALUES ($1, $2, $3)"
|
||||||
|
|
||||||
const selectRedactionInfoByRedactionEventIDSQL = "" +
|
const selectRedactionInfoByRedactionEventIDSQL = "" +
|
||||||
|
|
|
@ -17,11 +17,11 @@ package syncapi
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
|
"github.com/matrix-org/dendrite/internal/setup/kafka"
|
||||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
@ -37,13 +37,14 @@ import (
|
||||||
// component.
|
// component.
|
||||||
func AddPublicRoutes(
|
func AddPublicRoutes(
|
||||||
router *mux.Router,
|
router *mux.Router,
|
||||||
consumer sarama.Consumer,
|
|
||||||
userAPI userapi.UserInternalAPI,
|
userAPI userapi.UserInternalAPI,
|
||||||
rsAPI api.RoomserverInternalAPI,
|
rsAPI api.RoomserverInternalAPI,
|
||||||
keyAPI keyapi.KeyInternalAPI,
|
keyAPI keyapi.KeyInternalAPI,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
cfg *config.SyncAPI,
|
cfg *config.SyncAPI,
|
||||||
) {
|
) {
|
||||||
|
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||||
|
|
||||||
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
|
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to connect to sync db")
|
logrus.WithError(err).Panicf("failed to connect to sync db")
|
||||||
|
|
|
@ -482,3 +482,5 @@ m.room.history_visibility == "joined" allows/forbids appropriately for Real user
|
||||||
POST rejects invalid utf-8 in JSON
|
POST rejects invalid utf-8 in JSON
|
||||||
Users cannot kick users who have already left a room
|
Users cannot kick users who have already left a room
|
||||||
A prev_batch token from incremental sync can be used in the v1 messages API
|
A prev_batch token from incremental sync can be used in the v1 messages API
|
||||||
|
Event with an invalid signature in the send_join response should not cause room join to fail
|
||||||
|
Inbound federation rejects typing notifications from wrong remote
|
||||||
|
|
Loading…
Reference in a new issue