dendrite/syncapi/streams/stream_accountdata.go
Till e95fc5c5e3
Use provided filter for account_data (#2372)
* Reuse IncrementalSync, use provided filter

* Inform SyncAPI about newly created push_rules
2022-04-25 19:04:46 +02:00

100 lines
2.4 KiB
Go

package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
type AccountDataStreamProvider struct {
StreamProvider
userAPI userapi.UserInternalAPI
}
func (p *AccountDataStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamPositionForAccountData(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *AccountDataStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *AccountDataStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
r := types.Range{
From: from,
To: to,
}
dataTypes, err := p.DB.GetAccountDataInRange(
ctx, req.Device.UserID, r, &req.Filter.AccountData,
)
if err != nil {
req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed")
return from
}
// Iterate over the rooms
for roomID, dataTypes := range dataTypes {
// Request the missing data from the database
for _, dataType := range dataTypes {
dataReq := userapi.QueryAccountDataRequest{
UserID: req.Device.UserID,
RoomID: roomID,
DataType: dataType,
}
dataRes := userapi.QueryAccountDataResponse{}
err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes)
if err != nil {
req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
continue
}
if roomID == "" {
if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
req.Response.AccountData.Events = append(
req.Response.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(globalData),
},
)
}
} else {
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
joinData := *types.NewJoinResponse()
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
joinData = existing
}
joinData.AccountData.Events = append(
joinData.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(roomData),
},
)
req.Response.Rooms.Join[roomID] = joinData
}
}
}
}
return to
}