diff --git a/CHANGES.md b/CHANGES.md index 6d5c0fcb6..b11c3d7ac 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,11 @@ # Changelog +## Dendrite 0.3.8 (2021-01-28) + +### Fixes + +* A well-known lookup regression in version 0.3.7 has been fixed + ## Dendrite 0.3.7 (2021-01-26) ### Features diff --git a/go.mod b/go.mod index cbf3f597f..ff8291218 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd - github.com/matrix-org/gomatrixserverlib v0.0.0-20210122154608-a38974bd8a37 + github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 github.com/matrix-org/pinecone v0.0.0-00010101000000-000000000000 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 diff --git a/go.sum b/go.sum index 4e797a178..92dd0b821 100644 --- a/go.sum +++ b/go.sum @@ -579,8 +579,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg= github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20210122154608-a38974bd8a37 h1:si2CZZpwOLWZfDXfgHPkaTlaAkdJvpJzr1zVqyKXd0I= -github.com/matrix-org/gomatrixserverlib v0.0.0-20210122154608-a38974bd8a37/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead h1:VmGJybKUQin8+NyA9ZkrHJpE8ygXzcON9peQH9LC92c= +github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4= github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/internal/version.go b/internal/version.go index 639fd3df0..f5c6a423f 100644 --- a/internal/version.go +++ b/internal/version.go @@ -17,7 +17,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 3 - VersionPatch = 7 + VersionPatch = 8 VersionTag = "" // example: "rc1" ) diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go index 085cb02ed..93a52350c 100644 --- a/roomserver/internal/perform/perform_invite.go +++ b/roomserver/internal/perform/perform_invite.go @@ -225,7 +225,7 @@ func buildInviteStrippedState( for _, t := range []string{ gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias, gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules, - "m.room.avatar", "m.room.encryption", + "m.room.avatar", "m.room.encryption", gomatrixserverlib.MRoomCreate, } { stateWanted = append(stateWanted, gomatrixserverlib.StateKeyTuple{ EventType: t, diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index c43d50a49..9ea9d088f 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -19,7 +19,7 @@ func (p *DeviceListStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.LogPosition { - return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx)) + return p.LatestPosition(ctx) } func (p *DeviceListStreamProvider) IncrementalSync( diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index d6d7ff444..ae38dc30e 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -2,18 +2,54 @@ package streams import ( "context" + "sync" + "time" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" ) +// The max number of per-room goroutines to have running. +// Too high and this will consume lots of CPU, too low and complete +// sync responses will take longer to process. +const PDU_STREAM_WORKERS = 256 + +// The maximum number of tasks that can be queued in total before +// backpressure will build up and the rests will start to block. +const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8 + type PDUStreamProvider struct { StreamProvider + + tasks chan func() + workers atomic.Int32 +} + +func (p *PDUStreamProvider) worker() { + defer p.workers.Dec() + for { + select { + case f := <-p.tasks: + f() + case <-time.After(time.Second * 10): + return + } + } +} + +func (p *PDUStreamProvider) queue(f func()) { + if p.workers.Load() < PDU_STREAM_WORKERS { + p.workers.Inc() + go p.worker() + } + p.tasks <- f } func (p *PDUStreamProvider) Setup() { p.StreamProvider.Setup() + p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE) p.latestMutex.Lock() defer p.latestMutex.Unlock() @@ -52,19 +88,32 @@ func (p *PDUStreamProvider) CompleteSync( eventFilter := req.Filter.Room.Timeline // Build up a /sync response. Add joined rooms. - for _, roomID := range joinedRoomIDs { - var jr *types.JoinResponse - jr, err = p.getJoinResponseForCompleteSync( - ctx, roomID, r, &stateFilter, &eventFilter, req.Device, - ) - if err != nil { - req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") - return from - } - req.Response.Rooms.Join[roomID] = *jr - req.Rooms[roomID] = gomatrixserverlib.Join + var reqMutex sync.Mutex + var reqWaitGroup sync.WaitGroup + reqWaitGroup.Add(len(joinedRoomIDs)) + for _, room := range joinedRoomIDs { + roomID := room + p.queue(func() { + defer reqWaitGroup.Done() + + var jr *types.JoinResponse + jr, err = p.getJoinResponseForCompleteSync( + ctx, roomID, r, &stateFilter, &eventFilter, req.Device, + ) + if err != nil { + req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") + return + } + + reqMutex.Lock() + defer reqMutex.Unlock() + req.Response.Rooms.Join[roomID] = *jr + req.Rooms[roomID] = gomatrixserverlib.Join + }) } + reqWaitGroup.Wait() + // Add peeked rooms. peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) if err != nil {