mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-17 19:13:10 -06:00
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/usagestats
This commit is contained in:
commit
c7fe2b7b2f
|
|
@ -103,15 +103,15 @@ func (m *DendriteMonolith) SessionCount() int {
|
|||
|
||||
func (m *DendriteMonolith) RegisterNetworkInterface(name string, index int, mtu int, up bool, broadcast bool, loopback bool, pointToPoint bool, multicast bool, addrs string) {
|
||||
m.PineconeMulticast.RegisterInterface(pineconeMulticast.InterfaceInfo{
|
||||
Name: name,
|
||||
Index: index,
|
||||
Mtu: mtu,
|
||||
Up: up,
|
||||
Broadcast: broadcast,
|
||||
Loopback: loopback,
|
||||
Name: name,
|
||||
Index: index,
|
||||
Mtu: mtu,
|
||||
Up: up,
|
||||
Broadcast: broadcast,
|
||||
Loopback: loopback,
|
||||
PointToPoint: pointToPoint,
|
||||
Multicast: multicast,
|
||||
Addrs: addrs,
|
||||
Multicast: multicast,
|
||||
Addrs: addrs,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -126,7 +126,9 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
|
|||
|
||||
func (m *DendriteMonolith) SetStaticPeer(uri string) {
|
||||
m.PineconeManager.RemovePeers()
|
||||
m.PineconeManager.AddPeer(strings.TrimSpace(uri))
|
||||
for _, uri := range strings.Split(uri, ",") {
|
||||
m.PineconeManager.AddPeer(strings.TrimSpace(uri))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *DendriteMonolith) DisconnectType(peertype int) {
|
||||
|
|
@ -279,19 +281,21 @@ func (m *DendriteMonolith) Start() {
|
|||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||
cfg.Global.PrivateKey = sk
|
||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
||||
cfg.Global.JetStream.InMemory = true
|
||||
cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/%s", m.StorageDirectory, prefix))
|
||||
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-account.db", m.StorageDirectory, prefix))
|
||||
cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-mediaapi.db", m.StorageDirectory))
|
||||
cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-syncapi.db", m.StorageDirectory, prefix))
|
||||
cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-roomserver.db", m.StorageDirectory, prefix))
|
||||
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-keyserver.db", m.StorageDirectory, prefix))
|
||||
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-federationsender.db", m.StorageDirectory, prefix))
|
||||
cfg.MediaAPI.BasePath = config.Path(fmt.Sprintf("%s/media", m.CacheDirectory))
|
||||
cfg.MediaAPI.AbsBasePath = config.Path(fmt.Sprintf("%s/media", m.CacheDirectory))
|
||||
cfg.Global.JetStream.InMemory = false
|
||||
cfg.Global.JetStream.StoragePath = config.Path(filepath.Join(m.CacheDirectory, prefix))
|
||||
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", filepath.Join(m.StorageDirectory, prefix)))
|
||||
cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", filepath.Join(m.StorageDirectory, prefix)))
|
||||
cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", filepath.Join(m.StorageDirectory, prefix)))
|
||||
cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", filepath.Join(m.StorageDirectory, prefix)))
|
||||
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", filepath.Join(m.StorageDirectory, prefix)))
|
||||
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", filepath.Join(m.StorageDirectory, prefix)))
|
||||
cfg.MediaAPI.BasePath = config.Path(filepath.Join(m.CacheDirectory, "media"))
|
||||
cfg.MediaAPI.AbsBasePath = config.Path(filepath.Join(m.CacheDirectory, "media"))
|
||||
cfg.MSCs.MSCs = []string{"msc2836", "msc2946"}
|
||||
cfg.ClientAPI.RegistrationDisabled = false
|
||||
cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true
|
||||
cfg.SyncAPI.Fulltext.Enabled = true
|
||||
cfg.SyncAPI.Fulltext.IndexPath = config.Path(filepath.Join(m.CacheDirectory, "search"))
|
||||
if err = cfg.Derive(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ var (
|
|||
pwdStdin = flag.Bool("passwordstdin", false, "Reads the password from stdin")
|
||||
isAdmin = flag.Bool("admin", false, "Create an admin account")
|
||||
resetPassword = flag.Bool("reset-password", false, "Deprecated")
|
||||
serverURL = flag.String("url", "https://localhost:8448", "The URL to connect to.")
|
||||
serverURL = flag.String("url", "http://localhost:8008", "The URL to connect to.")
|
||||
validUsernameRegex = regexp.MustCompile(`^[0-9a-z_\-=./]+$`)
|
||||
timeout = flag.Duration("timeout", time.Second*30, "Timeout for the http client when connecting to the server")
|
||||
)
|
||||
|
|
|
|||
|
|
@ -89,6 +89,7 @@ func main() {
|
|||
if configFlagSet {
|
||||
cfg = setup.ParseFlags(true)
|
||||
sk = cfg.Global.PrivateKey
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
} else {
|
||||
keyfile := filepath.Join(*instanceDir, *instanceName) + ".pem"
|
||||
if _, err := os.Stat(keyfile); os.IsNotExist(err) {
|
||||
|
|
@ -142,6 +143,9 @@ func main() {
|
|||
cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", filepath.Join(*instanceDir, *instanceName)))
|
||||
cfg.ClientAPI.RegistrationDisabled = false
|
||||
cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true
|
||||
cfg.MediaAPI.BasePath = config.Path(*instanceDir)
|
||||
cfg.SyncAPI.Fulltext.Enabled = true
|
||||
cfg.SyncAPI.Fulltext.IndexPath = config.Path(*instanceDir)
|
||||
if err := cfg.Derive(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -134,6 +134,9 @@ func main() {
|
|||
cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", filepath.Join(*instanceDir, *instanceName)))
|
||||
cfg.ClientAPI.RegistrationDisabled = false
|
||||
cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true
|
||||
cfg.MediaAPI.BasePath = config.Path(*instanceDir)
|
||||
cfg.SyncAPI.Fulltext.Enabled = true
|
||||
cfg.SyncAPI.Fulltext.IndexPath = config.Path(*instanceDir)
|
||||
if err := cfg.Derive(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ User accounts can be created on a Dendrite instance in a number of ways.
|
|||
The `create-account` tool is built in the `bin` folder when building Dendrite with
|
||||
the `build.sh` script.
|
||||
|
||||
It uses the `dendrite.yaml` configuration file to connect to a running Dendrite instance and requires
|
||||
It uses the `dendrite.yaml` configuration file to connect to a running Dendrite instance and requires
|
||||
shared secret registration to be enabled as explained below.
|
||||
|
||||
An example of using `create-account` to create a **normal account**:
|
||||
|
|
@ -31,11 +31,11 @@ To create a new **admin account**, add the `-admin` flag:
|
|||
./bin/create-account -config /path/to/dendrite.yaml -username USERNAME -admin
|
||||
```
|
||||
|
||||
By default `create-account` uses `https://localhost:8448` to connect to Dendrite, this can be overwritten using
|
||||
By default `create-account` uses `http://localhost:8008` to connect to Dendrite, this can be overwritten using
|
||||
the `-url` flag:
|
||||
|
||||
```bash
|
||||
./bin/create-account -config /path/to/dendrite.yaml -username USERNAME -url http://localhost:8008
|
||||
./bin/create-account -config /path/to/dendrite.yaml -username USERNAME -url https://localhost:8448
|
||||
```
|
||||
|
||||
An example of using `create-account` when running in **Docker**, having found the `CONTAINERNAME` from `docker ps`:
|
||||
|
|
@ -43,6 +43,7 @@ An example of using `create-account` when running in **Docker**, having found th
|
|||
```bash
|
||||
docker exec -it CONTAINERNAME /usr/bin/create-account -config /path/to/dendrite.yaml -username USERNAME
|
||||
```
|
||||
|
||||
```bash
|
||||
docker exec -it CONTAINERNAME /usr/bin/create-account -config /path/to/dendrite.yaml -username USERNAME -admin
|
||||
```
|
||||
|
|
|
|||
|
|
@ -233,7 +233,7 @@ func (a *KeyInternalAPI) PerformMarkAsStaleIfNeeded(ctx context.Context, req *ap
|
|||
return err
|
||||
}
|
||||
if len(knownDevices) == 0 {
|
||||
return fmt.Errorf("unknown user %s", req.UserID)
|
||||
return nil // fmt.Errorf("unknown user %s", req.UserID)
|
||||
}
|
||||
|
||||
for i := range knownDevices {
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import (
|
|||
|
||||
type DatabaseTransaction interface {
|
||||
sqlutil.Transaction
|
||||
Reset() (err error)
|
||||
SharedUsers
|
||||
|
||||
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
|
||||
|
|
|
|||
|
|
@ -104,12 +104,7 @@ const selectStateEventSQL = "" +
|
|||
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
// TODO: The session_id and transaction_id blanks are here because
|
||||
// the rowsToStreamEvents expects there to be exactly seven columns. We need to
|
||||
// figure out if these really need to be in the DB, and if so, we need a
|
||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" +
|
||||
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
||||
"SELECT event_id, added_at, headered_event_json, history_visibility FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
const selectSharedUsersSQL = "" +
|
||||
"SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" +
|
||||
|
|
@ -365,7 +360,36 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
|
|||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
|
||||
return rowsToStreamEvents(rows)
|
||||
return currentRoomStateRowsToStreamEvents(rows)
|
||||
}
|
||||
|
||||
func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||
var events []types.StreamEvent
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &historyVisibility); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ev.Visibility = historyVisibility
|
||||
|
||||
events = append(events, types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
StreamPosition: streamPos,
|
||||
})
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
|
|
|
|||
|
|
@ -31,19 +31,6 @@ func (d *DatabaseTransaction) Rollback() error {
|
|||
return d.txn.Rollback()
|
||||
}
|
||||
|
||||
func (d *DatabaseTransaction) Reset() (err error) {
|
||||
if d.txn == nil {
|
||||
return nil
|
||||
}
|
||||
if err = d.txn.Rollback(); err != nil {
|
||||
return err
|
||||
}
|
||||
if d.txn, err = d.DB.BeginTx(d.ctx, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (d *DatabaseTransaction) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) {
|
||||
id, err := d.OutputEvents.SelectMaxEventID(ctx, d.txn)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -88,12 +88,7 @@ const selectStateEventSQL = "" +
|
|||
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
// TODO: The session_id and transaction_id blanks are here because
|
||||
// the rowsToStreamEvents expects there to be exactly seven columns. We need to
|
||||
// figure out if these really need to be in the DB, and if so, we need a
|
||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" +
|
||||
" FROM syncapi_current_room_state WHERE event_id IN ($1)"
|
||||
"SELECT event_id, added_at, headered_event_json, history_visibility FROM syncapi_current_room_state WHERE event_id IN ($1)"
|
||||
|
||||
const selectSharedUsersSQL = "" +
|
||||
"SELECT state_key FROM syncapi_current_room_state WHERE room_id IN(" +
|
||||
|
|
@ -378,7 +373,7 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
|
|||
return nil, err
|
||||
}
|
||||
start = start + n
|
||||
events, err := rowsToStreamEvents(rows)
|
||||
events, err := currentRoomStateRowsToStreamEvents(rows)
|
||||
internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -388,6 +383,35 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||
var events []types.StreamEvent
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &historyVisibility); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ev.Visibility = historyVisibility
|
||||
|
||||
events = append(events, types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
StreamPosition: streamPos,
|
||||
})
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
result := []*gomatrixserverlib.HeaderedEvent{}
|
||||
for rows.Next() {
|
||||
|
|
|
|||
88
syncapi/storage/tables/current_room_state_test.go
Normal file
88
syncapi/storage/tables/current_room_state_test.go
Normal file
|
|
@ -0,0 +1,88 @@
|
|||
package tables_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
)
|
||||
|
||||
func newCurrentRoomStateTable(t *testing.T, dbType test.DBType) (tables.CurrentRoomState, *sql.DB, func()) {
|
||||
t.Helper()
|
||||
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||
db, err := sqlutil.Open(&config.DatabaseOptions{
|
||||
ConnectionString: config.DataSource(connStr),
|
||||
}, sqlutil.NewExclusiveWriter())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open db: %s", err)
|
||||
}
|
||||
|
||||
var tab tables.CurrentRoomState
|
||||
switch dbType {
|
||||
case test.DBTypePostgres:
|
||||
tab, err = postgres.NewPostgresCurrentRoomStateTable(db)
|
||||
case test.DBTypeSQLite:
|
||||
var stream sqlite3.StreamIDStatements
|
||||
if err = stream.Prepare(db); err != nil {
|
||||
t.Fatalf("failed to prepare stream stmts: %s", err)
|
||||
}
|
||||
tab, err = sqlite3.NewSqliteCurrentRoomStateTable(db, &stream)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("failed to make new table: %s", err)
|
||||
}
|
||||
return tab, db, close
|
||||
}
|
||||
|
||||
func TestCurrentRoomStateTable(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
alice := test.NewUser(t)
|
||||
room := test.NewRoom(t, alice)
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
tab, db, close := newCurrentRoomStateTable(t, dbType)
|
||||
defer close()
|
||||
events := room.CurrentState()
|
||||
err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
|
||||
for i, ev := range events {
|
||||
err := tab.UpsertRoomState(ctx, txn, ev, nil, types.StreamPosition(i))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to UpsertRoomState: %w", err)
|
||||
}
|
||||
}
|
||||
wantEventIDs := []string{
|
||||
events[0].EventID(), events[1].EventID(), events[2].EventID(), events[3].EventID(),
|
||||
}
|
||||
gotEvents, err := tab.SelectEventsWithEventIDs(ctx, txn, wantEventIDs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to SelectEventsWithEventIDs: %w", err)
|
||||
}
|
||||
if len(gotEvents) != len(wantEventIDs) {
|
||||
return fmt.Errorf("SelectEventsWithEventIDs\ngot %d, want %d results", len(gotEvents), len(wantEventIDs))
|
||||
}
|
||||
gotEventIDs := make(map[string]struct{}, len(gotEvents))
|
||||
for _, event := range gotEvents {
|
||||
if event.ExcludeFromSync {
|
||||
return fmt.Errorf("SelectEventsWithEventIDs ExcludeFromSync should be false for current room state event %+v", event)
|
||||
}
|
||||
gotEventIDs[event.EventID()] = struct{}{}
|
||||
}
|
||||
for _, id := range wantEventIDs {
|
||||
if _, ok := gotEventIDs[id]; !ok {
|
||||
return fmt.Errorf("SelectEventsWithEventIDs\nexpected id %q not returned", id)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -54,7 +54,6 @@ func (p *AccountDataStreamProvider) IncrementalSync(
|
|||
)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -34,13 +34,11 @@ func (p *DeviceListStreamProvider) IncrementalSync(
|
|||
to, _, err = internal.DeviceListCatchup(context.Background(), snapshot, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -56,7 +56,6 @@ func (p *InviteStreamProvider) IncrementalSync(
|
|||
)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,6 @@ func (p *NotificationDataStreamProvider) IncrementalSync(
|
|||
countsByRoom, err := snapshot.GetUserUnreadNotificationCountsForRooms(ctx, req.Device.UserID, req.Rooms)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("GetUserUnreadNotificationCountsForRooms failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -75,7 +75,6 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
joinedRoomIDs, err := snapshot.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
|
||||
|
|
@ -102,10 +101,10 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
)
|
||||
if jerr != nil {
|
||||
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
|
||||
if err = snapshot.Reset(); err != nil {
|
||||
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
|
||||
return from
|
||||
}
|
||||
continue // return from
|
||||
continue
|
||||
}
|
||||
req.Response.Rooms.Join[roomID] = *jr
|
||||
req.Rooms[roomID] = gomatrixserverlib.Join
|
||||
|
|
@ -115,7 +114,6 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
for _, peek := range peeks {
|
||||
|
|
@ -126,10 +124,10 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||
if err = snapshot.Reset(); err != nil {
|
||||
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
|
||||
return from
|
||||
}
|
||||
continue // return from
|
||||
continue
|
||||
}
|
||||
req.Response.Rooms.Peek[peek.RoomID] = *jr
|
||||
}
|
||||
|
|
@ -160,14 +158,12 @@ func (p *PDUStreamProvider) IncrementalSync(
|
|||
if req.WantFullState {
|
||||
if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
|
||||
_ = snapshot.Reset()
|
||||
return
|
||||
return from
|
||||
}
|
||||
} else {
|
||||
if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
|
||||
_ = snapshot.Reset()
|
||||
return
|
||||
return from
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -181,7 +177,6 @@ func (p *PDUStreamProvider) IncrementalSync(
|
|||
|
||||
if err = p.addIgnoredUsersToFilter(ctx, snapshot, req, &eventFilter); err != nil {
|
||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||
_ = snapshot.Reset()
|
||||
}
|
||||
|
||||
newPos = from
|
||||
|
|
@ -201,13 +196,10 @@ func (p *PDUStreamProvider) IncrementalSync(
|
|||
var pos types.StreamPosition
|
||||
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
|
||||
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
|
||||
if err == context.DeadlineExceeded || err == context.Canceled {
|
||||
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
|
||||
return newPos
|
||||
}
|
||||
if err = snapshot.Reset(); err != nil {
|
||||
return from
|
||||
}
|
||||
continue // return to
|
||||
continue
|
||||
}
|
||||
// Reset the position, as it is only for the special case of newly joined rooms
|
||||
if delta.NewlyJoined {
|
||||
|
|
@ -307,7 +299,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
|||
events, err := applyHistoryVisibilityFilter(ctx, snapshot, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("unable to apply history visibility filter")
|
||||
_ = snapshot.Reset()
|
||||
}
|
||||
|
||||
if len(delta.StateEvents) > 0 {
|
||||
|
|
|
|||
|
|
@ -67,7 +67,6 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
|||
presences, err := snapshot.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000})
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.PresenceAfter failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -52,7 +52,6 @@ func (p *ReceiptStreamProvider) IncrementalSync(
|
|||
lastPos, receipts, err := snapshot.RoomReceiptsAfter(ctx, joinedRooms, from)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -44,7 +44,6 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
|
|||
lastPos, events, err := snapshot.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed")
|
||||
_ = snapshot.Reset()
|
||||
return from
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -306,13 +306,19 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|||
syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
|
||||
}
|
||||
|
||||
snapshot, err := rp.db.NewDatabaseSnapshot(req.Context())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to acquire database snapshot for sync request")
|
||||
return jsonerror.InternalServerError()
|
||||
withTransaction := func(from types.StreamPosition, f func(snapshot storage.DatabaseTransaction) types.StreamPosition) types.StreamPosition {
|
||||
var succeeded bool
|
||||
snapshot, err := rp.db.NewDatabaseSnapshot(req.Context())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to acquire database snapshot for sync request")
|
||||
return from
|
||||
}
|
||||
defer func() {
|
||||
succeeded = err == nil
|
||||
sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
|
||||
}()
|
||||
return f(snapshot)
|
||||
}
|
||||
var succeeded bool
|
||||
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)
|
||||
|
||||
if syncReq.Since.IsEmpty() {
|
||||
// Complete sync
|
||||
|
|
@ -320,72 +326,162 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|||
// Get the current DeviceListPosition first, as the currentPosition
|
||||
// might advance while processing other streams, resulting in flakey
|
||||
// tests.
|
||||
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
DeviceListPosition: withTransaction(
|
||||
syncReq.Since.DeviceListPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.DeviceListStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
PDUPosition: withTransaction(
|
||||
syncReq.Since.PDUPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.PDUStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
TypingPosition: withTransaction(
|
||||
syncReq.Since.TypingPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.TypingStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
ReceiptPosition: withTransaction(
|
||||
syncReq.Since.ReceiptPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.ReceiptStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
InvitePosition: withTransaction(
|
||||
syncReq.Since.InvitePosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.InviteStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
SendToDevicePosition: withTransaction(
|
||||
syncReq.Since.SendToDevicePosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.SendToDeviceStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
AccountDataPosition: withTransaction(
|
||||
syncReq.Since.AccountDataPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.AccountDataStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
NotificationDataPosition: withTransaction(
|
||||
syncReq.Since.NotificationDataPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.NotificationDataStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
PresencePosition: withTransaction(
|
||||
syncReq.Since.PresencePosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.PresenceStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
}
|
||||
} else {
|
||||
// Incremental sync
|
||||
syncReq.Response.NextBatch = types.StreamingToken{
|
||||
PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.PDUPosition, currentPos.PDUPosition,
|
||||
PDUPosition: withTransaction(
|
||||
syncReq.Since.PDUPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.PDUStreamProvider.IncrementalSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
syncReq.Since.PDUPosition, currentPos.PDUPosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.TypingPosition, currentPos.TypingPosition,
|
||||
TypingPosition: withTransaction(
|
||||
syncReq.Since.TypingPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.TypingStreamProvider.IncrementalSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
syncReq.Since.TypingPosition, currentPos.TypingPosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
|
||||
ReceiptPosition: withTransaction(
|
||||
syncReq.Since.ReceiptPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.ReceiptStreamProvider.IncrementalSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.InvitePosition, currentPos.InvitePosition,
|
||||
InvitePosition: withTransaction(
|
||||
syncReq.Since.InvitePosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.InviteStreamProvider.IncrementalSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
syncReq.Since.InvitePosition, currentPos.InvitePosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
|
||||
SendToDevicePosition: withTransaction(
|
||||
syncReq.Since.SendToDevicePosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.SendToDeviceStreamProvider.IncrementalSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
|
||||
AccountDataPosition: withTransaction(
|
||||
syncReq.Since.AccountDataPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.AccountDataStreamProvider.IncrementalSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
|
||||
NotificationDataPosition: withTransaction(
|
||||
syncReq.Since.NotificationDataPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.NotificationDataStreamProvider.IncrementalSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
|
||||
DeviceListPosition: withTransaction(
|
||||
syncReq.Since.DeviceListPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.DeviceListStreamProvider.IncrementalSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.PresencePosition, currentPos.PresencePosition,
|
||||
PresencePosition: withTransaction(
|
||||
syncReq.Since.PresencePosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.PresenceStreamProvider.IncrementalSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
syncReq.Since.PresencePosition, currentPos.PresencePosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
}
|
||||
// it's possible for there to be no updates for this user even though since < current pos,
|
||||
|
|
@ -411,7 +507,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|||
}
|
||||
}
|
||||
|
||||
succeeded = true
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: syncReq.Response,
|
||||
|
|
|
|||
Loading…
Reference in a new issue