This commit is contained in:
Neil Alexander 2020-06-18 13:17:29 +01:00
parent e0f4dd094a
commit 226e9f90ec
4 changed files with 25 additions and 12 deletions

View file

@ -37,9 +37,10 @@ func (m *DendriteMonolith) Start() {
logger := logrus.Logger{ logger := logrus.Logger{
Out: BindLogger{}, Out: BindLogger{},
} }
logrus.SetOutput(BindLogger{})
var err error var err error
m.listener, err = net.Listen("tcp", "localhost:0") m.listener, err = net.Listen("tcp", "localhost:65432")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -78,17 +79,15 @@ func (m *DendriteMonolith) Start() {
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB() deviceDB := base.CreateDeviceDB()
federation := ygg.CreateFederationClient(base) federation := base.CreateFederationClient()
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices)
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) rsAPI := roomserver.NewInternalAPI(
rsComponent := roomserver.NewInternalAPI(
base, keyRing, federation, base, keyRing, federation,
) )
rsAPI := rsComponent
eduInputAPI := eduserver.NewInternalAPI( eduInputAPI := eduserver.NewInternalAPI(
base, cache.New(), userAPI, base, cache.New(), userAPI,
@ -100,17 +99,20 @@ func (m *DendriteMonolith) Start() {
base, federation, rsAPI, keyRing, base, federation, rsAPI, keyRing,
) )
rsComponent.SetFederationSenderAPI(fsAPI) // The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this dependency
rsAPI.SetFederationSenderAPI(fsAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
if err != nil { if err != nil {
logger.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
} }
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
DeviceDB: deviceDB, DeviceDB: deviceDB,
Client: gomatrixserverlib.NewClient(),
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,
KafkaConsumer: base.KafkaConsumer, KafkaConsumer: base.KafkaConsumer,
@ -121,6 +123,7 @@ func (m *DendriteMonolith) Start() {
FederationSenderAPI: fsAPI, FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
UserAPI: userAPI, UserAPI: userAPI,
//ServerKeyAPI: serverKeyAPI,
PublicRoomsDB: publicRoomsDB, PublicRoomsDB: publicRoomsDB,
} }

View file

@ -139,6 +139,9 @@ func Setup(instanceName, instancePeer, storageDirectory string) (*Node, error) {
panic(err) panic(err)
} }
n.log.Println("Public curve25519:", n.core.EncryptionPublicKey())
n.log.Println("Public ed25519:", n.core.SigningPublicKey())
go n.listenFromYgg() go n.listenFromYgg()
return n, nil return n, nil

View file

@ -62,6 +62,10 @@ func newSyncRequest(req *http.Request, device userapi.Device) (*syncRequest, err
} }
since = &tok since = &tok
} }
if since == nil {
tok := types.NewStreamToken(0, 0)
since = &tok
}
timelineLimit := defaultTimelineLimit timelineLimit := defaultTimelineLimit
// TODO: read from stored filters too // TODO: read from stored filters too
filterQuery := req.URL.Query().Get("filter") filterQuery := req.URL.Query().Get("filter")

View file

@ -98,6 +98,9 @@ func (t *StreamingToken) PDUPosition() StreamPosition {
func (t *StreamingToken) EDUPosition() StreamPosition { func (t *StreamingToken) EDUPosition() StreamPosition {
return t.Positions[1] return t.Positions[1]
} }
func (t *StreamingToken) String() string {
return t.syncToken.String()
}
// IsAfter returns true if ANY position in this token is greater than `other`. // IsAfter returns true if ANY position in this token is greater than `other`.
func (t *StreamingToken) IsAfter(other StreamingToken) bool { func (t *StreamingToken) IsAfter(other StreamingToken) bool {
@ -220,8 +223,8 @@ func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
err = fmt.Errorf("token %s is not a topology token", tok) err = fmt.Errorf("token %s is not a topology token", tok)
return return
} }
if len(t.Positions) != 2 { if len(t.Positions) < 2 {
err = fmt.Errorf("token %s wrong number of values, got %d want 2", tok, len(t.Positions)) err = fmt.Errorf("token %s wrong number of values, got %d want at least 2", tok, len(t.Positions))
return return
} }
return TopologyToken{ return TopologyToken{
@ -247,8 +250,8 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
err = fmt.Errorf("token %s is not a streaming token", tok) err = fmt.Errorf("token %s is not a streaming token", tok)
return return
} }
if len(t.Positions) != 2 { if len(t.Positions) < 2 {
err = fmt.Errorf("token %s wrong number of values, got %d want 2", tok, len(t.Positions)) err = fmt.Errorf("token %s wrong number of values, got %d want at least 2", tok, len(t.Positions))
return return
} }
return StreamingToken{ return StreamingToken{