Merge branch 'main' into swedgwood/space-summaries

This commit is contained in:
Sam Wedgwood 2023-07-19 16:53:04 +01:00 committed by GitHub
commit 3498b46408
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 184 additions and 96 deletions

View file

@ -126,7 +126,7 @@ func (p *P2PMonolith) SetupPinecone(sk ed25519.PrivateKey) {
} }
func (p *P2PMonolith) SetupDendrite( func (p *P2PMonolith) SetupDendrite(
processCtx *process.ProcessContext, cfg *config.Dendrite, cm sqlutil.Connections, routers httputil.Routers, processCtx *process.ProcessContext, cfg *config.Dendrite, cm *sqlutil.Connections, routers httputil.Routers,
port int, enableRelaying bool, enableMetrics bool, enableWebsockets bool) { port int, enableRelaying bool, enableMetrics bool, enableWebsockets bool) {
p.port = port p.port = port

View file

@ -16,7 +16,9 @@ package consumers
import ( import (
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"strconv" "strconv"
"time" "time"
@ -411,13 +413,26 @@ func JoinedHostsFromEvents(ctx context.Context, evs []gomatrixserverlib.PDU, rsA
if err != nil { if err != nil {
return nil, err return nil, err
} }
var domain spec.ServerName
userID, err := rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(*ev.StateKey())) userID, err := rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(*ev.StateKey()))
if err != nil { if err != nil {
return nil, err if errors.As(err, new(base64.CorruptInputError)) {
// Fallback to using the "old" way of getting the user domain, avoids
// "illegal base64 data at input byte 0" errors
// FIXME: we should do this in QueryUserIDForSender instead
_, domain, err = gomatrixserverlib.SplitID('@', *ev.StateKey())
if err != nil {
return nil, err
}
} else {
return nil, err
}
} else {
domain = userID.Domain()
} }
joinedHosts = append(joinedHosts, types.JoinedHost{ joinedHosts = append(joinedHosts, types.JoinedHost{
MemberEventID: ev.EventID(), ServerName: userID.Domain(), MemberEventID: ev.EventID(), ServerName: domain,
}) })
} }
return joinedHosts, nil return joinedHosts, nil

View file

@ -95,7 +95,7 @@ func AddPublicRoutes(
func NewInternalAPI( func NewInternalAPI(
processContext *process.ProcessContext, processContext *process.ProcessContext,
dendriteCfg *config.Dendrite, dendriteCfg *config.Dendrite,
cm sqlutil.Connections, cm *sqlutil.Connections,
natsInstance *jetstream.NATSInstance, natsInstance *jetstream.NATSInstance,
federation fclient.FederationClient, federation fclient.FederationClient,
rsAPI roomserverAPI.FederationRoomserverAPI, rsAPI roomserverAPI.FederationRoomserverAPI,

View file

@ -29,7 +29,7 @@ func (a *FederationInternalAPI) MakeJoin(
func (a *FederationInternalAPI) SendJoin( func (a *FederationInternalAPI) SendJoin(
ctx context.Context, origin, s spec.ServerName, event gomatrixserverlib.PDU, ctx context.Context, origin, s spec.ServerName, event gomatrixserverlib.PDU,
) (res gomatrixserverlib.SendJoinResponse, err error) { ) (res gomatrixserverlib.SendJoinResponse, err error) {
ctx, cancel := context.WithTimeout(ctx, defaultTimeout) ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
defer cancel() defer cancel()
ires, err := a.federation.SendJoin(ctx, origin, s, event) ires, err := a.federation.SendJoin(ctx, origin, s, event)
if err != nil { if err != nil {

View file

@ -15,9 +15,11 @@
package routing package routing
import ( import (
"errors"
"fmt" "fmt"
"net/http" "net/http"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
@ -52,6 +54,12 @@ func GetProfile(
profile, err := userAPI.QueryProfile(httpReq.Context(), userID) profile, err := userAPI.QueryProfile(httpReq.Context(), userID)
if err != nil { if err != nil {
if errors.Is(err, appserviceAPI.ErrProfileNotExists) {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: spec.NotFound("The user does not exist or does not have a profile."),
}
}
util.GetLogger(httpReq.Context()).WithError(err).Error("userAPI.QueryProfile failed") util.GetLogger(httpReq.Context()).WithError(err).Error("userAPI.QueryProfile failed")
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusInternalServerError, Code: http.StatusInternalServerError,

View file

@ -36,7 +36,7 @@ type Database struct {
} }
// NewDatabase opens a new database // NewDatabase opens a new database
func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(spec.ServerName) bool) (*Database, error) { func NewDatabase(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(spec.ServerName) bool) (*Database, error) {
var d Database var d Database
var err error var err error
if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil { if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil {

View file

@ -34,7 +34,7 @@ type Database struct {
} }
// NewDatabase opens a new database // NewDatabase opens a new database
func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(spec.ServerName) bool) (*Database, error) { func NewDatabase(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(spec.ServerName) bool) (*Database, error) {
var d Database var d Database
var err error var err error
if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil { if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil {

View file

@ -30,7 +30,7 @@ import (
) )
// NewDatabase opens a new database // NewDatabase opens a new database
func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(spec.ServerName) bool) (Database, error) { func NewDatabase(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(spec.ServerName) bool) (Database, error) {
switch { switch {
case dbProperties.ConnectionString.IsSQLite(): case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(ctx, conMan, dbProperties, cache, isLocalServerName) return sqlite3.NewDatabase(ctx, conMan, dbProperties, cache, isLocalServerName)

View file

@ -29,24 +29,26 @@ type Connections struct {
processContext *process.ProcessContext processContext *process.ProcessContext
} }
func NewConnectionManager(processCtx *process.ProcessContext, globalConfig config.DatabaseOptions) Connections { func NewConnectionManager(processCtx *process.ProcessContext, globalConfig config.DatabaseOptions) *Connections {
return Connections{ return &Connections{
globalConfig: globalConfig, globalConfig: globalConfig,
processContext: processCtx, processContext: processCtx,
} }
} }
func (c *Connections) Connection(dbProperties *config.DatabaseOptions) (*sql.DB, Writer, error) { func (c *Connections) Connection(dbProperties *config.DatabaseOptions) (*sql.DB, Writer, error) {
writer := NewDummyWriter()
if dbProperties.ConnectionString.IsSQLite() {
writer = NewExclusiveWriter()
}
var err error var err error
if dbProperties.ConnectionString == "" { if dbProperties.ConnectionString == "" {
// if no connectionString was provided, try the global one // if no connectionString was provided, try the global one
dbProperties = &c.globalConfig dbProperties = &c.globalConfig
} }
if dbProperties.ConnectionString != "" || c.db == nil {
writer := NewDummyWriter()
if dbProperties.ConnectionString.IsSQLite() {
writer = NewExclusiveWriter()
}
if dbProperties.ConnectionString != "" && c.db == nil {
// Open a new database connection using the supplied config. // Open a new database connection using the supplied config.
c.db, err = Open(dbProperties, writer) c.db, err = Open(dbProperties, writer)
if err != nil { if err != nil {

View file

@ -6,51 +6,113 @@ import (
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
) )
func TestConnectionManager(t *testing.T) { func TestConnectionManager(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
conStr, close := test.PrepareDBConnectionString(t, dbType)
t.Cleanup(close)
cm := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{})
dbProps := &config.DatabaseOptions{ConnectionString: config.DataSource(conStr)} t.Run("component defined connection string", func(t *testing.T) {
db, writer, err := cm.Connection(dbProps) test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
if err != nil { conStr, close := test.PrepareDBConnectionString(t, dbType)
t.Fatal(err) t.Cleanup(close)
} cm := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{})
switch dbType { dbProps := &config.DatabaseOptions{ConnectionString: config.DataSource(conStr)}
case test.DBTypeSQLite: db, writer, err := cm.Connection(dbProps)
_, ok := writer.(*sqlutil.ExclusiveWriter) if err != nil {
if !ok { t.Fatal(err)
t.Fatalf("expected exclusive writer")
} }
case test.DBTypePostgres:
_, ok := writer.(*sqlutil.DummyWriter) switch dbType {
if !ok { case test.DBTypeSQLite:
t.Fatalf("expected dummy writer") _, ok := writer.(*sqlutil.ExclusiveWriter)
if !ok {
t.Fatalf("expected exclusive writer")
}
case test.DBTypePostgres:
_, ok := writer.(*sqlutil.DummyWriter)
if !ok {
t.Fatalf("expected dummy writer")
}
} }
}
// test global db pool // reuse existing connection
dbGlobal, writerGlobal, err := cm.Connection(&config.DatabaseOptions{}) db2, writer2, err := cm.Connection(dbProps)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if !reflect.DeepEqual(db, dbGlobal) { if !reflect.DeepEqual(db, db2) {
t.Fatalf("expected database connection to be reused") t.Fatalf("expected database connection to be reused")
} }
if !reflect.DeepEqual(writer, writerGlobal) { if !reflect.DeepEqual(writer, writer2) {
t.Fatalf("expected database writer to be reused") t.Fatalf("expected database writer to be reused")
} }
})
// test invalid connection string configured
cm2 := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{})
_, _, err = cm2.Connection(&config.DatabaseOptions{ConnectionString: "http://"})
if err == nil {
t.Fatal("expected an error but got none")
}
}) })
t.Run("global connection pool", func(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
conStr, close := test.PrepareDBConnectionString(t, dbType)
t.Cleanup(close)
cm := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{ConnectionString: config.DataSource(conStr)})
dbProps := &config.DatabaseOptions{}
db, writer, err := cm.Connection(dbProps)
if err != nil {
t.Fatal(err)
}
switch dbType {
case test.DBTypeSQLite:
_, ok := writer.(*sqlutil.ExclusiveWriter)
if !ok {
t.Fatalf("expected exclusive writer")
}
case test.DBTypePostgres:
_, ok := writer.(*sqlutil.DummyWriter)
if !ok {
t.Fatalf("expected dummy writer")
}
}
// reuse existing connection
db2, writer2, err := cm.Connection(dbProps)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(db, db2) {
t.Fatalf("expected database connection to be reused")
}
if !reflect.DeepEqual(writer, writer2) {
t.Fatalf("expected database writer to be reused")
}
})
})
t.Run("shutdown", func(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
conStr, close := test.PrepareDBConnectionString(t, dbType)
t.Cleanup(close)
processCtx := process.NewProcessContext()
cm := sqlutil.NewConnectionManager(processCtx, config.DatabaseOptions{ConnectionString: config.DataSource(conStr)})
dbProps := &config.DatabaseOptions{}
_, _, err := cm.Connection(dbProps)
if err != nil {
t.Fatal(err)
}
processCtx.ShutdownDendrite()
processCtx.WaitForComponentsToFinish()
})
})
// test invalid connection string configured
cm2 := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{})
_, _, err := cm2.Connection(&config.DatabaseOptions{ConnectionString: "http://"})
if err == nil {
t.Fatal("expected an error but got none")
}
} }

View file

@ -28,7 +28,7 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the MediaAPI component. // AddPublicRoutes sets up and registers HTTP handlers for the MediaAPI component.
func AddPublicRoutes( func AddPublicRoutes(
mediaRouter *mux.Router, mediaRouter *mux.Router,
cm sqlutil.Connections, cm *sqlutil.Connections,
cfg *config.Dendrite, cfg *config.Dendrite,
userAPI userapi.MediaUserAPI, userAPI userapi.MediaUserAPI,
client *fclient.Client, client *fclient.Client,

View file

@ -24,7 +24,7 @@ import (
) )
// NewDatabase opens a postgres database. // NewDatabase opens a postgres database.
func NewDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.Database, error) { func NewDatabase(conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.Database, error) {
db, writer, err := conMan.Connection(dbProperties) db, writer, err := conMan.Connection(dbProperties)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -23,7 +23,7 @@ import (
) )
// NewDatabase opens a SQLIte database. // NewDatabase opens a SQLIte database.
func NewDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.Database, error) { func NewDatabase(conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.Database, error) {
db, writer, err := conMan.Connection(dbProperties) db, writer, err := conMan.Connection(dbProperties)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -27,7 +27,7 @@ import (
) )
// NewMediaAPIDatasource opens a database connection. // NewMediaAPIDatasource opens a database connection.
func NewMediaAPIDatasource(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) { func NewMediaAPIDatasource(conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) {
switch { switch {
case dbProperties.ConnectionString.IsSQLite(): case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(conMan, dbProperties) return sqlite3.NewDatabase(conMan, dbProperties)

View file

@ -53,7 +53,7 @@ func AddPublicRoutes(
func NewRelayInternalAPI( func NewRelayInternalAPI(
dendriteCfg *config.Dendrite, dendriteCfg *config.Dendrite,
cm sqlutil.Connections, cm *sqlutil.Connections,
fedClient fclient.FederationClient, fedClient fclient.FederationClient,
rsAPI rsAPI.RoomserverInternalAPI, rsAPI rsAPI.RoomserverInternalAPI,
keyRing *gomatrixserverlib.KeyRing, keyRing *gomatrixserverlib.KeyRing,

View file

@ -33,7 +33,7 @@ type Database struct {
// NewDatabase opens a new database // NewDatabase opens a new database
func NewDatabase( func NewDatabase(
conMan sqlutil.Connections, conMan *sqlutil.Connections,
dbProperties *config.DatabaseOptions, dbProperties *config.DatabaseOptions,
cache caching.FederationCache, cache caching.FederationCache,
isLocalServerName func(spec.ServerName) bool, isLocalServerName func(spec.ServerName) bool,

View file

@ -33,7 +33,7 @@ type Database struct {
// NewDatabase opens a new database // NewDatabase opens a new database
func NewDatabase( func NewDatabase(
conMan sqlutil.Connections, conMan *sqlutil.Connections,
dbProperties *config.DatabaseOptions, dbProperties *config.DatabaseOptions,
cache caching.FederationCache, cache caching.FederationCache,
isLocalServerName func(spec.ServerName) bool, isLocalServerName func(spec.ServerName) bool,

View file

@ -30,7 +30,7 @@ import (
// NewDatabase opens a new database // NewDatabase opens a new database
func NewDatabase( func NewDatabase(
conMan sqlutil.Connections, conMan *sqlutil.Connections,
dbProperties *config.DatabaseOptions, dbProperties *config.DatabaseOptions,
cache caching.FederationCache, cache caching.FederationCache,
isLocalServerName func(spec.ServerName) bool, isLocalServerName func(spec.ServerName) bool,

View file

@ -34,7 +34,7 @@ import (
func NewInternalAPI( func NewInternalAPI(
processContext *process.ProcessContext, processContext *process.ProcessContext,
cfg *config.Dendrite, cfg *config.Dendrite,
cm sqlutil.Connections, cm *sqlutil.Connections,
natsInstance *jetstream.NATSInstance, natsInstance *jetstream.NATSInstance,
caches caching.RoomServerCaches, caches caching.RoomServerCaches,
enableMetrics bool, enableMetrics bool,

View file

@ -37,7 +37,7 @@ type Database struct {
} }
// Open a postgres database. // Open a postgres database.
func Open(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { func Open(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) {
var d Database var d Database
var err error var err error
db, writer, err := conMan.Connection(dbProperties) db, writer, err := conMan.Connection(dbProperties)

View file

@ -36,7 +36,7 @@ type Database struct {
} }
// Open a sqlite database. // Open a sqlite database.
func Open(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { func Open(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) {
var d Database var d Database
var err error var err error
db, writer, err := conMan.Connection(dbProperties) db, writer, err := conMan.Connection(dbProperties)

View file

@ -29,7 +29,7 @@ import (
) )
// Open opens a database connection. // Open opens a database connection.
func Open(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) { func Open(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) {
switch { switch {
case dbProperties.ConnectionString.IsSQLite(): case dbProperties.ConnectionString.IsSQLite():
return sqlite3.Open(ctx, conMan, dbProperties, cache) return sqlite3.Open(ctx, conMan, dbProperties, cache)

View file

@ -61,7 +61,7 @@ func (m *Monolith) AddAllPublicRoutes(
processCtx *process.ProcessContext, processCtx *process.ProcessContext,
cfg *config.Dendrite, cfg *config.Dendrite,
routers httputil.Routers, routers httputil.Routers,
cm sqlutil.Connections, cm *sqlutil.Connections,
natsInstance *jetstream.NATSInstance, natsInstance *jetstream.NATSInstance,
caches *caching.Caches, caches *caching.Caches,
enableMetrics bool, enableMetrics bool,

View file

@ -105,7 +105,7 @@ func toClientResponse(ctx context.Context, res *MSC2836EventRelationshipsRespons
// Enable this MSC // Enable this MSC
func Enable( func Enable(
cfg *config.Dendrite, cm sqlutil.Connections, routers httputil.Routers, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI, cfg *config.Dendrite, cm *sqlutil.Connections, routers httputil.Routers, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI,
userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier, userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier,
) error { ) error {
db, err := NewDatabase(cm, &cfg.MSCs.Database) db, err := NewDatabase(cm, &cfg.MSCs.Database)

View file

@ -59,14 +59,14 @@ type DB struct {
} }
// NewDatabase loads the database for msc2836 // NewDatabase loads the database for msc2836
func NewDatabase(conMan sqlutil.Connections, dbOpts *config.DatabaseOptions) (Database, error) { func NewDatabase(conMan *sqlutil.Connections, dbOpts *config.DatabaseOptions) (Database, error) {
if dbOpts.ConnectionString.IsPostgres() { if dbOpts.ConnectionString.IsPostgres() {
return newPostgresDatabase(conMan, dbOpts) return newPostgresDatabase(conMan, dbOpts)
} }
return newSQLiteDatabase(conMan, dbOpts) return newSQLiteDatabase(conMan, dbOpts)
} }
func newPostgresDatabase(conMan sqlutil.Connections, dbOpts *config.DatabaseOptions) (Database, error) { func newPostgresDatabase(conMan *sqlutil.Connections, dbOpts *config.DatabaseOptions) (Database, error) {
d := DB{} d := DB{}
var err error var err error
if d.db, d.writer, err = conMan.Connection(dbOpts); err != nil { if d.db, d.writer, err = conMan.Connection(dbOpts); err != nil {
@ -144,7 +144,7 @@ func newPostgresDatabase(conMan sqlutil.Connections, dbOpts *config.DatabaseOpti
return &d, err return &d, err
} }
func newSQLiteDatabase(conMan sqlutil.Connections, dbOpts *config.DatabaseOptions) (Database, error) { func newSQLiteDatabase(conMan *sqlutil.Connections, dbOpts *config.DatabaseOptions) (Database, error) {
d := DB{} d := DB{}
var err error var err error
if d.db, d.writer, err = conMan.Connection(dbOpts); err != nil { if d.db, d.writer, err = conMan.Connection(dbOpts); err != nil {

View file

@ -29,7 +29,7 @@ import (
) )
// Enable MSCs - returns an error on unknown MSCs // Enable MSCs - returns an error on unknown MSCs
func Enable(cfg *config.Dendrite, cm sqlutil.Connections, routers httputil.Routers, monolith *setup.Monolith, caches *caching.Caches) error { func Enable(cfg *config.Dendrite, cm *sqlutil.Connections, routers httputil.Routers, monolith *setup.Monolith, caches *caching.Caches) error {
for _, msc := range cfg.MSCs.MSCs { for _, msc := range cfg.MSCs.MSCs {
util.GetLogger(context.Background()).WithField("msc", msc).Info("Enabling MSC") util.GetLogger(context.Background()).WithField("msc", msc).Info("Enabling MSC")
if err := EnableMSC(cfg, cm, routers, monolith, msc, caches); err != nil { if err := EnableMSC(cfg, cm, routers, monolith, msc, caches); err != nil {
@ -39,7 +39,7 @@ func Enable(cfg *config.Dendrite, cm sqlutil.Connections, routers httputil.Route
return nil return nil
} }
func EnableMSC(cfg *config.Dendrite, cm sqlutil.Connections, routers httputil.Routers, monolith *setup.Monolith, msc string, caches *caching.Caches) error { func EnableMSC(cfg *config.Dendrite, cm *sqlutil.Connections, routers httputil.Routers, monolith *setup.Monolith, msc string, caches *caching.Caches) error {
switch msc { switch msc {
case "msc2836": case "msc2836":
return msc2836.Enable(cfg, cm, routers, monolith.RoomserverAPI, monolith.FederationAPI, monolith.UserAPI, monolith.KeyRing) return msc2836.Enable(cfg, cm, routers, monolith.RoomserverAPI, monolith.FederationAPI, monolith.UserAPI, monolith.KeyRing)

View file

@ -17,16 +17,12 @@ package consumers
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/base64"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/fulltext" "github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -38,6 +34,11 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
) )
// OutputRoomEventConsumer consumes events that originated in the room server. // OutputRoomEventConsumer consumes events that originated in the room server.
@ -141,7 +142,14 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
) )
} }
if err != nil { if err != nil {
log.WithError(err).Error("roomserver output log: failed to process event") if errors.As(err, new(base64.CorruptInputError)) {
// no matter how often we retry this event, we will always get this error, discard the event
return true
}
log.WithFields(log.Fields{
"type": output.Type,
}).WithError(err).Error("roomserver output log: failed to process event")
sentry.CaptureException(err)
return false return false
} }
@ -237,21 +245,18 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
ev, err := s.updateStateEvent(ev) ev, err := s.updateStateEvent(ev)
if err != nil { if err != nil {
sentry.CaptureException(err)
return err return err
} }
for i := range addsStateEvents { for i := range addsStateEvents {
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i]) addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
if err != nil { if err != nil {
sentry.CaptureException(err)
return err return err
} }
} }
if msg.RewritesState { if msg.RewritesState {
if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil { if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
sentry.CaptureException(err)
return fmt.Errorf("s.db.PurgeRoom: %w", err) return fmt.Errorf("s.db.PurgeRoom: %w", err)
} }
} }
@ -289,7 +294,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
sentry.CaptureException(err)
return err return err
} }
@ -430,7 +434,6 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil { if err != nil {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": msg.Event.EventID(), "event_id": msg.Event.EventID(),
@ -452,7 +455,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
// It's possible we just haven't heard of this invite yet, so // It's possible we just haven't heard of this invite yet, so
// we should not panic if we try to retire it. // we should not panic if we try to retire it.
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": msg.EventID, "event_id": msg.EventID,
@ -496,7 +498,6 @@ func (s *OutputRoomEventConsumer) onNewPeek(
) { ) {
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
if err != nil { if err != nil {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
log.ErrorKey: err, log.ErrorKey: err,

View file

@ -36,7 +36,7 @@ type SyncServerDatasource struct {
} }
// NewDatabase creates a new sync server database // NewDatabase creates a new sync server database
func NewDatabase(ctx context.Context, cm sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { func NewDatabase(ctx context.Context, cm *sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource var d SyncServerDatasource
var err error var err error
if d.db, d.writer, err = cm.Connection(dbProperties); err != nil { if d.db, d.writer, err = cm.Connection(dbProperties); err != nil {

View file

@ -36,7 +36,7 @@ type SyncServerDatasource struct {
// NewDatabase creates a new sync server database // NewDatabase creates a new sync server database
// nolint: gocyclo // nolint: gocyclo
func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { func NewDatabase(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource var d SyncServerDatasource
var err error var err error

View file

@ -28,7 +28,7 @@ import (
) )
// NewSyncServerDatasource opens a database connection. // NewSyncServerDatasource opens a database connection.
func NewSyncServerDatasource(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) { func NewSyncServerDatasource(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) {
switch { switch {
case dbProperties.ConnectionString.IsSQLite(): case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(ctx, conMan, dbProperties) return sqlite3.NewDatabase(ctx, conMan, dbProperties)

View file

@ -45,7 +45,7 @@ func AddPublicRoutes(
processContext *process.ProcessContext, processContext *process.ProcessContext,
routers httputil.Routers, routers httputil.Routers,
dendriteCfg *config.Dendrite, dendriteCfg *config.Dendrite,
cm sqlutil.Connections, cm *sqlutil.Connections,
natsInstance *jetstream.NATSInstance, natsInstance *jetstream.NATSInstance,
userAPI userapi.SyncUserAPI, userAPI userapi.SyncUserAPI,
rsAPI api.SyncRoomserverAPI, rsAPI api.SyncRoomserverAPI,

View file

@ -31,7 +31,7 @@ import (
) )
// NewDatabase creates a new accounts and profiles database // NewDatabase creates a new accounts and profiles database
func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, serverName spec.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { func NewDatabase(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, serverName spec.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) {
db, writer, err := conMan.Connection(dbProperties) db, writer, err := conMan.Connection(dbProperties)
if err != nil { if err != nil {
return nil, err return nil, err
@ -140,7 +140,7 @@ func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *
}, nil }, nil
} }
func NewKeyDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.KeyDatabase, error) { func NewKeyDatabase(conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.KeyDatabase, error) {
db, writer, err := conMan.Connection(dbProperties) db, writer, err := conMan.Connection(dbProperties)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -29,7 +29,7 @@ import (
) )
// NewUserDatabase creates a new accounts and profiles database // NewUserDatabase creates a new accounts and profiles database
func NewUserDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, serverName spec.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { func NewUserDatabase(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, serverName spec.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) {
db, writer, err := conMan.Connection(dbProperties) db, writer, err := conMan.Connection(dbProperties)
if err != nil { if err != nil {
return nil, err return nil, err
@ -137,7 +137,7 @@ func NewUserDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperti
}, nil }, nil
} }
func NewKeyDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.KeyDatabase, error) { func NewKeyDatabase(conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.KeyDatabase, error) {
db, writer, err := conMan.Connection(dbProperties) db, writer, err := conMan.Connection(dbProperties)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -34,7 +34,7 @@ import (
// and sets postgres connection parameters // and sets postgres connection parameters
func NewUserDatabase( func NewUserDatabase(
ctx context.Context, ctx context.Context,
conMan sqlutil.Connections, conMan *sqlutil.Connections,
dbProperties *config.DatabaseOptions, dbProperties *config.DatabaseOptions,
serverName spec.ServerName, serverName spec.ServerName,
bcryptCost int, bcryptCost int,
@ -54,7 +54,7 @@ func NewUserDatabase(
// NewKeyDatabase opens a new Postgres or Sqlite database (base on dataSourceName) scheme) // NewKeyDatabase opens a new Postgres or Sqlite database (base on dataSourceName) scheme)
// and sets postgres connection parameters. // and sets postgres connection parameters.
func NewKeyDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (KeyDatabase, error) { func NewKeyDatabase(conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions) (KeyDatabase, error) {
switch { switch {
case dbProperties.ConnectionString.IsSQLite(): case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewKeyDatabase(conMan, dbProperties) return sqlite3.NewKeyDatabase(conMan, dbProperties)

View file

@ -42,7 +42,7 @@ import (
func NewInternalAPI( func NewInternalAPI(
processContext *process.ProcessContext, processContext *process.ProcessContext,
dendriteCfg *config.Dendrite, dendriteCfg *config.Dendrite,
cm sqlutil.Connections, cm *sqlutil.Connections,
natsInstance *jetstream.NATSInstance, natsInstance *jetstream.NATSInstance,
rsAPI rsapi.UserRoomserverAPI, rsAPI rsapi.UserRoomserverAPI,
fedClient fedsenderapi.KeyserverFederationAPI, fedClient fedsenderapi.KeyserverFederationAPI,