Write and read transaction id from sync DB (#367)

This commit is contained in:
Erik Johnston 2017-12-06 09:37:18 +00:00 committed by GitHub
parent 578d8cf492
commit 75aa316a6a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 11 deletions

View file

@ -132,6 +132,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
addsStateEvents, addsStateEvents,
msg.AddsStateEventIDs, msg.AddsStateEventIDs,
msg.RemovesStateEventIDs, msg.RemovesStateEventIDs,
msg.TransactionID,
) )
if err != nil { if err != nil {
return err return err

View file

@ -18,6 +18,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
@ -44,7 +46,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
-- if there is no delta. -- if there is no delta.
add_state_ids TEXT[], add_state_ids TEXT[],
remove_state_ids TEXT[] remove_state_ids TEXT[],
device_id TEXT, -- The local device that sent the event, if any
transaction_id TEXT -- The transaction id used to send the event, if any
); );
-- for event selection -- for event selection
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);
@ -52,14 +56,14 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev
const insertEventSQL = "" + const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" + "INSERT INTO syncapi_output_room_events (" +
" room_id, event_id, event_json, add_state_ids, remove_state_ids" + " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" +
") VALUES ($1, $2, $3, $4, $5) RETURNING id" ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id"
const selectEventsSQL = "" + const selectEventsSQL = "" +
"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
const selectRecentEventsSQL = "" + const selectRecentEventsSQL = "" +
"SELECT id, event_json FROM syncapi_output_room_events" + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" + " WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4" " ORDER BY id DESC LIMIT $4"
@ -164,7 +168,10 @@ func (s *outputRoomEventsStatements) selectStateInRange(
} }
stateNeeded[ev.RoomID()] = needSet stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = streamEvent{ev, types.StreamPosition(streamPos)} eventIDToEvent[ev.EventID()] = streamEvent{
Event: ev,
streamPosition: types.StreamPosition(streamPos),
}
} }
return stateNeeded, eventIDToEvent, nil return stateNeeded, eventIDToEvent, nil
@ -190,7 +197,14 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
func (s *outputRoomEventsStatements) insertEvent( func (s *outputRoomEventsStatements) insertEvent(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.Event, addState, removeState []string, event *gomatrixserverlib.Event, addState, removeState []string,
transactionID *api.TransactionID,
) (streamPos int64, err error) { ) (streamPos int64, err error) {
var deviceID, txnID *string
if transactionID != nil {
deviceID = &transactionID.DeviceID
txnID = &transactionID.TransactionID
}
stmt := common.TxStmt(txn, s.insertEventStmt) stmt := common.TxStmt(txn, s.insertEventStmt)
err = stmt.QueryRowContext( err = stmt.QueryRowContext(
ctx, ctx,
@ -199,6 +213,8 @@ func (s *outputRoomEventsStatements) insertEvent(
event.JSON(), event.JSON(),
pq.StringArray(addState), pq.StringArray(addState),
pq.StringArray(removeState), pq.StringArray(removeState),
deviceID,
txnID,
).Scan(&streamPos) ).Scan(&streamPos)
return return
} }
@ -241,10 +257,13 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
var result []streamEvent var result []streamEvent
for rows.Next() { for rows.Next() {
var ( var (
streamPos int64 streamPos int64
eventBytes []byte eventBytes []byte
deviceID *string
txnID *string
transactionID *api.TransactionID
) )
if err := rows.Scan(&streamPos, &eventBytes); err != nil { if err := rows.Scan(&streamPos, &eventBytes, &deviceID, &txnID); err != nil {
return nil, err return nil, err
} }
// TODO: Handle redacted events // TODO: Handle redacted events
@ -252,7 +271,19 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
result = append(result, streamEvent{ev, types.StreamPosition(streamPos)})
if deviceID != nil && txnID != nil {
transactionID = &api.TransactionID{
DeviceID: *deviceID,
TransactionID: *txnID,
}
}
result = append(result, streamEvent{
Event: ev,
streamPosition: types.StreamPosition(streamPos),
transactionID: transactionID,
})
} }
return result, nil return result, nil
} }

View file

@ -18,6 +18,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/matrix-org/dendrite/roomserver/api"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -38,6 +40,7 @@ type stateDelta struct {
type streamEvent struct { type streamEvent struct {
gomatrixserverlib.Event gomatrixserverlib.Event
streamPosition types.StreamPosition streamPosition types.StreamPosition
transactionID *api.TransactionID
} }
// SyncServerDatabase represents a sync server database // SyncServerDatabase represents a sync server database
@ -100,10 +103,11 @@ func (d *SyncServerDatabase) WriteEvent(
ev *gomatrixserverlib.Event, ev *gomatrixserverlib.Event,
addStateEvents []gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event,
addStateEventIDs, removeStateEventIDs []string, addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID,
) (streamPos types.StreamPosition, returnErr error) { ) (streamPos types.StreamPosition, returnErr error) {
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
var err error var err error
pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs) pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID)
if err != nil { if err != nil {
return err return err
} }
@ -565,7 +569,7 @@ func (d *SyncServerDatabase) getStateDeltas(
} }
s := make([]streamEvent, len(allState)) s := make([]streamEvent, len(allState))
for i := 0; i < len(s); i++ { for i := 0; i < len(s); i++ {
s[i] = streamEvent{allState[i], types.StreamPosition(0)} s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)}
} }
state[roomID] = s state[roomID] = s
continue // we'll add this room in when we do joined rooms continue // we'll add this room in when we do joined rooms