From f001f0507fdfb51b2096ffc3eaa410b5a43dfc3f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Nov 2017 11:41:34 +0000 Subject: [PATCH 01/10] Write and read transaction id from sync DB --- .../dendrite/syncapi/consumers/roomserver.go | 1 + .../storage/output_room_events_table.go | 49 +++++++++++++++---- .../dendrite/syncapi/storage/syncserver.go | 8 ++- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 677eeb42b..273b6aea1 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -132,6 +132,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, + msg.TransactionID, ) if err != nil { return err diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index fb00ad842..27e44d3dc 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -18,6 +18,8 @@ import ( "context" "database/sql" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" @@ -44,7 +46,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- if there is no delta. add_state_ids TEXT[], - remove_state_ids TEXT[] + remove_state_ids TEXT[], + device_id TEXT, -- The local device that sent the event, if any + transaction_id TEXT -- The transaction id used to send the event, if any ); -- for event selection CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); @@ -52,14 +56,14 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - " room_id, event_id, event_json, add_state_ids, remove_state_ids" + - ") VALUES ($1, $2, $3, $4, $5) RETURNING id" + " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id" const selectEventsSQL = "" + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectRecentEventsSQL = "" + - "SELECT id, event_json FROM syncapi_output_room_events" + + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" @@ -164,7 +168,10 @@ func (s *outputRoomEventsStatements) selectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = streamEvent{ev, types.StreamPosition(streamPos)} + eventIDToEvent[ev.EventID()] = streamEvent{ + Event: ev, + streamPosition: types.StreamPosition(streamPos), + } } return stateNeeded, eventIDToEvent, nil @@ -190,7 +197,14 @@ func (s *outputRoomEventsStatements) selectMaxEventID( func (s *outputRoomEventsStatements) insertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string, + transactionID *api.TransactionID, ) (streamPos int64, err error) { + var deviceID, txnID *string + if transactionID != nil { + deviceID = &transactionID.DeviceID + txnID = &transactionID.TransactionID + } + stmt := common.TxStmt(txn, s.insertEventStmt) err = stmt.QueryRowContext( ctx, @@ -199,6 +213,8 @@ func (s *outputRoomEventsStatements) insertEvent( event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), + deviceID, + txnID, ).Scan(&streamPos) return } @@ -241,10 +257,13 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { var result []streamEvent for rows.Next() { var ( - streamPos int64 - eventBytes []byte + streamPos int64 + eventBytes []byte + deviceID *string + txnID *string + transactionID *api.TransactionID ) - if err := rows.Scan(&streamPos, &eventBytes); err != nil { + if err := rows.Scan(&streamPos, &eventBytes, &deviceID, &txnID); err != nil { return nil, err } // TODO: Handle redacted events @@ -252,7 +271,19 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { if err != nil { return nil, err } - result = append(result, streamEvent{ev, types.StreamPosition(streamPos)}) + + if deviceID != nil && txnID != nil { + transactionID = &api.TransactionID{ + DeviceID: *deviceID, + TransactionID: *txnID, + } + } + + result = append(result, streamEvent{ + Event: ev, + streamPosition: types.StreamPosition(streamPos), + transactionID: transactionID, + }) } return result, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 1a18d9374..8a5b9648d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -18,6 +18,8 @@ import ( "context" "database/sql" "fmt" + + "github.com/matrix-org/dendrite/roomserver/api" // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -38,6 +40,7 @@ type stateDelta struct { type streamEvent struct { gomatrixserverlib.Event streamPosition types.StreamPosition + transactionID *api.TransactionID } // SyncServerDatabase represents a sync server database @@ -100,10 +103,11 @@ func (d *SyncServerDatabase) WriteEvent( ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, + transactionID *api.TransactionID, ) (streamPos types.StreamPosition, returnErr error) { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs) + pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID) if err != nil { return err } @@ -565,7 +569,7 @@ func (d *SyncServerDatabase) getStateDeltas( } s := make([]streamEvent, len(allState)) for i := 0; i < len(s); i++ { - s[i] = streamEvent{allState[i], types.StreamPosition(0)} + s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)} } state[roomID] = s continue // we'll add this room in when we do joined rooms From 0cc1c2f90972c84d5ee655577923528fafe7d8f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Nov 2017 14:51:08 +0000 Subject: [PATCH 02/10] Add transaction ID to events if sending device --- .../dendrite/syncapi/storage/syncserver.go | 38 +++++++++++++------ .../dendrite/syncapi/sync/notifier.go | 2 +- .../dendrite/syncapi/sync/request.go | 8 ++-- .../dendrite/syncapi/sync/requestpool.go | 8 ++-- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 8a5b9648d..58884b865 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -19,6 +19,7 @@ import ( "database/sql" "fmt" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/roomserver/api" // Import the postgres database driver. _ "github.com/lib/pq" @@ -92,7 +93,7 @@ func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]g if err != nil { return nil, err } - return streamEventsToEvents(streamEvents), nil + return streamEventsToEvents(nil, streamEvents), nil } // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races @@ -211,7 +212,7 @@ func (d *SyncServerDatabase) syncStreamPositionTx( // IncrementalSync returns all the data needed in order to create an incremental sync response. func (d *SyncServerDatabase) IncrementalSync( ctx context.Context, - userID string, + device *authtypes.Device, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int, ) (*types.Response, error) { @@ -226,21 +227,21 @@ func (d *SyncServerDatabase) IncrementalSync( // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, err := d.getStateDeltas(ctx, txn, fromPos, toPos, userID) + deltas, err := d.getStateDeltas(ctx, device, txn, fromPos, toPos, device.UserID) if err != nil { return nil, err } res := types.NewResponse(toPos) for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) + err = d.addRoomDeltaToResponse(ctx, device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) if err != nil { return nil, err } } // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, userID, fromPos, toPos, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { return nil, err } @@ -292,7 +293,7 @@ func (d *SyncServerDatabase) CompleteSync( if err != nil { return nil, err } - recentEvents := streamEventsToEvents(recentStreamEvents) + recentEvents := streamEventsToEvents(nil, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() @@ -390,7 +391,9 @@ func (d *SyncServerDatabase) addInvitesToResponse( // addRoomDeltaToResponse adds a room state delta to a sync response func (d *SyncServerDatabase) addRoomDeltaToResponse( - ctx context.Context, txn *sql.Tx, + ctx context.Context, + device *authtypes.Device, + txn *sql.Tx, fromPos, toPos types.StreamPosition, delta stateDelta, numRecentEventsPerRoom int, @@ -412,7 +415,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( if err != nil { return err } - recentEvents := streamEventsToEvents(recentStreamEvents) + recentEvents := streamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back // Don't bother appending empty room entries @@ -529,7 +532,7 @@ func (d *SyncServerDatabase) fetchMissingStateEvents( } func (d *SyncServerDatabase) getStateDeltas( - ctx context.Context, txn *sql.Tx, + ctx context.Context, device *authtypes.Device, txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string, ) ([]stateDelta, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 @@ -578,7 +581,7 @@ func (d *SyncServerDatabase) getStateDeltas( deltas = append(deltas, stateDelta{ membership: membership, membershipPos: ev.streamPosition, - stateEvents: streamEventsToEvents(stateStreamEvents), + stateEvents: streamEventsToEvents(device, stateStreamEvents), roomID: roomID, }) break @@ -594,7 +597,7 @@ func (d *SyncServerDatabase) getStateDeltas( for _, joinedRoomID := range joinedRoomIDs { deltas = append(deltas, stateDelta{ membership: "join", - stateEvents: streamEventsToEvents(state[joinedRoomID]), + stateEvents: streamEventsToEvents(device, state[joinedRoomID]), roomID: joinedRoomID, }) } @@ -602,10 +605,21 @@ func (d *SyncServerDatabase) getStateDeltas( return deltas, nil } -func streamEventsToEvents(in []streamEvent) []gomatrixserverlib.Event { +func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrixserverlib.Event { out := make([]gomatrixserverlib.Event, len(in)) for i := 0; i < len(in); i++ { out[i] = in[i].Event + if device != nil && in[i].transactionID != nil { + if device.ID == in[i].transactionID.DeviceID { + // TODO: Don't clobber unsigned + ev, err := out[i].SetUnsigned(map[string]string{ + "transaction_id": in[i].transactionID.TransactionID, + }) + if err == nil { + out[i] = ev + } + } + } } return out } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 4712a2c74..5ed701d8e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -123,7 +123,7 @@ func (n *Notifier) GetListener(req syncRequest) UserStreamListener { n.removeEmptyUserStreams() - return n.fetchUserStream(req.userID, true).GetListener(req.ctx) + return n.fetchUserStream(req.device.UserID, true).GetListener(req.ctx) } // Load the membership states required to notify users correctly. diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index 7f5259814..3c1befddf 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -20,6 +20,8 @@ import ( "strconv" "time" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" @@ -31,7 +33,7 @@ const defaultTimelineLimit = 20 // syncRequest represents a /sync request, with sensible defaults/sanity checks applied. type syncRequest struct { ctx context.Context - userID string + device authtypes.Device limit int timeout time.Duration since *types.StreamPosition // nil means that no since token was supplied @@ -39,7 +41,7 @@ type syncRequest struct { log *log.Entry } -func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) { +func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, error) { timeout := getTimeout(req.URL.Query().Get("timeout")) fullState := req.URL.Query().Get("full_state") wantFullState := fullState != "" && fullState != "false" @@ -50,7 +52,7 @@ func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) { // TODO: Additional query params: set_presence, filter return &syncRequest{ ctx: req.Context(), - userID: userID, + device: device, timeout: timeout, since: since, wantFullState: wantFullState, diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 15993b774..e9600243f 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -48,7 +48,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype // Extract values from request logger := util.GetLogger(req.Context()) userID := device.UserID - syncReq, err := newSyncRequest(req, userID) + syncReq, err := newSyncRequest(req, *device) if err != nil { return util.JSONResponse{ Code: 400, @@ -122,16 +122,16 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) { // TODO: handle ignored users if req.since == nil { - res, err = rp.db.CompleteSync(req.ctx, req.userID, req.limit) + res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.userID, *req.since, currentPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, &req.device, *req.since, currentPos, req.limit) } if err != nil { return } - res, err = rp.appendAccountData(res, req.userID, req, currentPos) + res, err = rp.appendAccountData(res, req.device.UserID, req, currentPos) return } From 2e0188ac464f20498059c57b919d3b8c9c9f3c05 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Dec 2017 15:58:12 +0000 Subject: [PATCH 03/10] gb vendor update github.com/matrix-org/gomatrixserverlib --- vendor/manifest | 2 +- .../matrix-org/gomatrixserverlib/base64.go | 17 ++++++ .../gomatrixserverlib/base64_test.go | 57 +++++++++++++++++++ .../matrix-org/gomatrixserverlib/client.go | 2 +- .../matrix-org/gomatrixserverlib/event.go | 28 +++++++++ .../gomatrixserverlib/event_test.go | 29 ++++++++++ .../matrix-org/gomatrixserverlib/keys.go | 6 +- .../matrix-org/gomatrixserverlib/travis.sh | 20 +++++++ 8 files changed, 157 insertions(+), 4 deletions(-) create mode 100644 vendor/src/github.com/matrix-org/gomatrixserverlib/travis.sh diff --git a/vendor/manifest b/vendor/manifest index 830d3e2d7..29af53b30 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -135,7 +135,7 @@ { "importpath": "github.com/matrix-org/gomatrixserverlib", "repository": "https://github.com/matrix-org/gomatrixserverlib", - "revision": "8540d3dfc13c797cd3200640bc06e0286ab355aa", + "revision": "b2704c7d9af54b68a4cf07cf6663ee98174a786c", "branch": "master" }, { diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/base64.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/base64.go index 38ae76951..dc895d179 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/base64.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/base64.go @@ -53,6 +53,12 @@ func (b64 Base64String) MarshalJSON() ([]byte, error) { return json.Marshal(b64.Encode()) } +// MarshalYAML implements yaml.Marshaller +// It just encodes the bytes as base64, which is a valid YAML string +func (b64 Base64String) MarshalYAML() (interface{}, error) { + return b64.Encode(), nil +} + // UnmarshalJSON decodes a JSON string and then decodes the resulting base64. // This takes a pointer receiver because it needs to write the result of decoding. func (b64 *Base64String) UnmarshalJSON(raw []byte) (err error) { @@ -65,3 +71,14 @@ func (b64 *Base64String) UnmarshalJSON(raw []byte) (err error) { err = b64.Decode(str) return } + +// UnmarshalYAML implements yaml.Unmarshaller +// it unmarshals the input as a yaml string and then base64-decodes the result +func (b64 *Base64String) UnmarshalYAML(unmarshal func(interface{}) error) (err error) { + var str string + if err = unmarshal(&str); err != nil { + return + } + err = b64.Decode(str) + return +} diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/base64_test.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/base64_test.go index 9ef94046a..e0a8beb0f 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/base64_test.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/base64_test.go @@ -18,6 +18,8 @@ package gomatrixserverlib import ( "encoding/json" "testing" + + "gopkg.in/yaml.v2" ) func TestMarshalBase64(t *testing.T) { @@ -93,3 +95,58 @@ func TestMarshalBase64Slice(t *testing.T) { t.Fatalf("json.Marshal(%v): wanted %q got %q", input, want, string(got)) } } + +func TestMarshalYAMLBase64(t *testing.T) { + input := Base64String("this\xffis\xffa\xfftest") + want := "dGhpc/9pc/9h/3Rlc3Q\n" + got, err := yaml.Marshal(input) + if err != nil { + t.Fatal(err) + } + if string(got) != want { + t.Fatalf("yaml.Marshal(%v): wanted %q got %q", input, want, string(got)) + } +} + +func TestMarshalYAMLBase64Struct(t *testing.T) { + input := struct{ Value Base64String }{Base64String("this\xffis\xffa\xfftest")} + want := "value: dGhpc/9pc/9h/3Rlc3Q\n" + got, err := yaml.Marshal(input) + if err != nil { + t.Fatal(err) + } + if string(got) != want { + t.Fatalf("yaml.Marshal(%v): wanted %q got %q", input, want, string(got)) + } +} + +func TestUnmarshalYAMLBase64(t *testing.T) { + input := []byte("dGhpc/9pc/9h/3Rlc3Q") + want := Base64String("this\xffis\xffa\xfftest") + var got Base64String + err := yaml.Unmarshal(input, &got) + if err != nil { + t.Fatal(err) + } + if string(got) != string(want) { + t.Fatalf("yaml.Unmarshal(%q): wanted %q got %q", string(input), want, string(got)) + } +} + +func TestUnmarshalYAMLBase64Struct(t *testing.T) { + // var u yaml.Unmarshaler + u := Base64String("this\xffis\xffa\xfftest") + + input := []byte(`value: dGhpc/9pc/9h/3Rlc3Q`) + want := struct{ Value Base64String }{u} + result := struct { + Value Base64String `yaml:"value"` + }{} + err := yaml.Unmarshal(input, &result) + if err != nil { + t.Fatal(err) + } + if string(result.Value) != string(want.Value) { + t.Fatalf("yaml.Unmarshal(%v): wanted %q got %q", input, want, result) + } +} diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go index e47be7fdc..20f84283b 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go @@ -80,7 +80,7 @@ func newFederationTripper() *federationTripper { ServerName: "", // TODO: We should be checking that the TLS certificate we see here matches // one of the allowed SHA-256 fingerprints for the server. - InsecureSkipVerify: true, + InsecureSkipVerify: true, // nolint: gas }) if err := conn.Handshake(); err != nil { return nil, err diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go index f873cb5b1..71ec3b5fc 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go @@ -22,6 +22,7 @@ import ( "strings" "time" + "github.com/tidwall/gjson" "github.com/tidwall/sjson" "golang.org/x/crypto/ed25519" ) @@ -306,6 +307,33 @@ func (e Event) SetUnsigned(unsigned interface{}) (Event, error) { return result, nil } +// SetUnsignedField takes a path and value to insert into the unsigned dict of +// the event. +// path is a dot separated path into the unsigned dict (see gjson package +// for details on format). In particular some characters like '.' and '*' must +// be escaped. +func (e *Event) SetUnsignedField(path string, value interface{}) error { + // The safest way is to change the unsigned json and then reparse the + // event fully. But since we are only changing the unsigned section, + // which doesn't affect the signatures or hashes, we can cheat and + // just fiddle those bits directly. + + path = "unsigned." + path + eventJSON, err := sjson.SetBytes(e.eventJSON, path, value) + if err != nil { + return err + } + eventJSON = CanonicalJSONAssumeValid(eventJSON) + + res := gjson.GetBytes(eventJSON, "unsigned") + unsigned := rawJSONFromResult(res, eventJSON) + + e.eventJSON = eventJSON + e.fields.Unsigned = unsigned + + return nil +} + // EventReference returns an EventReference for the event. // The reference can be used to refer to this event from other events. func (e Event) EventReference() EventReference { diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/event_test.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/event_test.go index d5c715da5..cb5d4f84c 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/event_test.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/event_test.go @@ -50,3 +50,32 @@ func BenchmarkParseSmallerEventFailedHash(b *testing.B) { func BenchmarkParseSmallerEventRedacted(b *testing.B) { benchmarkParse(b, `{"event_id":"$yvN1b43rlmcOs5fY:localhost","sender":"@test:localhost","room_id":"!19Mp0U9hjajeIiw1:localhost","hashes":{"sha256":"Oh1mwI1jEqZ3tgJ+V1Dmu5nOEGpCE4RFUqyJv2gQXKs"},"signatures":{"localhost":{"ed25519:u9kP":"5IzSuRXkxvbTp0vZhhXYZeOe+619iG3AybJXr7zfNn/4vHz4TH7qSJVQXSaHHvcTcDodAKHnTG1WDulgO5okAQ"}},"content":{},"type":"m.room.name","state_key":"","depth":7,"prev_events":[["$FqI6TVvWpcbcnJ97:localhost",{"sha256":"upCsBqUhNUgT2/+zkzg8TbqdQpWWKQnZpGJc6KcbUC4"}]],"prev_state":[],"auth_events":[["$oXL79cT7fFxR7dPH:localhost",{"sha256":"abjkiDSg1RkuZrbj2jZoGMlQaaj1Ue3Jhi7I7NlKfXY"}],["$IVUsaSkm1LBAZYYh:localhost",{"sha256":"X7RUj46hM/8sUHNBIFkStbOauPvbDzjSdH4NibYWnko"}],["$VS2QT0EeArZYi8wf:localhost",{"sha256":"k9eM6utkCH8vhLW9/oRsH74jOBS/6RVK42iGDFbylno"}]],"origin":"localhost","origin_server_ts":1510854416361}`) } + +func TestAddUnsignedField(t *testing.T) { + initialEventJSON := `{"auth_events":[["$oXL79cT7fFxR7dPH:localhost",{"sha256":"abjkiDSg1RkuZrbj2jZoGMlQaaj1Ue3Jhi7I7NlKfXY"}],["$IVUsaSkm1LBAZYYh:localhost",{"sha256":"X7RUj46hM/8sUHNBIFkStbOauPvbDzjSdH4NibYWnko"}],["$VS2QT0EeArZYi8wf:localhost",{"sha256":"k9eM6utkCH8vhLW9/oRsH74jOBS/6RVK42iGDFbylno"}]],"content":{"name":"test3"},"depth":7,"event_id":"$yvN1b43rlmcOs5fY:localhost","hashes":{"sha256":"Oh1mwI1jEqZ3tgJ+V1Dmu5nOEGpCE4RFUqyJv2gQXKs"},"origin":"localhost","origin_server_ts":1510854416361,"prev_events":[["$FqI6TVvWpcbcnJ97:localhost",{"sha256":"upCsBqUhNUgT2/+zkzg8TbqdQpWWKQnZpGJc6KcbUC4"}]],"prev_state":[],"room_id":"!19Mp0U9hjajeIiw1:localhost","sender":"@test:localhost","signatures":{"localhost":{"ed25519:u9kP":"5IzSuRXkxvbTp0vZhhXYZeOe+619iG3AybJXr7zfNn/4vHz4TH7qSJVQXSaHHvcTcDodAKHnTG1WDulgO5okAQ"}},"state_key":"","type":"m.room.name"}` + expectedEventJSON := `{"auth_events":[["$oXL79cT7fFxR7dPH:localhost",{"sha256":"abjkiDSg1RkuZrbj2jZoGMlQaaj1Ue3Jhi7I7NlKfXY"}],["$IVUsaSkm1LBAZYYh:localhost",{"sha256":"X7RUj46hM/8sUHNBIFkStbOauPvbDzjSdH4NibYWnko"}],["$VS2QT0EeArZYi8wf:localhost",{"sha256":"k9eM6utkCH8vhLW9/oRsH74jOBS/6RVK42iGDFbylno"}]],"content":{"name":"test3"},"depth":7,"event_id":"$yvN1b43rlmcOs5fY:localhost","hashes":{"sha256":"Oh1mwI1jEqZ3tgJ+V1Dmu5nOEGpCE4RFUqyJv2gQXKs"},"origin":"localhost","origin_server_ts":1510854416361,"prev_events":[["$FqI6TVvWpcbcnJ97:localhost",{"sha256":"upCsBqUhNUgT2/+zkzg8TbqdQpWWKQnZpGJc6KcbUC4"}]],"prev_state":[],"room_id":"!19Mp0U9hjajeIiw1:localhost","sender":"@test:localhost","signatures":{"localhost":{"ed25519:u9kP":"5IzSuRXkxvbTp0vZhhXYZeOe+619iG3AybJXr7zfNn/4vHz4TH7qSJVQXSaHHvcTcDodAKHnTG1WDulgO5okAQ"}},"state_key":"","type":"m.room.name","unsigned":{"foo":"bar","x":1}}` + + var event Event + if err := json.Unmarshal([]byte(initialEventJSON), &event); err != nil { + t.Error("Failed to parse event") + } + + err := event.SetUnsignedField("foo", "bar") + if err != nil { + t.Error("Failed to insert foo") + } + + err = event.SetUnsignedField("x", 1) + if err != nil { + t.Error("Failed to insert x") + } + + bytes, err := json.Marshal(event) + if err != nil { + t.Error("Failed to marshal x") + } + + if expectedEventJSON != string(bytes) { + t.Fatalf("Serialized event does not match expected: %s != %s", string(bytes), initialEventJSON) + } +} diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/keys.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/keys.go index 98ff87fd6..fab72b3d1 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/keys.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/keys.go @@ -110,8 +110,10 @@ func FetchKeysDirect(serverName ServerName, addr, sni string) (*ServerKeys, *tls } defer tcpconn.Close() // nolint: errcheck tlsconn := tls.Client(tcpconn, &tls.Config{ - ServerName: sni, - InsecureSkipVerify: true, // This must be specified even though the TLS library will ignore it. + ServerName: sni, + + // This must be specified even though the TLS library will ignore it. + InsecureSkipVerify: true, // nolint: gas }) if err = tlsconn.Handshake(); err != nil { return nil, nil, err diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/travis.sh b/vendor/src/github.com/matrix-org/gomatrixserverlib/travis.sh new file mode 100644 index 000000000..3ba23b610 --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/travis.sh @@ -0,0 +1,20 @@ +#!/bin/sh + +set -eux + +cd `dirname $0` + +# -u so that if this is run on a dev box, we get the latest deps, as +# we do on travis. + +go get -u \ + github.com/alecthomas/gometalinter \ + golang.org/x/crypto/ed25519 \ + github.com/matrix-org/util \ + github.com/matrix-org/gomatrix \ + github.com/tidwall/gjson \ + github.com/tidwall/sjson \ + github.com/pkg/errors \ + gopkg.in/yaml.v2 \ + +./hooks/pre-commit From 683ba088e7e210680f71c0dc53f57774096b0124 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Dec 2017 16:05:00 +0000 Subject: [PATCH 04/10] Fix tests --- .../matrix-org/dendrite/syncapi/sync/notifier_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 79c5a2872..4fa543936 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -262,7 +264,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { select { case <-time.After(5 * time.Second): return types.StreamPosition(0), fmt.Errorf( - "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, + "waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since, ) case <-listener.GetNotifyChannel(*req.since): p := listener.GetStreamPosition() @@ -280,7 +282,7 @@ func waitForBlocking(s *UserStream, numBlocking uint) { func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { return syncRequest{ - userID: userID, + device: authtypes.Device{UserID: userID}, timeout: 1 * time.Minute, since: &since, wantFullState: false, From 6cf9fe946606b47806f331282accbb5619a2dcb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Dec 2017 17:01:05 +0000 Subject: [PATCH 05/10] Add comments --- .../dendrite/syncapi/storage/syncserver.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 58884b865..f5d8f6617 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -87,12 +87,16 @@ func (d *SyncServerDatabase) AllJoinedUsersInRooms(ctx context.Context) (map[str // Events lookups a list of event by their event ID. // Returns a list of events matching the requested IDs found in the database. // If an event is not found in the database then it will be omitted from the list. -// Returns an error if there was a problem talking with the database +// Returns an error if there was a problem talking with the database. +// Does not include any transaction IDs in the returned events. func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) { streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) if err != nil { return nil, err } + + // We don't include a device here as we only include transaction IDs in + // incremental syncs. return streamEventsToEvents(nil, streamEvents), nil } @@ -209,7 +213,11 @@ func (d *SyncServerDatabase) syncStreamPositionTx( return types.StreamPosition(maxID), nil } -// IncrementalSync returns all the data needed in order to create an incremental sync response. +// IncrementalSync returns all the data needed in order to create an incremental +// sync response for the given device. If the device has a deviceID events +// returned will include any client transaction IDs associated with the device. +// These transaction IDs come from when the device sent the event via an API +// that included a transaction ID. func (d *SyncServerDatabase) IncrementalSync( ctx context.Context, device *authtypes.Device, @@ -293,6 +301,9 @@ func (d *SyncServerDatabase) CompleteSync( if err != nil { return nil, err } + + // We don't include a device here as we don't need to send down + // transaction IDs for complete syncs recentEvents := streamEventsToEvents(nil, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) @@ -605,6 +616,9 @@ func (d *SyncServerDatabase) getStateDeltas( return deltas, nil } +// streamEventsToEvents converts streamEvent to Event. If device is non-nil and +// matches the streamevent.transactionID device then the transaction ID gets +// added to the unsigned section of the output event. func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrixserverlib.Event { out := make([]gomatrixserverlib.Event, len(in)) for i := 0; i < len(in); i++ { From a5f130c6703196d6acf0e5039923af50ddd4396e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Dec 2017 17:01:51 +0000 Subject: [PATCH 06/10] Check that UserID matches too --- .../matrix-org/dendrite/syncapi/storage/syncserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index f5d8f6617..4c2bb4f65 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -624,7 +624,7 @@ func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrix for i := 0; i < len(in); i++ { out[i] = in[i].Event if device != nil && in[i].transactionID != nil { - if device.ID == in[i].transactionID.DeviceID { + if device.UserID == in[i].Sender() && device.ID == in[i].transactionID.DeviceID { // TODO: Don't clobber unsigned ev, err := out[i].SetUnsigned(map[string]string{ "transaction_id": in[i].transactionID.TransactionID, From 387afd3ed76d024ae17e31b4e672d645bde64d1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Dec 2017 17:04:16 +0000 Subject: [PATCH 07/10] Use SetUnsignedField and log error --- .../dendrite/syncapi/storage/syncserver.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 4c2bb4f65..bbd1ce66b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -19,6 +19,8 @@ import ( "database/sql" "fmt" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/roomserver/api" // Import the postgres database driver. @@ -625,12 +627,13 @@ func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrix out[i] = in[i].Event if device != nil && in[i].transactionID != nil { if device.UserID == in[i].Sender() && device.ID == in[i].transactionID.DeviceID { - // TODO: Don't clobber unsigned - ev, err := out[i].SetUnsigned(map[string]string{ - "transaction_id": in[i].transactionID.TransactionID, - }) - if err == nil { - out[i] = ev + err := out[i].SetUnsignedField( + "transaction_id", in[i].transactionID.TransactionID, + ) + if err != nil { + logrus.WithFields(logrus.Fields{ + "event_id": out[i].EventID(), + }).WithError(err).Warnf("Failed to add transaction ID to event") } } } From 8bd2dbdb56a28a2389b6196e28c68184e24fcbeb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Dec 2017 10:45:38 +0000 Subject: [PATCH 08/10] Update comment --- .../matrix-org/dendrite/syncapi/storage/syncserver.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index bbd1ce66b..121d87db7 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -216,10 +216,10 @@ func (d *SyncServerDatabase) syncStreamPositionTx( } // IncrementalSync returns all the data needed in order to create an incremental -// sync response for the given device. If the device has a deviceID events -// returned will include any client transaction IDs associated with the device. -// These transaction IDs come from when the device sent the event via an API -// that included a transaction ID. +// sync response for tshe given user. Events returned will include any client +// transaction IDs associated with the given device. These transaction IDs come +// from when the device sent the event via an API that included a transaction +// ID. func (d *SyncServerDatabase) IncrementalSync( ctx context.Context, device *authtypes.Device, From 28a0522b8cebafc31a762022863df2740ee53a31 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Dec 2017 13:16:46 +0000 Subject: [PATCH 09/10] Typo --- .../matrix-org/dendrite/syncapi/storage/syncserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 121d87db7..8dc2e7aa5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -216,7 +216,7 @@ func (d *SyncServerDatabase) syncStreamPositionTx( } // IncrementalSync returns all the data needed in order to create an incremental -// sync response for tshe given user. Events returned will include any client +// sync response for the given user. Events returned will include any client // transaction IDs associated with the given device. These transaction IDs come // from when the device sent the event via an API that included a transaction // ID. From 90a37dc766e9656582bc819acd96a683ac4f3eff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Dec 2017 15:06:04 +0000 Subject: [PATCH 10/10] Don't use a pointer for device --- .../matrix-org/dendrite/syncapi/storage/syncserver.go | 6 +++--- .../matrix-org/dendrite/syncapi/sync/requestpool.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 8dc2e7aa5..84417a348 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -222,7 +222,7 @@ func (d *SyncServerDatabase) syncStreamPositionTx( // ID. func (d *SyncServerDatabase) IncrementalSync( ctx context.Context, - device *authtypes.Device, + device authtypes.Device, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int, ) (*types.Response, error) { @@ -237,14 +237,14 @@ func (d *SyncServerDatabase) IncrementalSync( // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, err := d.getStateDeltas(ctx, device, txn, fromPos, toPos, device.UserID) + deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) if err != nil { return nil, err } res := types.NewResponse(toPos) for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) + err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index e9600243f..703ddd3f1 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -124,7 +124,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.Stre if req.since == nil { res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, &req.device, *req.since, currentPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit) } if err != nil {