mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-29 01:33:10 -06:00
Rename some things
This commit is contained in:
parent
903044819a
commit
4e2a5ac2dc
|
|
@ -308,7 +308,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
|
|
||||||
// The underlying roomserver implementation needs to be able to call the fedsender.
|
// The underlying roomserver implementation needs to be able to call the fedsender.
|
||||||
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
||||||
rsAPI.SetFederationSenderAPI(fsAPI)
|
rsAPI.SetFederationAPI(fsAPI)
|
||||||
rsAPI.SetKeyring(keyRing)
|
rsAPI.SetKeyring(keyRing)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
|
|
|
||||||
|
|
@ -130,7 +130,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
|
|
||||||
// The underlying roomserver implementation needs to be able to call the fedsender.
|
// The underlying roomserver implementation needs to be able to call the fedsender.
|
||||||
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
||||||
rsAPI.SetFederationSenderAPI(fsAPI)
|
rsAPI.SetFederationAPI(fsAPI)
|
||||||
rsAPI.SetKeyring(keyRing)
|
rsAPI.SetKeyring(keyRing)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
|
|
|
||||||
|
|
@ -161,7 +161,7 @@ func main() {
|
||||||
&base.Base, federation, rsAPI, base.Base.Caches, true,
|
&base.Base, federation, rsAPI, base.Base.Caches, true,
|
||||||
)
|
)
|
||||||
keyRing := fsAPI.KeyRing()
|
keyRing := fsAPI.KeyRing()
|
||||||
rsAPI.SetFederationSenderAPI(fsAPI)
|
rsAPI.SetFederationAPI(fsAPI)
|
||||||
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)
|
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)
|
||||||
err = provider.Start()
|
err = provider.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -186,7 +186,7 @@ func main() {
|
||||||
|
|
||||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||||
|
|
||||||
rsComponent.SetFederationSenderAPI(fsAPI)
|
rsComponent.SetFederationAPI(fsAPI)
|
||||||
rsComponent.SetKeyring(keyRing)
|
rsComponent.SetKeyring(keyRing)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
|
|
|
||||||
|
|
@ -121,7 +121,7 @@ func main() {
|
||||||
base, federation, rsAPI, base.Caches, true,
|
base, federation, rsAPI, base.Caches, true,
|
||||||
)
|
)
|
||||||
|
|
||||||
rsComponent.SetFederationSenderAPI(fsAPI)
|
rsComponent.SetFederationAPI(fsAPI)
|
||||||
rsComponent.SetKeyring(keyRing)
|
rsComponent.SetKeyring(keyRing)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
|
|
|
||||||
|
|
@ -99,7 +99,7 @@ func main() {
|
||||||
|
|
||||||
// The underlying roomserver implementation needs to be able to call the fedsender.
|
// The underlying roomserver implementation needs to be able to call the fedsender.
|
||||||
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
||||||
rsImpl.SetFederationSenderAPI(fsAPI)
|
rsImpl.SetFederationAPI(fsAPI)
|
||||||
|
|
||||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
||||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
|
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ func RoomServer(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
|
||||||
asAPI := base.AppserviceHTTPClient()
|
asAPI := base.AppserviceHTTPClient()
|
||||||
fsAPI := base.FederationAPIHTTPClient()
|
fsAPI := base.FederationAPIHTTPClient()
|
||||||
rsAPI := roomserver.NewInternalAPI(base)
|
rsAPI := roomserver.NewInternalAPI(base)
|
||||||
rsAPI.SetFederationSenderAPI(fsAPI)
|
rsAPI.SetFederationAPI(fsAPI)
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI)
|
roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -199,7 +199,7 @@ func startup() {
|
||||||
)
|
)
|
||||||
rsAPI.SetAppserviceAPI(asQuery)
|
rsAPI.SetAppserviceAPI(asQuery)
|
||||||
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true)
|
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true)
|
||||||
rsAPI.SetFederationSenderAPI(fedSenderAPI)
|
rsAPI.SetFederationAPI(fedSenderAPI)
|
||||||
rsAPI.SetKeyring(keyRing)
|
rsAPI.SetKeyring(keyRing)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
|
|
|
||||||
|
|
@ -211,7 +211,7 @@ func main() {
|
||||||
)
|
)
|
||||||
rsAPI.SetAppserviceAPI(asQuery)
|
rsAPI.SetAppserviceAPI(asQuery)
|
||||||
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true)
|
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true)
|
||||||
rsAPI.SetFederationSenderAPI(fedSenderAPI)
|
rsAPI.SetFederationAPI(fedSenderAPI)
|
||||||
rsAPI.SetKeyring(keyRing)
|
rsAPI.SetKeyring(keyRing)
|
||||||
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
|
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ type Database struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) {
|
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache) (*Database, error) {
|
||||||
var d Database
|
var d Database
|
||||||
var err error
|
var err error
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
|
|
@ -88,19 +88,19 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: d.db,
|
DB: d.db,
|
||||||
Cache: cache,
|
Cache: cache,
|
||||||
Writer: d.writer,
|
Writer: d.writer,
|
||||||
FederationSenderJoinedHosts: joinedHosts,
|
FederationJoinedHosts: joinedHosts,
|
||||||
FederationSenderQueuePDUs: queuePDUs,
|
FederationQueuePDUs: queuePDUs,
|
||||||
FederationSenderQueueEDUs: queueEDUs,
|
FederationQueueEDUs: queueEDUs,
|
||||||
FederationSenderQueueJSON: queueJSON,
|
FederationQueueJSON: queueJSON,
|
||||||
FederationSenderBlacklist: blacklist,
|
FederationBlacklist: blacklist,
|
||||||
FederationSenderInboundPeeks: inboundPeeks,
|
FederationInboundPeeks: inboundPeeks,
|
||||||
FederationSenderOutboundPeeks: outboundPeeks,
|
FederationOutboundPeeks: outboundPeeks,
|
||||||
NotaryServerKeysJSON: notaryJSON,
|
NotaryServerKeysJSON: notaryJSON,
|
||||||
NotaryServerKeysMetadata: notaryMetadata,
|
NotaryServerKeysMetadata: notaryMetadata,
|
||||||
ServerSigningKeys: serverSigningKeys,
|
ServerSigningKeys: serverSigningKeys,
|
||||||
}
|
}
|
||||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil {
|
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -28,19 +28,19 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Database struct {
|
type Database struct {
|
||||||
DB *sql.DB
|
DB *sql.DB
|
||||||
Cache caching.FederationSenderCache
|
Cache caching.FederationCache
|
||||||
Writer sqlutil.Writer
|
Writer sqlutil.Writer
|
||||||
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
|
FederationQueuePDUs tables.FederationQueuePDUs
|
||||||
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
|
FederationQueueEDUs tables.FederationQueueEDUs
|
||||||
FederationSenderQueueJSON tables.FederationSenderQueueJSON
|
FederationQueueJSON tables.FederationQueueJSON
|
||||||
FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
|
FederationJoinedHosts tables.FederationJoinedHosts
|
||||||
FederationSenderBlacklist tables.FederationSenderBlacklist
|
FederationBlacklist tables.FederationBlacklist
|
||||||
FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks
|
FederationOutboundPeeks tables.FederationOutboundPeeks
|
||||||
FederationSenderInboundPeeks tables.FederationSenderInboundPeeks
|
FederationInboundPeeks tables.FederationInboundPeeks
|
||||||
NotaryServerKeysJSON tables.FederationSenderNotaryServerKeysJSON
|
NotaryServerKeysJSON tables.FederationNotaryServerKeysJSON
|
||||||
NotaryServerKeysMetadata tables.FederationSenderNotaryServerKeysMetadata
|
NotaryServerKeysMetadata tables.FederationNotaryServerKeysMetadata
|
||||||
ServerSigningKeys tables.FederationSenderServerSigningKeys
|
ServerSigningKeys tables.FederationServerSigningKeys
|
||||||
}
|
}
|
||||||
|
|
||||||
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
|
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
|
||||||
|
|
@ -67,18 +67,18 @@ func (d *Database) UpdateRoom(
|
||||||
removeHosts []string,
|
removeHosts []string,
|
||||||
) (joinedHosts []types.JoinedHost, err error) {
|
) (joinedHosts []types.JoinedHost, err error) {
|
||||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
joinedHosts, err = d.FederationSenderJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID)
|
joinedHosts, err = d.FederationJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, add := range addHosts {
|
for _, add := range addHosts {
|
||||||
err = d.FederationSenderJoinedHosts.InsertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
|
err = d.FederationJoinedHosts.InsertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err = d.FederationSenderJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil {
|
if err = d.FederationJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -92,18 +92,18 @@ func (d *Database) UpdateRoom(
|
||||||
func (d *Database) GetJoinedHosts(
|
func (d *Database) GetJoinedHosts(
|
||||||
ctx context.Context, roomID string,
|
ctx context.Context, roomID string,
|
||||||
) ([]types.JoinedHost, error) {
|
) ([]types.JoinedHost, error) {
|
||||||
return d.FederationSenderJoinedHosts.SelectJoinedHosts(ctx, roomID)
|
return d.FederationJoinedHosts.SelectJoinedHosts(ctx, roomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAllJoinedHosts returns the currently joined hosts for
|
// GetAllJoinedHosts returns the currently joined hosts for
|
||||||
// all rooms known to the federation sender.
|
// all rooms known to the federation sender.
|
||||||
// Returns an error if something goes wrong.
|
// Returns an error if something goes wrong.
|
||||||
func (d *Database) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
|
func (d *Database) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
|
||||||
return d.FederationSenderJoinedHosts.SelectAllJoinedHosts(ctx)
|
return d.FederationJoinedHosts.SelectAllJoinedHosts(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) {
|
func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) {
|
||||||
return d.FederationSenderJoinedHosts.SelectJoinedHostsForRooms(ctx, roomIDs)
|
return d.FederationJoinedHosts.SelectJoinedHostsForRooms(ctx, roomIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreJSON adds a JSON blob into the queue JSON table and returns
|
// StoreJSON adds a JSON blob into the queue JSON table and returns
|
||||||
|
|
@ -115,7 +115,7 @@ func (d *Database) StoreJSON(
|
||||||
var nid int64
|
var nid int64
|
||||||
var err error
|
var err error
|
||||||
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js)
|
nid, err = d.FederationQueueJSON.InsertQueueJSON(ctx, txn, js)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -133,8 +133,8 @@ func (d *Database) PurgeRoomState(
|
||||||
// If the event is a create event then we'll delete all of the existing
|
// If the event is a create event then we'll delete all of the existing
|
||||||
// data for the room. The only reason that a create event would be replayed
|
// data for the room. The only reason that a create event would be replayed
|
||||||
// to us in this way is if we're about to receive the entire room state.
|
// to us in this way is if we're about to receive the entire room state.
|
||||||
if err := d.FederationSenderJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil {
|
if err := d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil {
|
||||||
return fmt.Errorf("d.FederationSenderJoinedHosts.DeleteJoinedHosts: %w", err)
|
return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
@ -142,64 +142,64 @@ func (d *Database) PurgeRoomState(
|
||||||
|
|
||||||
func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
|
func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), txn, serverName)
|
return d.FederationBlacklist.InsertBlacklist(context.TODO(), txn, serverName)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error {
|
func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
return d.FederationSenderBlacklist.DeleteBlacklist(context.TODO(), txn, serverName)
|
return d.FederationBlacklist.DeleteBlacklist(context.TODO(), txn, serverName)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) RemoveAllServersFromBlacklist() error {
|
func (d *Database) RemoveAllServersFromBlacklist() error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
return d.FederationSenderBlacklist.DeleteAllBlacklist(context.TODO(), txn)
|
return d.FederationBlacklist.DeleteAllBlacklist(context.TODO(), txn)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
|
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
|
||||||
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
|
return d.FederationBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
return d.FederationOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
return d.FederationOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
|
func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
|
||||||
return d.FederationSenderOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID)
|
return d.FederationOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
|
func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
|
||||||
return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
|
return d.FederationOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
return d.FederationSenderInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
return d.FederationInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
return d.FederationSenderInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
return d.FederationInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) {
|
func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) {
|
||||||
return d.FederationSenderInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID)
|
return d.FederationInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) {
|
func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) {
|
||||||
return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID)
|
return d.FederationInboundPeeks.SelectInboundPeeks(ctx, nil, roomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) UpdateNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, serverKeys gomatrixserverlib.ServerKeys) error {
|
func (d *Database) UpdateNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, serverKeys gomatrixserverlib.ServerKeys) error {
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ func (d *Database) AssociateEDUWithDestination(
|
||||||
receipt *Receipt,
|
receipt *Receipt,
|
||||||
) error {
|
) error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
|
if err := d.FederationQueueEDUs.InsertQueueEDU(
|
||||||
ctx, // context
|
ctx, // context
|
||||||
txn, // SQL transaction
|
txn, // SQL transaction
|
||||||
"", // TODO: EDU type for coalescing
|
"", // TODO: EDU type for coalescing
|
||||||
|
|
@ -58,21 +58,21 @@ func (d *Database) GetPendingEDUs(
|
||||||
) {
|
) {
|
||||||
edus = make(map[*Receipt]*gomatrixserverlib.EDU)
|
edus = make(map[*Receipt]*gomatrixserverlib.EDU)
|
||||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
|
nids, err := d.FederationQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("SelectQueueEDUs: %w", err)
|
return fmt.Errorf("SelectQueueEDUs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
retrieve := make([]int64, 0, len(nids))
|
retrieve := make([]int64, 0, len(nids))
|
||||||
for _, nid := range nids {
|
for _, nid := range nids {
|
||||||
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
|
if edu, ok := d.Cache.GetFederationQueuedEDU(nid); ok {
|
||||||
edus[&Receipt{nid}] = edu
|
edus[&Receipt{nid}] = edu
|
||||||
} else {
|
} else {
|
||||||
retrieve = append(retrieve, nid)
|
retrieve = append(retrieve, nid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
blobs, err := d.FederationQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("SelectQueueJSON: %w", err)
|
return fmt.Errorf("SelectQueueJSON: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -107,24 +107,24 @@ func (d *Database) CleanEDUs(
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, nids); err != nil {
|
if err := d.FederationQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, nids); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var deleteNIDs []int64
|
var deleteNIDs []int64
|
||||||
for _, nid := range nids {
|
for _, nid := range nids {
|
||||||
count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid)
|
count, err := d.FederationQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("SelectQueueEDUReferenceJSONCount: %w", err)
|
return fmt.Errorf("SelectQueueEDUReferenceJSONCount: %w", err)
|
||||||
}
|
}
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
deleteNIDs = append(deleteNIDs, nid)
|
deleteNIDs = append(deleteNIDs, nid)
|
||||||
d.Cache.EvictFederationSenderQueuedEDU(nid)
|
d.Cache.EvictFederationQueuedEDU(nid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(deleteNIDs) > 0 {
|
if len(deleteNIDs) > 0 {
|
||||||
if err := d.FederationSenderQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
if err := d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
||||||
return fmt.Errorf("DeleteQueueJSON: %w", err)
|
return fmt.Errorf("DeleteQueueJSON: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -139,7 +139,7 @@ func (d *Database) GetPendingEDUCount(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
) (int64, error) {
|
) (int64, error) {
|
||||||
return d.FederationSenderQueueEDUs.SelectQueueEDUCount(ctx, nil, serverName)
|
return d.FederationQueueEDUs.SelectQueueEDUCount(ctx, nil, serverName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPendingServerNames returns the server names that have EDUs
|
// GetPendingServerNames returns the server names that have EDUs
|
||||||
|
|
@ -147,5 +147,5 @@ func (d *Database) GetPendingEDUCount(
|
||||||
func (d *Database) GetPendingEDUServerNames(
|
func (d *Database) GetPendingEDUServerNames(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
) ([]gomatrixserverlib.ServerName, error) {
|
) ([]gomatrixserverlib.ServerName, error) {
|
||||||
return d.FederationSenderQueueEDUs.SelectQueueEDUServerNames(ctx, nil)
|
return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ func (d *Database) AssociatePDUWithDestination(
|
||||||
receipt *Receipt,
|
receipt *Receipt,
|
||||||
) error {
|
) error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
|
if err := d.FederationQueuePDUs.InsertQueuePDU(
|
||||||
ctx, // context
|
ctx, // context
|
||||||
txn, // SQL transaction
|
txn, // SQL transaction
|
||||||
transactionID, // transaction ID
|
transactionID, // transaction ID
|
||||||
|
|
@ -64,21 +64,21 @@ func (d *Database) GetPendingPDUs(
|
||||||
// the database.
|
// the database.
|
||||||
events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
|
events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
|
||||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit)
|
nids, err := d.FederationQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("SelectQueuePDUs: %w", err)
|
return fmt.Errorf("SelectQueuePDUs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
retrieve := make([]int64, 0, len(nids))
|
retrieve := make([]int64, 0, len(nids))
|
||||||
for _, nid := range nids {
|
for _, nid := range nids {
|
||||||
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
|
if event, ok := d.Cache.GetFederationQueuedPDU(nid); ok {
|
||||||
events[&Receipt{nid}] = event
|
events[&Receipt{nid}] = event
|
||||||
} else {
|
} else {
|
||||||
retrieve = append(retrieve, nid)
|
retrieve = append(retrieve, nid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
blobs, err := d.FederationQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("SelectQueueJSON: %w", err)
|
return fmt.Errorf("SelectQueueJSON: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -89,7 +89,7 @@ func (d *Database) GetPendingPDUs(
|
||||||
return fmt.Errorf("json.Unmarshal: %w", err)
|
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||||
}
|
}
|
||||||
events[&Receipt{nid}] = &event
|
events[&Receipt{nid}] = &event
|
||||||
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
|
d.Cache.StoreFederationQueuedPDU(nid, &event)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -115,24 +115,24 @@ func (d *Database) CleanPDUs(
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil {
|
if err := d.FederationQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var deleteNIDs []int64
|
var deleteNIDs []int64
|
||||||
for _, nid := range nids {
|
for _, nid := range nids {
|
||||||
count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid)
|
count, err := d.FederationQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err)
|
return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err)
|
||||||
}
|
}
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
deleteNIDs = append(deleteNIDs, nid)
|
deleteNIDs = append(deleteNIDs, nid)
|
||||||
d.Cache.EvictFederationSenderQueuedPDU(nid)
|
d.Cache.EvictFederationQueuedPDU(nid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(deleteNIDs) > 0 {
|
if len(deleteNIDs) > 0 {
|
||||||
if err := d.FederationSenderQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
if err := d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
||||||
return fmt.Errorf("DeleteQueueJSON: %w", err)
|
return fmt.Errorf("DeleteQueueJSON: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -147,7 +147,7 @@ func (d *Database) GetPendingPDUCount(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
) (int64, error) {
|
) (int64, error) {
|
||||||
return d.FederationSenderQueuePDUs.SelectQueuePDUCount(ctx, nil, serverName)
|
return d.FederationQueuePDUs.SelectQueuePDUCount(ctx, nil, serverName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPendingServerNames returns the server names that have PDUs
|
// GetPendingServerNames returns the server names that have PDUs
|
||||||
|
|
@ -155,5 +155,5 @@ func (d *Database) GetPendingPDUCount(
|
||||||
func (d *Database) GetPendingPDUServerNames(
|
func (d *Database) GetPendingPDUServerNames(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
) ([]gomatrixserverlib.ServerName, error) {
|
) ([]gomatrixserverlib.ServerName, error) {
|
||||||
return d.FederationSenderQueuePDUs.SelectQueuePDUServerNames(ctx, nil)
|
return d.FederationQueuePDUs.SelectQueuePDUServerNames(ctx, nil)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ type Database struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) {
|
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache) (*Database, error) {
|
||||||
var d Database
|
var d Database
|
||||||
var err error
|
var err error
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
|
|
@ -87,19 +87,19 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: d.db,
|
DB: d.db,
|
||||||
Cache: cache,
|
Cache: cache,
|
||||||
Writer: d.writer,
|
Writer: d.writer,
|
||||||
FederationSenderJoinedHosts: joinedHosts,
|
FederationJoinedHosts: joinedHosts,
|
||||||
FederationSenderQueuePDUs: queuePDUs,
|
FederationQueuePDUs: queuePDUs,
|
||||||
FederationSenderQueueEDUs: queueEDUs,
|
FederationQueueEDUs: queueEDUs,
|
||||||
FederationSenderQueueJSON: queueJSON,
|
FederationQueueJSON: queueJSON,
|
||||||
FederationSenderBlacklist: blacklist,
|
FederationBlacklist: blacklist,
|
||||||
FederationSenderOutboundPeeks: outboundPeeks,
|
FederationOutboundPeeks: outboundPeeks,
|
||||||
FederationSenderInboundPeeks: inboundPeeks,
|
FederationInboundPeeks: inboundPeeks,
|
||||||
NotaryServerKeysJSON: notaryKeys,
|
NotaryServerKeysJSON: notaryKeys,
|
||||||
NotaryServerKeysMetadata: notaryKeysMetadata,
|
NotaryServerKeysMetadata: notaryKeysMetadata,
|
||||||
ServerSigningKeys: serverSigningKeys,
|
ServerSigningKeys: serverSigningKeys,
|
||||||
}
|
}
|
||||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil {
|
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) {
|
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache) (Database, error) {
|
||||||
switch {
|
switch {
|
||||||
case dbProperties.ConnectionString.IsSQLite():
|
case dbProperties.ConnectionString.IsSQLite():
|
||||||
return sqlite3.NewDatabase(dbProperties, cache)
|
return sqlite3.NewDatabase(dbProperties, cache)
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) {
|
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache) (Database, error) {
|
||||||
switch {
|
switch {
|
||||||
case dbProperties.ConnectionString.IsSQLite():
|
case dbProperties.ConnectionString.IsSQLite():
|
||||||
return sqlite3.NewDatabase(dbProperties, cache)
|
return sqlite3.NewDatabase(dbProperties, cache)
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import (
|
||||||
|
|
||||||
type NotaryID int64
|
type NotaryID int64
|
||||||
|
|
||||||
type FederationSenderQueuePDUs interface {
|
type FederationQueuePDUs interface {
|
||||||
InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error
|
InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error
|
||||||
DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error
|
DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error
|
||||||
SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error)
|
SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error)
|
||||||
|
|
@ -33,7 +33,7 @@ type FederationSenderQueuePDUs interface {
|
||||||
SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
|
SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type FederationSenderQueueEDUs interface {
|
type FederationQueueEDUs interface {
|
||||||
InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64) error
|
InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64) error
|
||||||
DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error
|
DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error
|
||||||
SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error)
|
SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error)
|
||||||
|
|
@ -42,13 +42,13 @@ type FederationSenderQueueEDUs interface {
|
||||||
SelectQueueEDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
|
SelectQueueEDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type FederationSenderQueueJSON interface {
|
type FederationQueueJSON interface {
|
||||||
InsertQueueJSON(ctx context.Context, txn *sql.Tx, json string) (int64, error)
|
InsertQueueJSON(ctx context.Context, txn *sql.Tx, json string) (int64, error)
|
||||||
DeleteQueueJSON(ctx context.Context, txn *sql.Tx, nids []int64) error
|
DeleteQueueJSON(ctx context.Context, txn *sql.Tx, nids []int64) error
|
||||||
SelectQueueJSON(ctx context.Context, txn *sql.Tx, jsonNIDs []int64) (map[int64][]byte, error)
|
SelectQueueJSON(ctx context.Context, txn *sql.Tx, jsonNIDs []int64) (map[int64][]byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type FederationSenderJoinedHosts interface {
|
type FederationJoinedHosts interface {
|
||||||
InsertJoinedHosts(ctx context.Context, txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName) error
|
InsertJoinedHosts(ctx context.Context, txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName) error
|
||||||
DeleteJoinedHosts(ctx context.Context, txn *sql.Tx, eventIDs []string) error
|
DeleteJoinedHosts(ctx context.Context, txn *sql.Tx, eventIDs []string) error
|
||||||
DeleteJoinedHostsForRoom(ctx context.Context, txn *sql.Tx, roomID string) error
|
DeleteJoinedHostsForRoom(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
|
|
@ -58,14 +58,14 @@ type FederationSenderJoinedHosts interface {
|
||||||
SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
|
SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type FederationSenderBlacklist interface {
|
type FederationBlacklist interface {
|
||||||
InsertBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
|
InsertBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
|
||||||
SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error)
|
SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error)
|
||||||
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
|
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
|
||||||
DeleteAllBlacklist(ctx context.Context, txn *sql.Tx) error
|
DeleteAllBlacklist(ctx context.Context, txn *sql.Tx) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type FederationSenderOutboundPeeks interface {
|
type FederationOutboundPeeks interface {
|
||||||
InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
|
InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
|
||||||
RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
|
RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
|
||||||
SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error)
|
SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error)
|
||||||
|
|
@ -74,7 +74,7 @@ type FederationSenderOutboundPeeks interface {
|
||||||
DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
|
DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type FederationSenderInboundPeeks interface {
|
type FederationInboundPeeks interface {
|
||||||
InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
|
InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
|
||||||
RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
|
RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
|
||||||
SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error)
|
SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error)
|
||||||
|
|
@ -83,9 +83,9 @@ type FederationSenderInboundPeeks interface {
|
||||||
DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
|
DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FederationSenderNotaryServerKeysJSON contains the byte-for-byte responses from servers which contain their keys and is signed by them.
|
// FederationNotaryServerKeysJSON contains the byte-for-byte responses from servers which contain their keys and is signed by them.
|
||||||
type FederationSenderNotaryServerKeysJSON interface {
|
type FederationNotaryServerKeysJSON interface {
|
||||||
// InsertJSONResponse inserts a new response JSON. Useless on its own, needs querying via FederationSenderNotaryServerKeysMetadata
|
// InsertJSONResponse inserts a new response JSON. Useless on its own, needs querying via FederationNotaryServerKeysMetadata
|
||||||
// `validUntil` should be the value of `valid_until_ts` with the 7-day check applied from:
|
// `validUntil` should be the value of `valid_until_ts` with the 7-day check applied from:
|
||||||
// "Servers MUST use the lesser of this field and 7 days into the future when determining if a key is valid.
|
// "Servers MUST use the lesser of this field and 7 days into the future when determining if a key is valid.
|
||||||
// This is to avoid a situation where an attacker publishes a key which is valid for a significant amount of time
|
// This is to avoid a situation where an attacker publishes a key which is valid for a significant amount of time
|
||||||
|
|
@ -93,19 +93,19 @@ type FederationSenderNotaryServerKeysJSON interface {
|
||||||
InsertJSONResponse(ctx context.Context, txn *sql.Tx, keyQueryResponseJSON gomatrixserverlib.ServerKeys, serverName gomatrixserverlib.ServerName, validUntil gomatrixserverlib.Timestamp) (NotaryID, error)
|
InsertJSONResponse(ctx context.Context, txn *sql.Tx, keyQueryResponseJSON gomatrixserverlib.ServerKeys, serverName gomatrixserverlib.ServerName, validUntil gomatrixserverlib.Timestamp) (NotaryID, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FederationSenderNotaryServerKeysMetadata persists the metadata for FederationSenderNotaryServerKeysJSON
|
// FederationNotaryServerKeysMetadata persists the metadata for FederationNotaryServerKeysJSON
|
||||||
type FederationSenderNotaryServerKeysMetadata interface {
|
type FederationNotaryServerKeysMetadata interface {
|
||||||
// UpsertKey updates or inserts a (server_name, key_id) tuple, pointing it via NotaryID at the the response which has the longest valid_until_ts
|
// UpsertKey updates or inserts a (server_name, key_id) tuple, pointing it via NotaryID at the the response which has the longest valid_until_ts
|
||||||
// `newNotaryID` and `newValidUntil` should be the notary ID / valid_until which has this (server_name, key_id) tuple already, e.g one you just inserted.
|
// `newNotaryID` and `newValidUntil` should be the notary ID / valid_until which has this (server_name, key_id) tuple already, e.g one you just inserted.
|
||||||
UpsertKey(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyID gomatrixserverlib.KeyID, newNotaryID NotaryID, newValidUntil gomatrixserverlib.Timestamp) (NotaryID, error)
|
UpsertKey(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyID gomatrixserverlib.KeyID, newNotaryID NotaryID, newValidUntil gomatrixserverlib.Timestamp) (NotaryID, error)
|
||||||
// SelectKeys returns the signed JSON objects which contain the given key IDs. This will be at most the length of `keyIDs` and at least 1 (assuming
|
// SelectKeys returns the signed JSON objects which contain the given key IDs. This will be at most the length of `keyIDs` and at least 1 (assuming
|
||||||
// the keys exist in the first place). If `keyIDs` is empty, the signed JSON object with the longest valid_until_ts will be returned.
|
// the keys exist in the first place). If `keyIDs` is empty, the signed JSON object with the longest valid_until_ts will be returned.
|
||||||
SelectKeys(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error)
|
SelectKeys(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error)
|
||||||
// DeleteOldJSONResponses removes all responses which are not referenced in FederationSenderNotaryServerKeysMetadata
|
// DeleteOldJSONResponses removes all responses which are not referenced in FederationNotaryServerKeysMetadata
|
||||||
DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error
|
DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type FederationSenderServerSigningKeys interface {
|
type FederationServerSigningKeys interface {
|
||||||
BulkSelectServerKeys(ctx context.Context, txn *sql.Tx, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error)
|
BulkSelectServerKeys(ctx context.Context, txn *sql.Tx, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error)
|
||||||
UpsertServerKeys(ctx context.Context, txn *sql.Tx, request gomatrixserverlib.PublicKeyLookupRequest, key gomatrixserverlib.PublicKeyLookupResult) error
|
UpsertServerKeys(ctx context.Context, txn *sql.Tx, request gomatrixserverlib.PublicKeyLookupRequest, key gomatrixserverlib.PublicKeyLookupResult) error
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,19 +12,19 @@ const (
|
||||||
FederationEventCacheMutable = true // to allow use of Unset only
|
FederationEventCacheMutable = true // to allow use of Unset only
|
||||||
)
|
)
|
||||||
|
|
||||||
// FederationSenderCache contains the subset of functions needed for
|
// FederationCache contains the subset of functions needed for
|
||||||
// a federation event cache.
|
// a federation event cache.
|
||||||
type FederationSenderCache interface {
|
type FederationCache interface {
|
||||||
GetFederationSenderQueuedPDU(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool)
|
GetFederationQueuedPDU(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool)
|
||||||
StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent)
|
StoreFederationQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent)
|
||||||
EvictFederationSenderQueuedPDU(eventNID int64)
|
EvictFederationQueuedPDU(eventNID int64)
|
||||||
|
|
||||||
GetFederationSenderQueuedEDU(eventNID int64) (event *gomatrixserverlib.EDU, ok bool)
|
GetFederationQueuedEDU(eventNID int64) (event *gomatrixserverlib.EDU, ok bool)
|
||||||
StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU)
|
StoreFederationQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU)
|
||||||
EvictFederationSenderQueuedEDU(eventNID int64)
|
EvictFederationQueuedEDU(eventNID int64)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Caches) GetFederationSenderQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) {
|
func (c Caches) GetFederationQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) {
|
||||||
key := fmt.Sprintf("%d", eventNID)
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
val, found := c.FederationEvents.Get(key)
|
val, found := c.FederationEvents.Get(key)
|
||||||
if found && val != nil {
|
if found && val != nil {
|
||||||
|
|
@ -35,17 +35,17 @@ func (c Caches) GetFederationSenderQueuedPDU(eventNID int64) (*gomatrixserverlib
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Caches) StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) {
|
func (c Caches) StoreFederationQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) {
|
||||||
key := fmt.Sprintf("%d", eventNID)
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
c.FederationEvents.Set(key, event)
|
c.FederationEvents.Set(key, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Caches) EvictFederationSenderQueuedPDU(eventNID int64) {
|
func (c Caches) EvictFederationQueuedPDU(eventNID int64) {
|
||||||
key := fmt.Sprintf("%d", eventNID)
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
c.FederationEvents.Unset(key)
|
c.FederationEvents.Unset(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Caches) GetFederationSenderQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) {
|
func (c Caches) GetFederationQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) {
|
||||||
key := fmt.Sprintf("%d", eventNID)
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
val, found := c.FederationEvents.Get(key)
|
val, found := c.FederationEvents.Get(key)
|
||||||
if found && val != nil {
|
if found && val != nil {
|
||||||
|
|
@ -56,12 +56,12 @@ func (c Caches) GetFederationSenderQueuedEDU(eventNID int64) (*gomatrixserverlib
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Caches) StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) {
|
func (c Caches) StoreFederationQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) {
|
||||||
key := fmt.Sprintf("%d", eventNID)
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
c.FederationEvents.Set(key, event)
|
c.FederationEvents.Set(key, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Caches) EvictFederationSenderQueuedEDU(eventNID int64) {
|
func (c Caches) EvictFederationQueuedEDU(eventNID int64) {
|
||||||
key := fmt.Sprintf("%d", eventNID)
|
key := fmt.Sprintf("%d", eventNID)
|
||||||
c.FederationEvents.Unset(key)
|
c.FederationEvents.Unset(key)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ import (
|
||||||
type RoomserverInternalAPI interface {
|
type RoomserverInternalAPI interface {
|
||||||
// needed to avoid chicken and egg scenario when setting up the
|
// needed to avoid chicken and egg scenario when setting up the
|
||||||
// interdependencies between the roomserver and other input APIs
|
// interdependencies between the roomserver and other input APIs
|
||||||
SetFederationSenderAPI(fsAPI fsAPI.FederationInternalAPI)
|
SetFederationAPI(fsAPI fsAPI.FederationInternalAPI)
|
||||||
SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI)
|
SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI)
|
||||||
SetKeyring(keyRing *gomatrixserverlib.KeyRing)
|
SetKeyring(keyRing *gomatrixserverlib.KeyRing)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,8 +21,8 @@ func (t *RoomserverInternalAPITrace) SetKeyring(keyRing *gomatrixserverlib.KeyRi
|
||||||
t.Impl.SetKeyring(keyRing)
|
t.Impl.SetKeyring(keyRing)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *RoomserverInternalAPITrace) SetFederationSenderAPI(fsAPI fsAPI.FederationInternalAPI) {
|
func (t *RoomserverInternalAPITrace) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI) {
|
||||||
t.Impl.SetFederationSenderAPI(fsAPI)
|
t.Impl.SetFederationAPI(fsAPI)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *RoomserverInternalAPITrace) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) {
|
func (t *RoomserverInternalAPITrace) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) {
|
||||||
|
|
|
||||||
|
|
@ -79,10 +79,10 @@ func (r *RoomserverInternalAPI) SetKeyring(keyRing *gomatrixserverlib.KeyRing) {
|
||||||
r.KeyRing = keyRing
|
r.KeyRing = keyRing
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetFederationSenderInputAPI passes in a federation sender input API reference
|
// SetFederationInputAPI passes in a federation input API reference so that we can
|
||||||
// so that we can avoid the chicken-and-egg problem of both the roomserver input API
|
// avoid the chicken-and-egg problem of both the roomserver input API and the
|
||||||
// and the federation sender input API being interdependent.
|
// federation input API being interdependent.
|
||||||
func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationInternalAPI) {
|
func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI) {
|
||||||
r.fsAPI = fsAPI
|
r.fsAPI = fsAPI
|
||||||
r.SetKeyring(fsAPI.KeyRing())
|
r.SetKeyring(fsAPI.KeyRing())
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -87,8 +87,8 @@ func NewRoomserverClient(
|
||||||
func (h *httpRoomserverInternalAPI) SetKeyring(keyRing *gomatrixserverlib.KeyRing) {
|
func (h *httpRoomserverInternalAPI) SetKeyring(keyRing *gomatrixserverlib.KeyRing) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetFederationSenderInputAPI no-ops in HTTP client mode as there is no chicken/egg scenario
|
// SetFederationInputAPI no-ops in HTTP client mode as there is no chicken/egg scenario
|
||||||
func (h *httpRoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsInputAPI.FederationInternalAPI) {
|
func (h *httpRoomserverInternalAPI) SetFederationAPI(fsAPI fsInputAPI.FederationInternalAPI) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAppserviceAPI no-ops in HTTP client mode as there is no chicken/egg scenario
|
// SetAppserviceAPI no-ops in HTTP client mode as there is no chicken/egg scenario
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue