From 75aa316a6a722bf1d30d837db8f08a9d112b24a5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Dec 2017 09:37:18 +0000 Subject: [PATCH] Write and read transaction id from sync DB (#367) --- .../dendrite/syncapi/consumers/roomserver.go | 1 + .../storage/output_room_events_table.go | 49 +++++++++++++++---- .../dendrite/syncapi/storage/syncserver.go | 8 ++- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 677eeb42b..273b6aea1 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -132,6 +132,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, + msg.TransactionID, ) if err != nil { return err diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index fb00ad842..333f608d2 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -18,6 +18,8 @@ import ( "context" "database/sql" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" @@ -44,7 +46,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- if there is no delta. add_state_ids TEXT[], - remove_state_ids TEXT[] + remove_state_ids TEXT[], + device_id TEXT, -- The local device that sent the event, if any + transaction_id TEXT -- The transaction id used to send the event, if any ); -- for event selection CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); @@ -52,14 +56,14 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - " room_id, event_id, event_json, add_state_ids, remove_state_ids" + - ") VALUES ($1, $2, $3, $4, $5) RETURNING id" + " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id" const selectEventsSQL = "" + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectRecentEventsSQL = "" + - "SELECT id, event_json FROM syncapi_output_room_events" + + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" @@ -164,7 +168,10 @@ func (s *outputRoomEventsStatements) selectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = streamEvent{ev, types.StreamPosition(streamPos)} + eventIDToEvent[ev.EventID()] = streamEvent{ + Event: ev, + streamPosition: types.StreamPosition(streamPos), + } } return stateNeeded, eventIDToEvent, nil @@ -190,7 +197,14 @@ func (s *outputRoomEventsStatements) selectMaxEventID( func (s *outputRoomEventsStatements) insertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string, + transactionID *api.TransactionID, ) (streamPos int64, err error) { + var deviceID, txnID *string + if transactionID != nil { + deviceID = &transactionID.DeviceID + txnID = &transactionID.TransactionID + } + stmt := common.TxStmt(txn, s.insertEventStmt) err = stmt.QueryRowContext( ctx, @@ -199,6 +213,8 @@ func (s *outputRoomEventsStatements) insertEvent( event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), + deviceID, + txnID, ).Scan(&streamPos) return } @@ -241,10 +257,13 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { var result []streamEvent for rows.Next() { var ( - streamPos int64 - eventBytes []byte + streamPos int64 + eventBytes []byte + deviceID *string + txnID *string + transactionID *api.TransactionID ) - if err := rows.Scan(&streamPos, &eventBytes); err != nil { + if err := rows.Scan(&streamPos, &eventBytes, &deviceID, &txnID); err != nil { return nil, err } // TODO: Handle redacted events @@ -252,7 +271,19 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { if err != nil { return nil, err } - result = append(result, streamEvent{ev, types.StreamPosition(streamPos)}) + + if deviceID != nil && txnID != nil { + transactionID = &api.TransactionID{ + DeviceID: *deviceID, + TransactionID: *txnID, + } + } + + result = append(result, streamEvent{ + Event: ev, + streamPosition: types.StreamPosition(streamPos), + transactionID: transactionID, + }) } return result, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 1a18d9374..8a5b9648d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -18,6 +18,8 @@ import ( "context" "database/sql" "fmt" + + "github.com/matrix-org/dendrite/roomserver/api" // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -38,6 +40,7 @@ type stateDelta struct { type streamEvent struct { gomatrixserverlib.Event streamPosition types.StreamPosition + transactionID *api.TransactionID } // SyncServerDatabase represents a sync server database @@ -100,10 +103,11 @@ func (d *SyncServerDatabase) WriteEvent( ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, + transactionID *api.TransactionID, ) (streamPos types.StreamPosition, returnErr error) { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs) + pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID) if err != nil { return err } @@ -565,7 +569,7 @@ func (d *SyncServerDatabase) getStateDeltas( } s := make([]streamEvent, len(allState)) for i := 0; i < len(s); i++ { - s[i] = streamEvent{allState[i], types.StreamPosition(0)} + s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)} } state[roomID] = s continue // we'll add this room in when we do joined rooms