mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-26 08:13:09 -06:00
Merge branch 'master' into neilalexander/pinecone
This commit is contained in:
commit
d64d859c96
|
|
@ -1,5 +1,11 @@
|
||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## Dendrite 0.3.8 (2021-01-28)
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
* A well-known lookup regression in version 0.3.7 has been fixed
|
||||||
|
|
||||||
## Dendrite 0.3.7 (2021-01-26)
|
## Dendrite 0.3.7 (2021-01-26)
|
||||||
|
|
||||||
### Features
|
### Features
|
||||||
|
|
|
||||||
2
go.mod
2
go.mod
|
|
@ -25,7 +25,7 @@ require (
|
||||||
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
|
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
|
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210122154608-a38974bd8a37
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead
|
||||||
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91
|
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91
|
||||||
github.com/matrix-org/pinecone v0.0.0-00010101000000-000000000000
|
github.com/matrix-org/pinecone v0.0.0-00010101000000-000000000000
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -579,8 +579,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
|
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210122154608-a38974bd8a37 h1:si2CZZpwOLWZfDXfgHPkaTlaAkdJvpJzr1zVqyKXd0I=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead h1:VmGJybKUQin8+NyA9ZkrHJpE8ygXzcON9peQH9LC92c=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210122154608-a38974bd8a37/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
||||||
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4=
|
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4=
|
||||||
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
|
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
|
||||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
|
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ var build string
|
||||||
const (
|
const (
|
||||||
VersionMajor = 0
|
VersionMajor = 0
|
||||||
VersionMinor = 3
|
VersionMinor = 3
|
||||||
VersionPatch = 7
|
VersionPatch = 8
|
||||||
VersionTag = "" // example: "rc1"
|
VersionTag = "" // example: "rc1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -225,7 +225,7 @@ func buildInviteStrippedState(
|
||||||
for _, t := range []string{
|
for _, t := range []string{
|
||||||
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias,
|
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias,
|
||||||
gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules,
|
gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules,
|
||||||
"m.room.avatar", "m.room.encryption",
|
"m.room.avatar", "m.room.encryption", gomatrixserverlib.MRoomCreate,
|
||||||
} {
|
} {
|
||||||
stateWanted = append(stateWanted, gomatrixserverlib.StateKeyTuple{
|
stateWanted = append(stateWanted, gomatrixserverlib.StateKeyTuple{
|
||||||
EventType: t,
|
EventType: t,
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ func (p *DeviceListStreamProvider) CompleteSync(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req *types.SyncRequest,
|
req *types.SyncRequest,
|
||||||
) types.LogPosition {
|
) types.LogPosition {
|
||||||
return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx))
|
return p.LatestPosition(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *DeviceListStreamProvider) IncrementalSync(
|
func (p *DeviceListStreamProvider) IncrementalSync(
|
||||||
|
|
|
||||||
|
|
@ -2,18 +2,54 @@ package streams
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// The max number of per-room goroutines to have running.
|
||||||
|
// Too high and this will consume lots of CPU, too low and complete
|
||||||
|
// sync responses will take longer to process.
|
||||||
|
const PDU_STREAM_WORKERS = 256
|
||||||
|
|
||||||
|
// The maximum number of tasks that can be queued in total before
|
||||||
|
// backpressure will build up and the rests will start to block.
|
||||||
|
const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
|
||||||
|
|
||||||
type PDUStreamProvider struct {
|
type PDUStreamProvider struct {
|
||||||
StreamProvider
|
StreamProvider
|
||||||
|
|
||||||
|
tasks chan func()
|
||||||
|
workers atomic.Int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PDUStreamProvider) worker() {
|
||||||
|
defer p.workers.Dec()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case f := <-p.tasks:
|
||||||
|
f()
|
||||||
|
case <-time.After(time.Second * 10):
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PDUStreamProvider) queue(f func()) {
|
||||||
|
if p.workers.Load() < PDU_STREAM_WORKERS {
|
||||||
|
p.workers.Inc()
|
||||||
|
go p.worker()
|
||||||
|
}
|
||||||
|
p.tasks <- f
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PDUStreamProvider) Setup() {
|
func (p *PDUStreamProvider) Setup() {
|
||||||
p.StreamProvider.Setup()
|
p.StreamProvider.Setup()
|
||||||
|
p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE)
|
||||||
|
|
||||||
p.latestMutex.Lock()
|
p.latestMutex.Lock()
|
||||||
defer p.latestMutex.Unlock()
|
defer p.latestMutex.Unlock()
|
||||||
|
|
@ -52,19 +88,32 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
eventFilter := req.Filter.Room.Timeline
|
eventFilter := req.Filter.Room.Timeline
|
||||||
|
|
||||||
// Build up a /sync response. Add joined rooms.
|
// Build up a /sync response. Add joined rooms.
|
||||||
for _, roomID := range joinedRoomIDs {
|
var reqMutex sync.Mutex
|
||||||
var jr *types.JoinResponse
|
var reqWaitGroup sync.WaitGroup
|
||||||
jr, err = p.getJoinResponseForCompleteSync(
|
reqWaitGroup.Add(len(joinedRoomIDs))
|
||||||
ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
|
for _, room := range joinedRoomIDs {
|
||||||
)
|
roomID := room
|
||||||
if err != nil {
|
p.queue(func() {
|
||||||
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
defer reqWaitGroup.Done()
|
||||||
return from
|
|
||||||
}
|
var jr *types.JoinResponse
|
||||||
req.Response.Rooms.Join[roomID] = *jr
|
jr, err = p.getJoinResponseForCompleteSync(
|
||||||
req.Rooms[roomID] = gomatrixserverlib.Join
|
ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
reqMutex.Lock()
|
||||||
|
defer reqMutex.Unlock()
|
||||||
|
req.Response.Rooms.Join[roomID] = *jr
|
||||||
|
req.Rooms[roomID] = gomatrixserverlib.Join
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reqWaitGroup.Wait()
|
||||||
|
|
||||||
// Add peeked rooms.
|
// Add peeked rooms.
|
||||||
peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
|
peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue