mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 11:23:11 -06:00
sqlite: room creation works
This commit is contained in:
parent
9bb33c4d24
commit
3a79ea983c
|
|
@ -67,8 +67,8 @@ const selectMembershipForUpdateSQL = "" +
|
||||||
" WHERE room_nid = $1 AND target_nid = $2"
|
" WHERE room_nid = $1 AND target_nid = $2"
|
||||||
|
|
||||||
const updateMembershipSQL = "" +
|
const updateMembershipSQL = "" +
|
||||||
"UPDATE roomserver_membership SET sender_nid = $3, membership_nid = $4, event_nid = $5" +
|
"UPDATE roomserver_membership SET sender_nid = $1, membership_nid = $2, event_nid = $3" +
|
||||||
" WHERE room_nid = $1 AND target_nid = $2"
|
" WHERE room_nid = $4 AND target_nid = $5"
|
||||||
|
|
||||||
type membershipStatements struct {
|
type membershipStatements struct {
|
||||||
insertMembershipStmt *sql.Stmt
|
insertMembershipStmt *sql.Stmt
|
||||||
|
|
@ -186,7 +186,7 @@ func (s *membershipStatements) updateMembership(
|
||||||
) error {
|
) error {
|
||||||
stmt := common.TxStmt(txn, s.updateMembershipStmt)
|
stmt := common.TxStmt(txn, s.updateMembershipStmt)
|
||||||
_, err := stmt.ExecContext(
|
_, err := stmt.ExecContext(
|
||||||
ctx, roomNID, targetUserNID, senderUserNID, membership, eventNID,
|
ctx, senderUserNID, membership, eventNID, roomNID, targetUserNID,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("updateMembership common.TxStmt.ExecContent:", err)
|
fmt.Println("updateMembership common.TxStmt.ExecContent:", err)
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ const selectLatestEventNIDsForUpdateSQL = "" +
|
||||||
"SELECT latest_event_nids, last_event_sent_nid, state_snapshot_nid FROM roomserver_rooms WHERE room_nid = $1"
|
"SELECT latest_event_nids, last_event_sent_nid, state_snapshot_nid FROM roomserver_rooms WHERE room_nid = $1"
|
||||||
|
|
||||||
const updateLatestEventNIDsSQL = "" +
|
const updateLatestEventNIDsSQL = "" +
|
||||||
"UPDATE roomserver_rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1"
|
"UPDATE roomserver_rooms SET latest_event_nids = $1, last_event_sent_nid = $2, state_snapshot_nid = $3 WHERE room_nid = $4"
|
||||||
|
|
||||||
type roomStatements struct {
|
type roomStatements struct {
|
||||||
insertRoomNIDStmt *sql.Stmt
|
insertRoomNIDStmt *sql.Stmt
|
||||||
|
|
@ -145,10 +145,10 @@ func (s *roomStatements) updateLatestEventNIDs(
|
||||||
stmt := common.TxStmt(txn, s.updateLatestEventNIDsStmt)
|
stmt := common.TxStmt(txn, s.updateLatestEventNIDsStmt)
|
||||||
_, err := stmt.ExecContext(
|
_, err := stmt.ExecContext(
|
||||||
ctx,
|
ctx,
|
||||||
roomNID,
|
|
||||||
eventNIDsAsArray(eventNIDs),
|
eventNIDsAsArray(eventNIDs),
|
||||||
int64(lastEventSentNID),
|
int64(lastEventSentNID),
|
||||||
int64(stateSnapshotNID),
|
int64(stateSnapshotNID),
|
||||||
|
roomNID,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("updateLatestEventNIDs stmt.ExecContext:", err)
|
fmt.Println("updateLatestEventNIDs stmt.ExecContext:", err)
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
@ -256,15 +255,11 @@ func (d *Database) Events(
|
||||||
var eventJSONs []eventJSONPair
|
var eventJSONs []eventJSONPair
|
||||||
var err error
|
var err error
|
||||||
results := make([]types.Event, len(eventNIDs))
|
results := make([]types.Event, len(eventNIDs))
|
||||||
fmt.Println("pre txn")
|
|
||||||
common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
fmt.Println("in txn", txn)
|
|
||||||
eventJSONs, err = d.statements.bulkSelectEventJSON(ctx, txn, eventNIDs)
|
eventJSONs, err = d.statements.bulkSelectEventJSON(ctx, txn, eventNIDs)
|
||||||
if err != nil || len(eventJSONs) == 0 {
|
if err != nil || len(eventJSONs) == 0 {
|
||||||
fmt.Println("d.statements.bulkSelectEventJSON:", err)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
fmt.Println("selected txn")
|
|
||||||
for i, eventJSON := range eventJSONs {
|
for i, eventJSON := range eventJSONs {
|
||||||
result := &results[i]
|
result := &results[i]
|
||||||
result.EventNID = eventJSON.EventNID
|
result.EventNID = eventJSON.EventNID
|
||||||
|
|
@ -276,7 +271,6 @@ func (d *Database) Events(
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
fmt.Println("post txn")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []types.Event{}, err
|
return []types.Event{}, err
|
||||||
}
|
}
|
||||||
|
|
@ -290,9 +284,7 @@ func (d *Database) AddState(
|
||||||
stateBlockNIDs []types.StateBlockNID,
|
stateBlockNIDs []types.StateBlockNID,
|
||||||
state []types.StateEntry,
|
state []types.StateEntry,
|
||||||
) (stateNID types.StateSnapshotNID, err error) {
|
) (stateNID types.StateSnapshotNID, err error) {
|
||||||
fmt.Println("AddState INSERT STATE START", stateBlockNIDs)
|
|
||||||
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
fmt.Println("insert state txn created")
|
|
||||||
if len(state) > 0 {
|
if len(state) > 0 {
|
||||||
stateBlockNID, err := d.statements.selectNextStateBlockNID(ctx, txn)
|
stateBlockNID, err := d.statements.selectNextStateBlockNID(ctx, txn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -304,10 +296,8 @@ func (d *Database) AddState(
|
||||||
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)
|
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)
|
||||||
}
|
}
|
||||||
stateNID, err = d.statements.insertState(ctx, txn, roomNID, stateBlockNIDs)
|
stateNID, err = d.statements.insertState(ctx, txn, roomNID, stateBlockNIDs)
|
||||||
fmt.Println("AddState: completing txn", time.Now(), "err=", err)
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
fmt.Println("AddState INSERT STATE END pkey=", stateNID, time.Now(), "err=", err)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
@ -318,11 +308,9 @@ func (d *Database) AddState(
|
||||||
func (d *Database) SetState(
|
func (d *Database) SetState(
|
||||||
ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID,
|
ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID,
|
||||||
) error {
|
) error {
|
||||||
fmt.Println("SetState event NID:", eventNID, "state NID:", stateNID)
|
|
||||||
e := common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
e := common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
return d.statements.updateEventState(ctx, txn, eventNID, stateNID)
|
return d.statements.updateEventState(ctx, txn, eventNID, stateNID)
|
||||||
})
|
})
|
||||||
fmt.Println("SetState finish", e)
|
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -341,13 +329,10 @@ func (d *Database) StateAtEventIDs(
|
||||||
func (d *Database) StateBlockNIDs(
|
func (d *Database) StateBlockNIDs(
|
||||||
ctx context.Context, stateNIDs []types.StateSnapshotNID,
|
ctx context.Context, stateNIDs []types.StateSnapshotNID,
|
||||||
) (sl []types.StateBlockNIDList, err error) {
|
) (sl []types.StateBlockNIDList, err error) {
|
||||||
fmt.Println("StateBlockNIDs SELECT STATE START", stateNIDs)
|
|
||||||
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
fmt.Println(" in txn")
|
|
||||||
sl, err = d.statements.bulkSelectStateBlockNIDs(ctx, txn, stateNIDs)
|
sl, err = d.statements.bulkSelectStateBlockNIDs(ctx, txn, stateNIDs)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
fmt.Println("StateBlockNIDs SELECT STATE END", sl)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -388,7 +373,6 @@ func (d *Database) EventIDs(
|
||||||
func (d *Database) GetLatestEventsForUpdate(
|
func (d *Database) GetLatestEventsForUpdate(
|
||||||
ctx context.Context, roomNID types.RoomNID,
|
ctx context.Context, roomNID types.RoomNID,
|
||||||
) (types.RoomRecentEventsUpdater, error) {
|
) (types.RoomRecentEventsUpdater, error) {
|
||||||
fmt.Println("=============== GetLatestEventsForUpdate BEGIN TXN")
|
|
||||||
txn, err := d.db.Begin()
|
txn, err := d.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -412,7 +396,6 @@ func (d *Database) GetLatestEventsForUpdate(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fmt.Println("GetLatestEventsForUpdate returning updater")
|
|
||||||
|
|
||||||
// FIXME: we probably want to support long-lived txns in sqlite somehow, but we don't because we get
|
// FIXME: we probably want to support long-lived txns in sqlite somehow, but we don't because we get
|
||||||
// 'database is locked' errors caused by multiple write txns (one being the long-lived txn created here)
|
// 'database is locked' errors caused by multiple write txns (one being the long-lived txn created here)
|
||||||
|
|
@ -475,7 +458,6 @@ func (u *roomRecentEventsUpdater) StorePreviousEvents(eventNID types.EventNID, p
|
||||||
|
|
||||||
// IsReferenced implements types.RoomRecentEventsUpdater
|
// IsReferenced implements types.RoomRecentEventsUpdater
|
||||||
func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (res bool, err error) {
|
func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (res bool, err error) {
|
||||||
fmt.Println("[[TXN]] IsReferenced")
|
|
||||||
err = common.WithTransaction(u.d.db, func(txn *sql.Tx) error {
|
err = common.WithTransaction(u.d.db, func(txn *sql.Tx) error {
|
||||||
err := u.d.statements.selectPreviousEventExists(u.ctx, txn, eventReference.EventID, eventReference.EventSHA256)
|
err := u.d.statements.selectPreviousEventExists(u.ctx, txn, eventReference.EventID, eventReference.EventSHA256)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
@ -508,8 +490,6 @@ func (u *roomRecentEventsUpdater) SetLatestEvents(
|
||||||
|
|
||||||
// HasEventBeenSent implements types.RoomRecentEventsUpdater
|
// HasEventBeenSent implements types.RoomRecentEventsUpdater
|
||||||
func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (res bool, err error) {
|
func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (res bool, err error) {
|
||||||
// TODO: transaction was removed here - is this wise?
|
|
||||||
fmt.Println("[[TXN]] HasEventBeenSent")
|
|
||||||
err = common.WithTransaction(u.d.db, func(txn *sql.Tx) error {
|
err = common.WithTransaction(u.d.db, func(txn *sql.Tx) error {
|
||||||
res, err = u.d.statements.selectEventSentToOutput(u.ctx, txn, eventNID)
|
res, err = u.d.statements.selectEventSentToOutput(u.ctx, txn, eventNID)
|
||||||
return err
|
return err
|
||||||
|
|
@ -519,8 +499,6 @@ func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (res
|
||||||
|
|
||||||
// MarkEventAsSent implements types.RoomRecentEventsUpdater
|
// MarkEventAsSent implements types.RoomRecentEventsUpdater
|
||||||
func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error {
|
func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error {
|
||||||
// TODO: transaction was removed here - is this wise?
|
|
||||||
fmt.Println("[[TXN]] updateEventSentToOutput")
|
|
||||||
err := common.WithTransaction(u.d.db, func(txn *sql.Tx) error {
|
err := common.WithTransaction(u.d.db, func(txn *sql.Tx) error {
|
||||||
return u.d.statements.updateEventSentToOutput(u.ctx, txn, eventNID)
|
return u.d.statements.updateEventSentToOutput(u.ctx, txn, eventNID)
|
||||||
})
|
})
|
||||||
|
|
@ -528,8 +506,6 @@ func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID) (mu types.MembershipUpdater, err error) {
|
func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID) (mu types.MembershipUpdater, err error) {
|
||||||
// TODO: transaction was removed here - is this wise?
|
|
||||||
fmt.Println("[[TXN]] membershipUpdaterTxn")
|
|
||||||
err = common.WithTransaction(u.d.db, func(txn *sql.Tx) error {
|
err = common.WithTransaction(u.d.db, func(txn *sql.Tx) error {
|
||||||
mu, err = u.d.membershipUpdaterTxn(u.ctx, txn, u.roomNID, targetUserNID)
|
mu, err = u.d.membershipUpdaterTxn(u.ctx, txn, u.roomNID, targetUserNID)
|
||||||
return err
|
return err
|
||||||
|
|
@ -625,14 +601,12 @@ func (d *Database) MembershipUpdater(
|
||||||
ctx context.Context, roomID, targetUserID string,
|
ctx context.Context, roomID, targetUserID string,
|
||||||
) (types.MembershipUpdater, error) {
|
) (types.MembershipUpdater, error) {
|
||||||
txn, err := d.db.Begin()
|
txn, err := d.db.Begin()
|
||||||
fmt.Println("=== UPDATER TXN START ====")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
succeeded := false
|
succeeded := false
|
||||||
defer func() {
|
defer func() {
|
||||||
if !succeeded {
|
if !succeeded {
|
||||||
fmt.Println("=== UPDATER TXN ROLLBACK ====")
|
|
||||||
txn.Rollback() // nolint: errcheck
|
txn.Rollback() // nolint: errcheck
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue