Wire up publicroomsapi for roomserver events (#851)

* Wire up publicroomsapi to roomserver events

* Remove parameter that was incorrectly brought over from p2p work

* nolint containsBackwardExtremity for now
This commit is contained in:
Neil Alexander 2020-01-24 17:11:20 +00:00 committed by GitHub
parent dece31f41e
commit 37d117f2b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 25 additions and 7 deletions

View file

@ -69,7 +69,7 @@ func main() {
) )
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
mediaapi.SetupMediaAPIComponent(base, deviceDB) mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)
httpHandler := common.WrapHandlerInCORS(base.APIMux) httpHandler := common.WrapHandlerInCORS(base.APIMux)

View file

@ -26,7 +26,9 @@ func main() {
deviceDB := base.CreateDeviceDB() deviceDB := base.CreateDeviceDB()
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB) _, _, query := base.CreateHTTPRoomserverAPIs()
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query)
base.SetupAndServeHTTP(string(base.Cfg.Bind.PublicRoomsAPI), string(base.Cfg.Listen.PublicRoomsAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.PublicRoomsAPI), string(base.Cfg.Listen.PublicRoomsAPI))

View file

@ -17,8 +17,10 @@ package publicroomsapi
import ( import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/publicroomsapi/consumers"
"github.com/matrix-org/dendrite/publicroomsapi/routing" "github.com/matrix-org/dendrite/publicroomsapi/routing"
"github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/publicroomsapi/storage"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -27,11 +29,19 @@ import (
func SetupPublicRoomsAPIComponent( func SetupPublicRoomsAPIComponent(
base *basecomponent.BaseDendrite, base *basecomponent.BaseDendrite,
deviceDB *devices.Database, deviceDB *devices.Database,
rsQueryAPI roomserverAPI.RoomserverQueryAPI,
) { ) {
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
} }
rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, publicRoomsDB, rsQueryAPI,
)
if err = rsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start public rooms server consumer")
}
routing.Setup(base.APIMux, deviceDB, publicRoomsDB) routing.Setup(base.APIMux, deviceDB, publicRoomsDB)
} }

View file

@ -42,15 +42,16 @@ func NewPublicRoomsServerDatabase(dataSourceName string) (*PublicRoomsServerData
if db, err = sql.Open("postgres", dataSourceName); err != nil { if db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err return nil, err
} }
partitions := common.PartitionOffsetStatements{} storage := PublicRoomsServerDatabase{
if err = partitions.Prepare(db, "publicroomsapi"); err != nil { db: db,
}
if err = storage.PartitionOffsetStatements.Prepare(db, "publicroomsapi"); err != nil {
return nil, err return nil, err
} }
statements := publicRoomsStatements{} if err = storage.statements.prepare(db); err != nil {
if err = statements.prepare(db); err != nil {
return nil, err return nil, err
} }
return &PublicRoomsServerDatabase{db, partitions, statements}, nil return &storage, nil
} }
// GetRoomVisibility returns the room visibility as a boolean: true if the room // GetRoomVisibility returns the room visibility as a boolean: true if the room

View file

@ -344,6 +344,11 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// considers the event itself a backward extremity if at least one of the parent // considers the event itself a backward extremity if at least one of the parent
// events doesn't exist in the database. // events doesn't exist in the database.
// Returns an error if there was an issue with talking to the database. // Returns an error if there was an issue with talking to the database.
//
// This function is unused but currently set to nolint for now until we are
// absolutely sure that the changes in matrix-org/dendrite#847 are behaving
// properly.
// nolint:unused
func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) { func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) {
// Select the earliest retrieved event. // Select the earliest retrieved event.
var ev *types.StreamEvent var ev *types.StreamEvent