End of day

This commit is contained in:
Neil Alexander 2021-01-06 17:29:07 +00:00
parent af6b07e1b4
commit 3843d076bc
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
6 changed files with 106 additions and 11 deletions

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
// SyncServerDatasource represents a sync server datasource which manages
@ -38,7 +39,7 @@ type SyncServerDatasource struct {
// NewDatabase creates a new sync server database
// nolint:gocyclo
func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInternalAPI) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {
@ -108,6 +109,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Receipts: receipts,
EDUCache: cache.New(),
}
d.Database.ConfigureProviders()
d.Database.ConfigureProviders(userAPI)
return &d, nil
}

View file

@ -4,10 +4,13 @@ import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
type AccountDataStreamProvider struct {
StreamProvider
userAPI userapi.UserInternalAPI
}
func (p *AccountDataStreamProvider) Setup() {
@ -18,7 +21,36 @@ func (p *AccountDataStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
dataReq := &userapi.QueryAccountDataRequest{
UserID: req.Device.UserID,
}
dataRes := &userapi.QueryAccountDataResponse{}
if err := p.userAPI.QueryAccountData(ctx, dataReq, dataRes); err != nil {
return p.LatestPosition(ctx) // nil, err
}
for datatype, databody := range dataRes.GlobalAccountData {
req.Response.AccountData.Events = append(
req.Response.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
}
for r, j := range req.Response.Rooms.Join {
for datatype, databody := range dataRes.RoomAccountData[r] {
j.AccountData.Events = append(
j.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
req.Response.Rooms.Join[r] = j
}
}
return p.LatestPosition(ctx)
}
func (p *AccountDataStreamProvider) IncrementalSync(
@ -26,6 +58,63 @@ func (p *AccountDataStreamProvider) IncrementalSync(
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
r := types.Range{
From: from,
To: to,
}
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
dataTypes, err := p.DB.GetAccountDataInRange(
ctx, req.Device.UserID, r, &accountDataFilter,
)
if err != nil {
return to // nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err)
}
if len(dataTypes) == 0 {
// TODO: this fixes the sytest but is it the right thing to do?
dataTypes[""] = []string{"m.push_rules"}
}
// Iterate over the rooms
for roomID, dataTypes := range dataTypes {
// Request the missing data from the database
for _, dataType := range dataTypes {
dataReq := userapi.QueryAccountDataRequest{
UserID: req.Device.UserID,
RoomID: roomID,
DataType: dataType,
}
dataRes := userapi.QueryAccountDataResponse{}
err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes)
if err != nil {
continue
}
if roomID == "" {
if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
req.Response.AccountData.Events = append(
req.Response.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(globalData),
},
)
}
} else {
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
joinData := req.Response.Rooms.Join[roomID]
joinData.AccountData.Events = append(
joinData.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(roomData),
},
)
req.Response.Rooms.Join[roomID] = joinData
}
}
}
}
return to
}

View file

@ -64,13 +64,16 @@ type Database struct {
// ConfigureProviders creates instances of the various
// stream and topology providers provided by the storage
// packages.
func (d *Database) ConfigureProviders() {
func (d *Database) ConfigureProviders(userAPI userapi.UserInternalAPI) {
d.PDUStreamProvider = &PDUStreamProvider{StreamProvider{DB: d}}
d.TypingStreamProvider = &TypingStreamProvider{StreamProvider{DB: d}}
d.ReceiptStreamProvider = &ReceiptStreamProvider{StreamProvider{DB: d}}
d.InviteStreamProvider = &InviteStreamProvider{StreamProvider{DB: d}}
d.SendToDeviceStreamProvider = &SendToDeviceStreamProvider{StreamProvider{DB: d}}
d.AccountDataStreamProvider = &AccountDataStreamProvider{StreamProvider{DB: d}}
d.AccountDataStreamProvider = &AccountDataStreamProvider{
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
}
d.DeviceListStreamProvider = &DeviceListStreamProvider{StreamLogProvider{DB: d}}
d.PDUStreamProvider.Setup()

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
// SyncServerDatasource represents a sync server datasource which manages
@ -40,7 +41,7 @@ type SyncServerDatasource struct {
// NewDatabase creates a new sync server database
// nolint: gocyclo
func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInternalAPI) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {
@ -50,6 +51,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err = d.prepare(dbProperties); err != nil {
return nil, err
}
d.ConfigureProviders(userAPI)
return &d, nil
}
@ -121,6 +123,5 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Receipts: receipts,
EDUCache: cache.New(),
}
d.Database.ConfigureProviders()
return nil
}

View file

@ -22,15 +22,16 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
// NewSyncServerDatasource opens a database connection.
func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) {
func NewSyncServerDatasource(dbProperties *config.DatabaseOptions, userAPI userapi.UserInternalAPI) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties)
return sqlite3.NewDatabase(dbProperties, userAPI)
case dbProperties.ConnectionString.IsPostgres():
return postgres.NewDatabase(dbProperties)
return postgres.NewDatabase(dbProperties, userAPI)
default:
return nil, fmt.Errorf("unexpected database type")
}

View file

@ -43,7 +43,7 @@ func AddPublicRoutes(
) {
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database, userAPI)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
}