mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-09 07:03:10 -06:00
Update GMSL to use sql.Scanner & sql.Valuer
This commit is contained in:
parent
9d42b93027
commit
c66431ab97
2
go.mod
2
go.mod
|
|
@ -25,7 +25,7 @@ require (
|
|||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220718071112-6cbd82a2f4ad
|
||||
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.13
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -341,8 +341,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
|
|||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46 h1:5X/kXY3nwqKOwwrE9tnMKrjbsi3PHigQYvrvDBSntO8=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220718071112-6cbd82a2f4ad h1:KAOcFZP9g9y/a6W+yBIlDVPhCSGFDR5mQ6GBGuF2o1Y=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220718071112-6cbd82a2f4ad/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a h1:DdG8vXMlZ65EAtc4V+3t7zHZ2Gqs24pSnyXS+4BRHUs=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
|
||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
|
|
|
|||
|
|
@ -100,8 +100,8 @@ const selectStateEventSQL = "" +
|
|||
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
// TODO: The session_id and transaction_id blanks are here because otherwise
|
||||
// the rowsToStreamEvents expects there to be exactly six columns. We need to
|
||||
// TODO: The session_id, transaction_id and history_visibility blanks are here because
|
||||
// the rowsToStreamEvents expects there to be exactly seven columns. We need to
|
||||
// figure out if these really need to be in the DB, and if so, we need a
|
||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, 3 as history_visibility" +
|
||||
|
|
|
|||
|
|
@ -254,7 +254,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
excludeFromSync bool
|
||||
addIDs pq.StringArray
|
||||
delIDs pq.StringArray
|
||||
historyVisibility uint8
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs, &historyVisibility); err != nil {
|
||||
return nil, nil, err
|
||||
|
|
@ -286,7 +286,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
needSet[id] = true
|
||||
}
|
||||
stateNeeded[ev.RoomID()] = needSet
|
||||
ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
ev.Visibility = historyVisibility
|
||||
|
||||
eventIDToEvent[eventID] = types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
|
|
@ -318,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
|
|||
func (s *outputRoomEventsStatements) InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
||||
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility uint8,
|
||||
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
var txnID *string
|
||||
var sessionID *int64
|
||||
|
|
@ -509,7 +509,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
|
|||
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
||||
|
||||
var eventAsString string
|
||||
var historyVisibility uint8
|
||||
var historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
|
|
@ -517,7 +517,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
|
|||
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
evt.Visibility = historyVisibility
|
||||
return id, evt, nil
|
||||
}
|
||||
|
||||
|
|
@ -541,7 +541,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility uint8
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
|
||||
return evts, err
|
||||
|
|
@ -549,7 +549,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return evts, err
|
||||
}
|
||||
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
|
||||
|
|
@ -576,7 +576,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
|||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility uint8
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
|
||||
return 0, evts, err
|
||||
|
|
@ -584,7 +584,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
|||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return 0, evts, err
|
||||
}
|
||||
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
|
||||
|
|
@ -602,7 +602,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
sessionID *int64
|
||||
txnID *string
|
||||
transactionID *api.TransactionID
|
||||
historyVisibility uint8
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -619,7 +619,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
TransactionID: *txnID,
|
||||
}
|
||||
}
|
||||
ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
|
||||
result = append(result, types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
StreamPosition: streamPos,
|
||||
|
|
|
|||
|
|
@ -98,6 +98,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events, err := NewPostgresEventsTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := sqlutil.NewMigrations()
|
||||
deltas.LoadFixSequences(m)
|
||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||
|
|
@ -105,10 +109,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events, err := NewPostgresEventsTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
Writer: d.writer,
|
||||
|
|
|
|||
|
|
@ -373,7 +373,7 @@ func (d *Database) WriteEvent(
|
|||
returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
var err error
|
||||
pos, err := d.OutputEvents.InsertEvent(
|
||||
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility.NumericValue(),
|
||||
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
|
||||
|
|
|
|||
|
|
@ -84,8 +84,8 @@ const selectStateEventSQL = "" +
|
|||
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
// TODO: The session_id and transaction_id blanks are here because otherwise
|
||||
// the rowsToStreamEvents expects there to be exactly six columns. We need to
|
||||
// TODO: The session_id, transaction_id and history_visibility blanks are here because
|
||||
// the rowsToStreamEvents expects there to be exactly seven columns. We need to
|
||||
// figure out if these really need to be in the DB, and if so, we need a
|
||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, 3 AS history_visibility" +
|
||||
|
|
|
|||
|
|
@ -203,7 +203,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
excludeFromSync bool
|
||||
addIDsJSON string
|
||||
delIDsJSON string
|
||||
historyVisibility uint8
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON, &historyVisibility); err != nil {
|
||||
return nil, nil, err
|
||||
|
|
@ -241,7 +241,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
needSet[id] = true
|
||||
}
|
||||
stateNeeded[ev.RoomID()] = needSet
|
||||
ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
ev.Visibility = historyVisibility
|
||||
|
||||
eventIDToEvent[eventID] = types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
|
|
@ -273,7 +273,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
|
|||
func (s *outputRoomEventsStatements) InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
||||
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility uint8,
|
||||
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (types.StreamPosition, error) {
|
||||
var txnID *string
|
||||
var sessionID *int64
|
||||
|
|
@ -492,7 +492,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
sessionID *int64
|
||||
txnID *string
|
||||
transactionID *api.TransactionID
|
||||
historyVisibility uint8
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -510,7 +510,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
}
|
||||
}
|
||||
|
||||
ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
ev.Visibility = historyVisibility
|
||||
|
||||
result = append(result, types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
|
|
@ -526,7 +526,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(
|
|||
) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
|
||||
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
||||
var eventAsString string
|
||||
var historyVisibility uint8
|
||||
var historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
|
|
@ -534,7 +534,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(
|
|||
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
evt.Visibility = historyVisibility
|
||||
return id, evt, nil
|
||||
}
|
||||
|
||||
|
|
@ -561,7 +561,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility uint8
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
|
||||
return evts, err
|
||||
|
|
@ -569,7 +569,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return evts, err
|
||||
}
|
||||
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
|
||||
|
|
@ -599,7 +599,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
|||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility uint8
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
|
||||
return 0, evts, err
|
||||
|
|
@ -607,7 +607,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
|||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return 0, evts, err
|
||||
}
|
||||
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
return lastID, evts, rows.Err()
|
||||
|
|
|
|||
|
|
@ -108,6 +108,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events, err := NewSqliteEventsTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m := sqlutil.NewMigrations()
|
||||
deltas.LoadFixSequences(m)
|
||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||
|
|
@ -115,10 +119,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
||||
return err
|
||||
}
|
||||
events, err := NewSqliteEventsTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
Writer: d.writer,
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ type Events interface {
|
|||
addState, removeState []string,
|
||||
transactionID *api.TransactionID,
|
||||
excludeFromSync bool,
|
||||
historyVisibility uint8,
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (streamPos types.StreamPosition, err error)
|
||||
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
|
||||
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ func TestOutputRoomEventsTable(t *testing.T) {
|
|||
events := room.Events()
|
||||
err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
|
||||
for _, ev := range events {
|
||||
_, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, 2)
|
||||
_, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to InsertEvent: %s", err)
|
||||
}
|
||||
|
|
@ -79,7 +79,7 @@ func TestOutputRoomEventsTable(t *testing.T) {
|
|||
"body": "test.txt",
|
||||
"url": "mxc://test.txt",
|
||||
})
|
||||
if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false, 2); err != nil {
|
||||
if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared); err != nil {
|
||||
return fmt.Errorf("failed to InsertEvent: %s", err)
|
||||
}
|
||||
wantEventID := []string{urlEv.EventID()}
|
||||
|
|
|
|||
Loading…
Reference in a new issue