Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/usagestats

This commit is contained in:
Till Faelligen 2022-10-01 08:32:25 +02:00
commit 37a2ef3795
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
23 changed files with 110 additions and 30 deletions

View file

@ -1,5 +1,16 @@
# Changelog
## Dendrite 0.10.1 (2022-09-30)
### Features
* The built-in NATS Server has been updated to version 2.9.2
### Fixes
* A regression introduced in 0.10.0 in `/sync` as a result of transaction errors has been fixed
* Account data updates will no longer send duplicate output events
## Dendrite 0.10.0 (2022-09-30)
### Features

View file

@ -87,6 +87,12 @@ and contain the following JSON document:
For example, this can be done with the following Caddy config:
```
handle /.well-known/matrix/server {
header Content-Type application/json
header Access-Control-Allow-Origin *
respond `"m.server": "matrix.example.com:8448"`
}
handle /.well-known/matrix/client {
header Content-Type application/json
header Access-Control-Allow-Origin *

10
go.mod
View file

@ -26,7 +26,7 @@ require (
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.15
github.com/nats-io/nats-server/v2 v2.9.1-0.20220920152220-52d7b481c4b5
github.com/nats-io/nats-server/v2 v2.9.2
github.com/nats-io/nats.go v1.17.0
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
@ -43,7 +43,7 @@ require (
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c
go.uber.org/atomic v1.10.0
golang.org/x/crypto v0.0.0-20220919173607-35f4265a4bc0
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be
golang.org/x/image v0.0.0-20220902085622-e7cb96979f69
golang.org/x/mobile v0.0.0-20220722155234-aaac322e2105
golang.org/x/net v0.0.0-20220919232410-f2f64ebce3c1
@ -91,7 +91,7 @@ require (
github.com/h2non/filetype v1.1.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/errors v1.0.0 // indirect
github.com/klauspost/compress v1.15.10 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/lucas-clemente/quic-go v0.29.0 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
@ -120,9 +120,9 @@ require (
go.etcd.io/bbolt v1.3.6 // indirect
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect
golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect
golang.org/x/time v0.0.0-20220920022843-2ce7c2934d45 // indirect
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/macaroon.v2 v2.1.0 // indirect

20
go.sum
View file

@ -347,8 +347,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.10 h1:Ai8UzuomSCDw90e1qNMtb15msBXsNpH6gzkkENQNcJo=
github.com/klauspost/compress v1.15.10/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
@ -422,8 +422,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.9.1-0.20220920152220-52d7b481c4b5 h1:G/YGSXcJ2bUofD8Ts49it4VNezaJLQldI6fZR+wIUts=
github.com/nats-io/nats-server/v2 v2.9.1-0.20220920152220-52d7b481c4b5/go.mod h1:BWKY6217RvhI+FDoOLZ2BH+hOC37xeKRBlQ1Lz7teKI=
github.com/nats-io/nats-server/v2 v2.9.2 h1:XNDgJgOYYaYlquLdbSHI3xssLipfKUOq3EmYIMNCOsE=
github.com/nats-io/nats-server/v2 v2.9.2/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk=
github.com/nats-io/nats.go v1.17.0 h1:1jp5BThsdGlN91hW0k3YEfJbfACjiOYtUiLXG0RL4IE=
github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
@ -625,8 +625,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220919173607-35f4265a4bc0 h1:a5Yg6ylndHHYJqIPrdq0AhvR6KTvDTAvgBtaidhEevY=
golang.org/x/crypto v0.0.0-20220919173607-35f4265a4bc0/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A=
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -810,8 +810,8 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220730100132-1609e554cd39/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI=
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20220919170432-7a66f970e087 h1:tPwmk4vmvVCMdr98VgL4JH+qZxPL8fqlUOHnyOM8N3w=
@ -829,8 +829,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220920022843-2ce7c2934d45 h1:yuLAip3bfURHClMG9VBdzPrQvCWjWiWUTBGV+/fCbUs=
golang.org/x/time v0.0.0-20220920022843-2ce7c2934d45/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 10
VersionPatch = 0
VersionPatch = 1
VersionTag = "" // example: "rc1"
)

View file

@ -27,6 +27,7 @@ import (
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@ -454,7 +455,8 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head
if err != nil {
return nil, err
}
defer snapshot.Rollback() // nolint:errcheck
var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
prevEvent, err := snapshot.GetStateEvent(
s.ctx, event.RoomID(), event.Type(), stateKey,
@ -474,6 +476,7 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head
}
event.Event, err = event.SetUnsigned(prev)
succeeded = true
return event, err
}

View file

@ -19,6 +19,7 @@ import (
"sync"
"time"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@ -323,7 +324,8 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
if err != nil {
return err
}
defer snapshot.Rollback() // nolint:errcheck
var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
roomToUsers, err := snapshot.AllJoinedUsersInRooms(ctx)
if err != nil {
@ -337,6 +339,7 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
}
n.setPeekingDevices(roomToPeekingDevices)
succeeded = true
return nil
}
@ -349,7 +352,8 @@ func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs [
if err != nil {
return err
}
defer snapshot.Rollback() // nolint:errcheck
var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
roomToUsers, err := snapshot.AllJoinedUsersInRoom(ctx, roomIDs)
if err != nil {
@ -357,6 +361,7 @@ func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs [
}
n.setUsersJoinedToRooms(roomToUsers)
succeeded = true
return nil
}

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
roomserver "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
@ -55,7 +56,8 @@ func Context(
if err != nil {
return jsonerror.InternalServerError()
}
defer snapshot.Rollback() // nolint:errcheck
var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
filter, err := parseRoomEventFilter(req)
if err != nil {
@ -184,6 +186,7 @@ func Context(
response.End = end.String()
response.Start = start.String()
}
succeeded = true
return util.JSONResponse{
Code: http.StatusOK,
JSON: response,

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/userapi/api"
)
@ -65,7 +66,8 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
if err != nil {
return jsonerror.InternalServerError()
}
defer snapshot.Rollback() // nolint:errcheck
var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
// only search rooms the user is actually joined to
joinedRooms, err := snapshot.RoomIDsWithMembership(ctx, device.UserID, "join")
@ -249,6 +251,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
logrus.Debugf("Full search request took %v", time.Since(start))
succeeded = true
return util.JSONResponse{
Code: http.StatusOK,
JSON: res,

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/types"
@ -27,6 +28,8 @@ import (
)
type DatabaseTransaction interface {
sqlutil.Transaction
Reset() (err error)
SharedUsers
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)

View file

@ -77,6 +77,7 @@ func (d *Database) NewDatabaseSnapshot(ctx context.Context) (*DatabaseTransactio
}
return &DatabaseTransaction{
Database: d,
ctx: ctx,
txn: txn,
}, nil
*/
@ -89,6 +90,7 @@ func (d *Database) NewDatabaseTransaction(ctx context.Context) (*DatabaseTransac
}
return &DatabaseTransaction{
Database: d,
ctx: ctx,
txn: txn,
}, nil
}

View file

@ -13,6 +13,7 @@ import (
type DatabaseTransaction struct {
*Database
ctx context.Context
txn *sql.Tx
}
@ -30,6 +31,19 @@ func (d *DatabaseTransaction) Rollback() error {
return d.txn.Rollback()
}
func (d *DatabaseTransaction) Reset() (err error) {
if d.txn == nil {
return nil
}
if err = d.txn.Rollback(); err != nil {
return err
}
if d.txn, err = d.DB.BeginTx(d.ctx, nil); err != nil {
return err
}
return
}
func (d *DatabaseTransaction) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) {
id, err := d.OutputEvents.SelectMaxEventID(ctx, d.txn)
if err != nil {

View file

@ -54,6 +54,7 @@ func (p *AccountDataStreamProvider) IncrementalSync(
)
if err != nil {
req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed")
_ = snapshot.Reset()
return from
}

View file

@ -34,11 +34,13 @@ func (p *DeviceListStreamProvider) IncrementalSync(
to, _, err = internal.DeviceListCatchup(context.Background(), snapshot, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
if err != nil {
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
_ = snapshot.Reset()
return from
}
err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response)
if err != nil {
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
_ = snapshot.Reset()
return from
}

View file

@ -56,6 +56,7 @@ func (p *InviteStreamProvider) IncrementalSync(
)
if err != nil {
req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed")
_ = snapshot.Reset()
return from
}

View file

@ -46,6 +46,7 @@ func (p *NotificationDataStreamProvider) IncrementalSync(
countsByRoom, err := snapshot.GetUserUnreadNotificationCountsForRooms(ctx, req.Device.UserID, req.Rooms)
if err != nil {
req.Log.WithError(err).Error("GetUserUnreadNotificationCountsForRooms failed")
_ = snapshot.Reset()
return from
}

View file

@ -75,6 +75,7 @@ func (p *PDUStreamProvider) CompleteSync(
joinedRoomIDs, err := snapshot.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
_ = snapshot.Reset()
return from
}
@ -101,6 +102,9 @@ func (p *PDUStreamProvider) CompleteSync(
)
if jerr != nil {
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
if err = snapshot.Reset(); err != nil {
return from
}
continue // return from
}
req.Response.Rooms.Join[roomID] = *jr
@ -111,6 +115,7 @@ func (p *PDUStreamProvider) CompleteSync(
peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
if err != nil {
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
_ = snapshot.Reset()
return from
}
for _, peek := range peeks {
@ -121,6 +126,9 @@ func (p *PDUStreamProvider) CompleteSync(
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
if err = snapshot.Reset(); err != nil {
return from
}
continue // return from
}
req.Response.Rooms.Peek[peek.RoomID] = *jr
@ -152,11 +160,13 @@ func (p *PDUStreamProvider) IncrementalSync(
if req.WantFullState {
if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
_ = snapshot.Reset()
return
}
} else {
if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
_ = snapshot.Reset()
return
}
}
@ -171,6 +181,7 @@ func (p *PDUStreamProvider) IncrementalSync(
if err = p.addIgnoredUsersToFilter(ctx, snapshot, req, &eventFilter); err != nil {
req.Log.WithError(err).Error("unable to update event filter with ignored users")
_ = snapshot.Reset()
}
newPos = from
@ -190,7 +201,13 @@ func (p *PDUStreamProvider) IncrementalSync(
var pos types.StreamPosition
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return to
if err == context.DeadlineExceeded || err == context.Canceled {
return newPos
}
if err = snapshot.Reset(); err != nil {
return from
}
continue // return to
}
// Reset the position, as it is only for the special case of newly joined rooms
if delta.NewlyJoined {
@ -290,6 +307,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
events, err := applyHistoryVisibilityFilter(ctx, snapshot, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
_ = snapshot.Reset()
}
if len(delta.StateEvents) > 0 {

View file

@ -67,6 +67,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
presences, err := snapshot.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000})
if err != nil {
req.Log.WithError(err).Error("p.DB.PresenceAfter failed")
_ = snapshot.Reset()
return from
}
@ -95,6 +96,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
presences[roomUsers[i]], err = snapshot.GetPresence(ctx, roomUsers[i])
if err != nil {
req.Log.WithError(err).Error("unable to query presence for user")
_ = snapshot.Rollback()
return from
}
if len(presences) > req.Filter.Presence.Limit {

View file

@ -52,6 +52,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
lastPos, receipts, err := snapshot.RoomReceiptsAfter(ctx, joinedRooms, from)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed")
_ = snapshot.Reset()
return from
}

View file

@ -44,6 +44,7 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
lastPos, events, err := snapshot.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to)
if err != nil {
req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed")
_ = snapshot.Reset()
return from
}

View file

@ -4,6 +4,7 @@ import (
"context"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/notifier"
@ -72,7 +73,8 @@ func NewSyncStreamProviders(
if err != nil {
panic(err)
}
defer snapshot.Rollback() // nolint:errcheck
var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
streams.PDUStreamProvider.Setup(ctx, snapshot)
streams.TypingStreamProvider.Setup(ctx, snapshot)
@ -84,6 +86,7 @@ func NewSyncStreamProviders(
streams.DeviceListStreamProvider.Setup(ctx, snapshot)
streams.PresenceStreamProvider.Setup(ctx, snapshot)
succeeded = true
return streams
}

View file

@ -31,6 +31,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
@ -310,7 +311,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
logrus.WithError(err).Error("Failed to acquire database snapshot for sync request")
return jsonerror.InternalServerError()
}
defer snapshot.Rollback() // nolint:errcheck
var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
if syncReq.Since.IsEmpty() {
// Complete sync
@ -409,6 +411,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
succeeded = true
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncReq.Response,
@ -449,7 +452,8 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
logrus.WithError(err).Error("Failed to acquire database snapshot for key change")
return jsonerror.InternalServerError()
}
defer snapshot.Rollback() // nolint:errcheck
var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), snapshot, syncReq, fromToken.PDUPosition, toToken.PDUPosition)
_, _, err = internal.DeviceListCatchup(
req.Context(), snapshot, rp.keyAPI, rp.rsAPI, syncReq.Device.UserID,
@ -459,6 +463,7 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
util.GetLogger(req.Context()).WithError(err).Error("Failed to DeviceListCatchup info")
return jsonerror.InternalServerError()
}
succeeded = true
return util.JSONResponse{
Code: 200,
JSON: struct {

View file

@ -796,11 +796,6 @@ func (a *UserInternalAPI) PerformPushRulesPut(
if err := a.InputAccountData(ctx, &userReq, &userRes); err != nil {
return err
}
if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
Type: pushRulesAccountDataType,
}); err != nil {
util.GetLogger(ctx).WithError(err).Errorf("syncProducer.SendData failed")
}
return nil
}