diff --git a/.travis.yml b/.travis.yml index 0d6a22b7e..37d0d52a4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go go: - - 1.7 + - 1.8 sudo: false diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index f67bf0e5e..5d1bf8e2e 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -78,12 +78,15 @@ func main() { } n := sync.NewNotifier(types.StreamPosition(pos)) - server, err := consumers.NewServer(cfg, n, db) - if err != nil { - log.Panicf("startup: failed to create sync server: %s", err) + if err := n.Load(db); err != nil { + log.Panicf("startup: failed to set up notifier: %s", err) } - if err = server.Start(); err != nil { - log.Panicf("startup: failed to start sync server") + consumer, err := consumers.NewOutputRoomEvent(cfg, n, db) + if err != nil { + log.Panicf("startup: failed to create room server consumer: %s", err) + } + if err = consumer.Start(); err != nil { + log.Panicf("startup: failed to start room server consumer") } log.Info("Starting sync server on ", *bindAddr) diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index 7e5ae648f..80aaf2427 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -15,6 +15,7 @@ package main import ( + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -26,6 +27,7 @@ import ( "time" "github.com/matrix-org/dendrite/common/test" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" ) @@ -91,6 +93,7 @@ func defaulting(value, defaultValue string) string { } var timeout time.Duration +var clientEventTestData []string func init() { var err error @@ -98,6 +101,10 @@ func init() { if err != nil { panic(err) } + + for _, s := range outputRoomEventTestData { + clientEventTestData = append(clientEventTestData, clientEventJSONForOutputRoomEvent(s)) + } } // TODO: dupes roomserver integration tests. Factor out. @@ -125,6 +132,30 @@ func canonicalJSONInput(jsonData []string) []string { return jsonData } +// clientEventJSONForOutputRoomEvent parses the given output room event and extracts the 'Event' JSON. It is +// trimmed to the client format and then canonicalised and returned as a string. +// Panics if there are any problems. +func clientEventJSONForOutputRoomEvent(outputRoomEvent string) string { + var out api.OutputRoomEvent + if err := json.Unmarshal([]byte(outputRoomEvent), &out); err != nil { + panic("failed to unmarshal output room event: " + err.Error()) + } + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(out.Event, false) + if err != nil { + panic("failed to convert event field in output room event to Event: " + err.Error()) + } + clientEvs := gomatrixserverlib.ToClientEvents([]gomatrixserverlib.Event{ev}, gomatrixserverlib.FormatSync) + b, err := json.Marshal(clientEvs[0]) + if err != nil { + panic("failed to marshal client event as json: " + err.Error()) + } + jsonBytes, err := gomatrixserverlib.CanonicalJSON(b) + if err != nil { + panic("failed to turn event json into canonical json: " + err.Error()) + } + return string(jsonBytes) +} + // doSyncRequest does a /sync request and returns an error if it fails or doesn't // return the wanted string. func doSyncRequest(syncServerURL, want string) error { @@ -156,10 +187,15 @@ func doSyncRequest(syncServerURL, want string) error { // syncRequestUntilSuccess blocks and performs the same /sync request over and over until // the response returns the wanted string, where it will close the given channel and return. // It will keep track of the last error in `lastRequestErr`. -func syncRequestUntilSuccess(done chan error, want string, since string) { +func syncRequestUntilSuccess(done chan error, userID, since, want string) { for { + sinceQuery := "" + if since != "" { + sinceQuery = "&since=" + since + } err := doSyncRequest( - "http://"+syncserverAddr+"/api/_matrix/client/r0/sync?access_token=@alice:localhost&since="+since, + // low value timeout so polling with an up-to-date token returns quickly + "http://"+syncserverAddr+"/api/_matrix/client/r0/sync?timeout=100&access_token="+userID+sinceQuery, want, ) if err != nil { @@ -172,9 +208,10 @@ func syncRequestUntilSuccess(done chan error, want string, since string) { } } -// prepareSyncServer creates the database and config file needed for the sync server to run. -// It also prepares the CLI command to execute. -func prepareSyncServer() *exec.Cmd { +// startSyncServer creates the database and config file needed for the sync server to run and +// then starts the sync server. The Cmd being executed is returned. A channel is also returned, +// which will have any termination errors sent down it, followed immediately by the channel being closed. +func startSyncServer() (*exec.Cmd, chan error) { if err := createDatabase(testDatabaseName); err != nil { panic(err) } @@ -192,23 +229,28 @@ func prepareSyncServer() *exec.Cmd { ) cmd.Stderr = os.Stderr cmd.Stdout = os.Stderr - return cmd + + if err := cmd.Start(); err != nil { + panic("failed to start sync server: " + err.Error()) + } + syncServerCmdChan := make(chan error, 1) + go func() { + syncServerCmdChan <- cmd.Wait() + close(syncServerCmdChan) + }() + return cmd, syncServerCmdChan } -func testSyncServer(input, want []string, since string) { - // Write the logs to kafka so the sync server has some data to work with. +// prepareKafka creates the topics which will be written to by the tests. +func prepareKafka() { exe.DeleteTopic(inputTopic) if err := exe.CreateTopic(inputTopic); err != nil { panic(err) } - if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil { - panic(err) - } +} - cmd := prepareSyncServer() - if err := cmd.Start(); err != nil { - panic("failed to start sync server: " + err.Error()) - } +func testSyncServer(syncServerCmdChan chan error, userID, since, want string) { + fmt.Printf("==TESTING== testSyncServer(%s,%s)\n", userID, since) done := make(chan error, 1) // We need to wait for the sync server to: @@ -220,314 +262,382 @@ func testSyncServer(input, want []string, since string) { // We can't even wait for the first valid 200 OK response because it's possible to race // with consuming the kafka logs (so the /sync response will be missing events and // therefore fail the test). - go syncRequestUntilSuccess(done, want[0], since) + go syncRequestUntilSuccess(done, userID, since, canonicalJSONInput([]string{want})[0]) + + // wait for one of: + // - the test to pass (done channel is closed) + // - the sync server to exit with an error (error sent on syncServerCmdChan) + // - our test timeout to expire + // We don't need to clean up since the main() function handles that in the event we panic + var testPassed bool - // wait for the sync server to exit or our test timeout to expire - go func() { - done <- cmd.Wait() - }() select { case <-time.After(timeout): + if testPassed { + break + } + fmt.Printf("==TESTING== testSyncServer(%s,%s) TIMEOUT\n", userID, since) if reqErr := getLastRequestError(); reqErr != nil { fmt.Println("Last /sync request error:") fmt.Println(reqErr) } - - if err := cmd.Process.Kill(); err != nil { - panic(err) - } panic("dendrite-sync-api-server timed out") - case err, open := <-done: - cmd.Process.Kill() // ensure server is dead, only cleaning up so don't care about errors this returns. - if open { // channel is closed on success + case err := <-syncServerCmdChan: + if err != nil { fmt.Println("=============================================================================================") fmt.Println("sync server failed to run. If failing with 'pq: password authentication failed for user' try:") fmt.Println(" export PGHOST=/var/run/postgresql\n") fmt.Println("=============================================================================================") panic(err) } + case <-done: + testPassed = true + fmt.Printf("==TESTING== testSyncServer(%s,%s) PASSED\n", userID, since) } } +func writeToRoomServerLog(indexes ...int) { + var roomEvents []string + for _, i := range indexes { + roomEvents = append(roomEvents, outputRoomEventTestData[i]) + } + if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(roomEvents)); err != nil { + panic(err) + } +} + +// Runs a battery of sync server tests against test data in testdata.go +// testdata.go has a list of OutputRoomEvents which will be fed into the kafka log which the sync server will consume. +// The tests will pause at various points in this list to conduct tests on the /sync responses before continuing. +// For ease of understanding, the curl commands used to create the OutputRoomEvents are listed along with each write to kafka. func main() { fmt.Println("==TESTING==", os.Args[0]) - // room creation for @alice:localhost - input := []string{ - `{ - "Event": { - "auth_events": [], - "content": { - "creator": "@alice:localhost" - }, - "depth": 1, - "event_id": "$rOaxKSu6K1s0nOsW:localhost", - "hashes": { - "sha256": "g1QC1jZauIcVw+HCGizUqlUaLSmAkEGwGmIcLac5TKk" - }, - "origin": "localhost", - "origin_server_ts": 1493908927170, - "prev_events": [], - "room_id": "!gnrFfNAK7yGBWXFd:localhost", - "sender": "@alice:localhost", - "signatures": { - "localhost": { - "ed25519:something": "WCaImDmpkhNCCoUyRHcrV93SeJpJbq34yWbtjBgNNXVJaoiLSTys6t/gCvVqNYfX6Dt9c+z/sx5LikOLmLm1Dg" - } - }, - "state_key": "", - "type": "m.room.create" - }, - "VisibilityEventIDs": null, - "LatestEventIDs": ["$rOaxKSu6K1s0nOsW:localhost"], - "AddsStateEventIDs": ["$rOaxKSu6K1s0nOsW:localhost"], - "RemovesStateEventIDs": null, - "LastSentEventID": "" - }`, - `{ - "Event": { - "auth_events": [ - ["$rOaxKSu6K1s0nOsW:localhost", { - "sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg" - }] - ], - "content": { - "membership": "join" - }, - "depth": 2, - "event_id": "$uEDYwFpBO936HTfM:localhost", - "hashes": { - "sha256": "y5AQAnnzremC678QTIFEi677wdbMwluPiweZnuvUmz0" - }, - "origin": "localhost", - "origin_server_ts": 1493908927170, - "prev_events": [ - ["$rOaxKSu6K1s0nOsW:localhost", { - "sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg" - }] - ], - "room_id": "!gnrFfNAK7yGBWXFd:localhost", - "sender": "@alice:localhost", - "signatures": { - "localhost": { - "ed25519:something": "5Pl8GkgcyUu2QY7T38OkuufVQQV13f0kl2PLFI2OILBIcy0XPf8hSaFclemYckoo2nRgffIzsHO/ZgqfoBu0BA" - } - }, - "state_key": "@alice:localhost", - "type": "m.room.member" - }, - "VisibilityEventIDs": null, - "LatestEventIDs": ["$uEDYwFpBO936HTfM:localhost"], - "AddsStateEventIDs": ["$uEDYwFpBO936HTfM:localhost"], - "RemovesStateEventIDs": null, - "LastSentEventID": "$rOaxKSu6K1s0nOsW:localhost" - }`, - `{ - "Event": { - "auth_events": [ - ["$rOaxKSu6K1s0nOsW:localhost", { - "sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg" - }], - ["$uEDYwFpBO936HTfM:localhost", { - "sha256": "3z+JL3VmTtVROucpsrEWkxNVzn8ZOP2I1jU362pQIUU" - }] - ], - "content": { - "ban": 50, - "events": { - "m.room.avatar": 50, - "m.room.canonical_alias": 50, - "m.room.history_visibility": 100, - "m.room.name": 50, - "m.room.power_levels": 100 - }, - "events_default": 0, - "invite": 0, - "kick": 50, - "redact": 50, - "state_default": 50, - "users": { - "@alice:localhost": 100 - }, - "users_default": 0 - }, - "depth": 3, - "event_id": "$Axp7qdQXf0bz7zBy:localhost", - "hashes": { - "sha256": "oObDsGkeVtQgyVPauoLIqk+J+Jsz6HOol79uRMTRFFM" - }, - "origin": "localhost", - "origin_server_ts": 1493908927171, - "prev_events": [ - ["$uEDYwFpBO936HTfM:localhost", { - "sha256": "3z+JL3VmTtVROucpsrEWkxNVzn8ZOP2I1jU362pQIUU" - }] - ], - "room_id": "!gnrFfNAK7yGBWXFd:localhost", - "sender": "@alice:localhost", - "signatures": { - "localhost": { - "ed25519:something": "3kV1Wm2E1zUPQ8YUIC1x/8ks1SGvXE0olQ+b0BRMJm7fduY2fNcb/4A4aKbQLRtOwvCNUVuqQkkkdp1Zor1LCw" - } - }, - "state_key": "", - "type": "m.room.power_levels" - }, - "VisibilityEventIDs": null, - "LatestEventIDs": ["$Axp7qdQXf0bz7zBy:localhost"], - "AddsStateEventIDs": ["$Axp7qdQXf0bz7zBy:localhost"], - "RemovesStateEventIDs": null, - "LastSentEventID": "$uEDYwFpBO936HTfM:localhost" - }`, - `{ - "Event": { - "auth_events": [ - ["$rOaxKSu6K1s0nOsW:localhost", { - "sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg" - }], - ["$Axp7qdQXf0bz7zBy:localhost", { - "sha256": "5KIh9uRcgXuiYdO965JSfIOSGeMrasf8N9eEzxisErI" - }], - ["$uEDYwFpBO936HTfM:localhost", { - "sha256": "3z+JL3VmTtVROucpsrEWkxNVzn8ZOP2I1jU362pQIUU" - }] - ], - "content": { - "join_rule": "public" - }, - "depth": 4, - "event_id": "$zCgCrw3aZwVaKm34:localhost", - "hashes": { - "sha256": "KmJ7wAUznMy74MhAB3iDsBdFAkGypWXamDDQeLVzp1w" - }, - "origin": "localhost", - "origin_server_ts": 1493908927172, - "prev_events": [ - ["$Axp7qdQXf0bz7zBy:localhost", { - "sha256": "5KIh9uRcgXuiYdO965JSfIOSGeMrasf8N9eEzxisErI" - }] - ], - "room_id": "!gnrFfNAK7yGBWXFd:localhost", - "sender": "@alice:localhost", - "signatures": { - "localhost": { - "ed25519:something": "BkqU/1QARxNWEDfgKenvrhhGd6nmNZYHugHB0kFqUSQRZo+RV/zThLA0FxMXfmbGqfJdi1wXmxIR3QIwvGuhCg" - } - }, - "state_key": "", - "type": "m.room.join_rules" - }, - "VisibilityEventIDs": null, - "LatestEventIDs": ["$zCgCrw3aZwVaKm34:localhost"], - "AddsStateEventIDs": ["$zCgCrw3aZwVaKm34:localhost"], - "RemovesStateEventIDs": null, - "LastSentEventID": "$Axp7qdQXf0bz7zBy:localhost" - }`, - `{ - "Event": { - "auth_events": [ - ["$rOaxKSu6K1s0nOsW:localhost", { - "sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg" - }], - ["$Axp7qdQXf0bz7zBy:localhost", { - "sha256": "5KIh9uRcgXuiYdO965JSfIOSGeMrasf8N9eEzxisErI" - }], - ["$uEDYwFpBO936HTfM:localhost", { - "sha256": "3z+JL3VmTtVROucpsrEWkxNVzn8ZOP2I1jU362pQIUU" - }] - ], - "content": { - "history_visibility": "joined" - }, - "depth": 5, - "event_id": "$0NUtdnY7KWMhOR9E:localhost", - "hashes": { - "sha256": "9CBp3jcnGKzoKCVYRCFCoe0CJ8IfZZAOhudAoDr2jqU" - }, - "origin": "localhost", - "origin_server_ts": 1493908927174, - "prev_events": [ - ["$zCgCrw3aZwVaKm34:localhost", { - "sha256": "8kNj8j5K6YFWpFa0CLy1pR5Lp9nao0X6TW2iUIya2Tc" - }] - ], - "room_id": "!gnrFfNAK7yGBWXFd:localhost", - "sender": "@alice:localhost", - "signatures": { - "localhost": { - "ed25519:something": "92Dz7JXAxuc87L3+jMps0HC6Z4V5PhMZQIomI8Dod/im1bkfhYUPMOF5EWWMGMDSq+mSpJPVizWAIGa8bIFcDA" - } - }, - "state_key": "", - "type": "m.room.history_visibility" - }, - "VisibilityEventIDs": null, - "LatestEventIDs": ["$0NUtdnY7KWMhOR9E:localhost"], - "AddsStateEventIDs": ["$0NUtdnY7KWMhOR9E:localhost"], - "RemovesStateEventIDs": null, - "LastSentEventID": "$zCgCrw3aZwVaKm34:localhost" - }`, - } - since := "3" - want := []string{ - `{ - "next_batch": "5", + prepareKafka() + cmd, syncServerCmdChan := startSyncServer() + defer cmd.Process.Kill() // ensure server is dead, only cleaning up so don't care about errors this returns. + + // $ curl -XPOST -d '{}' "http://localhost:8009/_matrix/client/r0/createRoom?access_token=@alice:localhost" + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@alice:localhost" + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 2"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost" + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 3"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" + // $ curl -XPUT -d '{"name":"Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" + writeToRoomServerLog( + i0StateRoomCreate, i1StateAliceJoin, i2StatePowerLevels, i3StateJoinRules, i4StateHistoryVisibility, + i5AliceMsg, i6AliceMsg, i7AliceMsg, i8StateAliceRoomName, + ) + + // Make sure initial sync works TODO: prev_batch + testSyncServer(syncServerCmdChan, "@alice:localhost", "", `{ "account_data": { "events": [] }, + "next_batch": "9", "presence": { "events": [] }, "rooms": { + "invite": {}, "join": { - "!gnrFfNAK7yGBWXFd:localhost": { - "state": { - "events": [{ - "content": { - "join_rule": "public" - }, - "event_id": "$zCgCrw3aZwVaKm34:localhost", - "origin_server_ts": 1493908927172, - "sender": "@alice:localhost", - "state_key": "", - "type": "m.room.join_rules" - }] - }, - "timeline": { - "events": [{ - "content": { - "join_rule": "public" - }, - "event_id": "$zCgCrw3aZwVaKm34:localhost", - "origin_server_ts": 1493908927172, - "sender": "@alice:localhost", - "state_key": "", - "type": "m.room.join_rules" - }, { - "content": { - "history_visibility": "joined" - }, - "event_id": "$0NUtdnY7KWMhOR9E:localhost", - "origin_server_ts": 1493908927174, - "sender": "@alice:localhost", - "state_key": "", - "type": "m.room.history_visibility" - }], - "limited": false, - "prev_batch": "" + "!PjrbIMW2cIiaYF4t:localhost": { + "account_data": { + "events": [] }, "ephemeral": { "events": [] }, + "state": { + "events": [] + }, + "timeline": { + "events": [`+ + clientEventTestData[i0StateRoomCreate]+","+ + clientEventTestData[i1StateAliceJoin]+","+ + clientEventTestData[i2StatePowerLevels]+","+ + clientEventTestData[i3StateJoinRules]+","+ + clientEventTestData[i4StateHistoryVisibility]+","+ + clientEventTestData[i5AliceMsg]+","+ + clientEventTestData[i6AliceMsg]+","+ + clientEventTestData[i7AliceMsg]+","+ + clientEventTestData[i8StateAliceRoomName]+`], + "limited": true, + "prev_batch": "" + } + } + }, + "leave": {} + } + }`) + // Make sure alice's rooms don't leak to bob + testSyncServer(syncServerCmdChan, "@bob:localhost", "", `{ + "account_data": { + "events": [] + }, + "next_batch": "9", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": {}, + "leave": {} + } + }`) + // Make sure polling with an up-to-date token returns nothing new + testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{ + "account_data": { + "events": [] + }, + "next_batch": "9", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": {}, + "leave": {} + } + }`) + + // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost" + writeToRoomServerLog(i9StateBobJoin) + + // Make sure alice sees it TODO: prev_batch + testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{ + "account_data": { + "events": [] + }, + "next_batch": "10", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": { + "!PjrbIMW2cIiaYF4t:localhost": { "account_data": { "events": [] + }, + "ephemeral": { + "events": [] + }, + "state": { + "events": [] + }, + "timeline": { + "limited": false, + "prev_batch": "", + "events": [`+clientEventTestData[i9StateBobJoin]+`] + } + } + }, + "leave": {} + } + }`) + + // Make sure bob sees the room AND all the current room state TODO: history visibility + testSyncServer(syncServerCmdChan, "@bob:localhost", "9", `{ + "account_data": { + "events": [] + }, + "next_batch": "10", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": { + "!PjrbIMW2cIiaYF4t:localhost": { + "account_data": { + "events": [] + }, + "ephemeral": { + "events": [] + }, + "state": { + "events": [`+ + clientEventTestData[i0StateRoomCreate]+","+ + clientEventTestData[i1StateAliceJoin]+","+ + clientEventTestData[i2StatePowerLevels]+","+ + clientEventTestData[i3StateJoinRules]+","+ + clientEventTestData[i4StateHistoryVisibility]+","+ + clientEventTestData[i8StateAliceRoomName]+`] + }, + "timeline": { + "limited": false, + "prev_batch": "", + "events": [`+ + clientEventTestData[i9StateBobJoin]+`] + } + } + }, + "leave": {} + } + }`) + + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost" + writeToRoomServerLog(i10BobMsg) + + // Make sure alice can see everything around the join point for bob TODO: prev_batch + testSyncServer(syncServerCmdChan, "@alice:localhost", "7", `{ + "account_data": { + "events": [] + }, + "next_batch": "11", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": { + "!PjrbIMW2cIiaYF4t:localhost": { + "account_data": { + "events": [] + }, + "ephemeral": { + "events": [] + }, + "state": { + "events": [] + }, + "timeline": { + "limited": false, + "prev_batch": "", + "events": [`+ + clientEventTestData[i7AliceMsg]+","+ + clientEventTestData[i8StateAliceRoomName]+","+ + clientEventTestData[i9StateBobJoin]+","+ + clientEventTestData[i10BobMsg]+`] + } + } + }, + "leave": {} + } + }`) + + // $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost" + // $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost" + writeToRoomServerLog(i11StateAliceRoomName, i12AliceMsg, i13StateBobInviteCharlie) + + // Make sure charlie sees the invite both with and without a ?since= token + // TODO: Invite state should include the invite event and the room name. + charlieInviteData := `{ + "account_data": { + "events": [] + }, + "next_batch": "14", + "presence": { + "events": [] + }, + "rooms": { + "invite": { + "!PjrbIMW2cIiaYF4t:localhost": { + "invite_state": { + "events": [] } } }, - "invite": {}, + "join": {}, "leave": {} } - }`, - } - want = canonicalJSONInput(want) - testSyncServer(input, want, since) + }` + testSyncServer(syncServerCmdChan, "@charlie:localhost", "7", charlieInviteData) + testSyncServer(syncServerCmdChan, "@charlie:localhost", "", charlieInviteData) + + // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost" + // $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" + // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost" + // $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost" + writeToRoomServerLog(i14StateCharlieJoin, i15AliceMsg, i16StateAliceKickCharlie, i17BobMsg) + + // Check transitions to leave work + testSyncServer(syncServerCmdChan, "@charlie:localhost", "15", `{ + "account_data": { + "events": [] + }, + "next_batch": "18", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": {}, + "leave": { + "!PjrbIMW2cIiaYF4t:localhost": { + "state": { + "events": [] + }, + "timeline": { + "limited": false, + "prev_batch": "", + "events": [`+ + clientEventTestData[i15AliceMsg]+","+ + clientEventTestData[i16StateAliceKickCharlie]+`] + } + } + } + } + }`) + + // Test joining and leaving the same room in a single /sync request puts the room in the 'leave' section. + // TODO: Use an earlier since value to assert that the /sync response doesn't leak messages + // from before charlie was joined to the room. Currently it does leak because RecentEvents doesn't + // take membership into account. + testSyncServer(syncServerCmdChan, "@charlie:localhost", "14", `{ + "account_data": { + "events": [] + }, + "next_batch": "18", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": {}, + "leave": { + "!PjrbIMW2cIiaYF4t:localhost": { + "state": { + "events": [] + }, + "timeline": { + "limited": false, + "prev_batch": "", + "events": [`+ + clientEventTestData[i14StateCharlieJoin]+","+ + clientEventTestData[i15AliceMsg]+","+ + clientEventTestData[i16StateAliceKickCharlie]+`] + } + } + } + } + }`) + + // $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" + writeToRoomServerLog(i18StateAliceRoomName) + + // Check that users don't see state changes in rooms after they have left + testSyncServer(syncServerCmdChan, "@charlie:localhost", "17", `{ + "account_data": { + "events": [] + }, + "next_batch": "19", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": {}, + "leave": {} + } + }`) + + // $ curl -XPUT -d '{"msgtype":"m.text","body":"whatever"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost" + // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost" + // $ curl -XPUT -d '{"msgtype":"m.text","body":"im alone now"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" + // $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@alice:localhost" + // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost" + // $ curl -XPUT -d '{"msgtype":"m.text","body":"so alone"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" + // $ curl -XPUT -d '{"name":"Everyone welcome"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" + // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost" + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hiiiii"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@charlie:localhost" } diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go new file mode 100644 index 000000000..7f241e422 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/testdata.go @@ -0,0 +1,101 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +const ( + i0StateRoomCreate = iota + i1StateAliceJoin + i2StatePowerLevels + i3StateJoinRules + i4StateHistoryVisibility + i5AliceMsg + i6AliceMsg + i7AliceMsg + i8StateAliceRoomName + i9StateBobJoin + i10BobMsg + i11StateAliceRoomName + i12AliceMsg + i13StateBobInviteCharlie + i14StateCharlieJoin + i15AliceMsg + i16StateAliceKickCharlie + i17BobMsg + i18StateAliceRoomName + i19BobMsg + i20StateBobLeave + i21AliceMsg + i22StateAliceInviteBob + i23StateBobRejectInvite + i24AliceMsg + i25StateAliceRoomName + i26StateCharlieJoin + i27CharlieMsg +) + +var outputRoomEventTestData = []string{ + // $ curl -XPOST -d '{}' "http://localhost:8009/_matrix/client/r0/createRoom?access_token=@alice:localhost" + `{"Event":{"auth_events":[],"content":{"creator":"@alice:localhost"},"depth":1,"event_id":"$xz0fUB8zNMTGFh1W:localhost","hashes":{"sha256":"KKkpxS8NoH0igBbL3J+nJ39MRlmA7QgW4BGL7Fv4ASI"},"origin":"localhost","origin_server_ts":1494411218382,"prev_events":[],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"uZG5Q/Hs2Z611gFlZPdwomomRJKf70xV2FQV+gLWM1XgzkLDRlRF3cBZc9y3CnHKnV/upTcXs7Op2/GmgD3UBw"}},"state_key":"","type":"m.room.create"},"VisibilityEventIDs":null,"LatestEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"AddsStateEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":""}`, + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}]],"content":{"membership":"join"},"depth":2,"event_id":"$QTen1vksfcRTpUCk:localhost","hashes":{"sha256":"tTukc9ab1fJfzgc5EMA/UD3swqfl/ic9Y9Zkt4fJo0Q"},"origin":"localhost","origin_server_ts":1494411218385,"prev_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"OPysDn/wT7yHeALXLTcEgR+iaKjv0p7VPuR/Mzvyg2IMAwPUjSOw8SQZlhSioWRtVPUp9VHbhIhJxQaPUg9yBQ"}},"state_key":"@alice:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$QTen1vksfcRTpUCk:localhost"],"AddsStateEventIDs":["$QTen1vksfcRTpUCk:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$xz0fUB8zNMTGFh1W:localhost"}`, + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}]],"content":{"ban":50,"events":{"m.room.avatar":50,"m.room.canonical_alias":50,"m.room.history_visibility":100,"m.room.name":50,"m.room.power_levels":100},"events_default":0,"invite":0,"kick":50,"redact":50,"state_default":50,"users":{"@alice:localhost":100},"users_default":0},"depth":3,"event_id":"$RWsxGlfPHAcijTgu:localhost","hashes":{"sha256":"ueZWiL/Q8bagRQGFktpnYJAJV6V6U3QKcUEmWYeyaaM"},"origin":"localhost","origin_server_ts":1494411218385,"prev_events":[["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"hZwWx3lyW61zMYmqLOxLTlfW2CnbjJQsZPLjZFa97TVG4ISz8CixMPsnVAIu5is29UCmiHyP8RvLecJjbLCtAQ"}},"state_key":"","type":"m.room.power_levels"},"VisibilityEventIDs":null,"LatestEventIDs":["$RWsxGlfPHAcijTgu:localhost"],"AddsStateEventIDs":["$RWsxGlfPHAcijTgu:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$QTen1vksfcRTpUCk:localhost"}`, + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}]],"content":{"join_rule":"public"},"depth":4,"event_id":"$2O2DpHB37CuwwJOe:localhost","hashes":{"sha256":"3P3HxAXI8gc094i020EoV/gissYiMVWv8+JAbrakM4E"},"origin":"localhost","origin_server_ts":1494411218386,"prev_events":[["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"L2yZoBbG/6TNsRHz+UtHY0SK4FgrdAYPR1l7RBWaNFbm+k/7kVhnoGlJ9yptpdLJjPMR2InqKXH8BBxRC83BCg"}},"state_key":"","type":"m.room.join_rules"},"VisibilityEventIDs":null,"LatestEventIDs":["$2O2DpHB37CuwwJOe:localhost"],"AddsStateEventIDs":["$2O2DpHB37CuwwJOe:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$RWsxGlfPHAcijTgu:localhost"}`, + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}]],"content":{"history_visibility":"joined"},"depth":5,"event_id":"$5LRiBskVCROnL5WY:localhost","hashes":{"sha256":"341alVufcKSVKLPr9WsJNTnW33QkBTn9eTfVWbyoa0o"},"origin":"localhost","origin_server_ts":1494411218387,"prev_events":[["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"kRyt68cstwYgK8NtYzf0V5CnAbqUO47ixCCWYzRCi0WNstEwUw4XW1GHc8BllQsXwSj+nNv9g/66zZgG0DtxCA"}},"state_key":"","type":"m.room.history_visibility"},"VisibilityEventIDs":null,"LatestEventIDs":["$5LRiBskVCROnL5WY:localhost"],"AddsStateEventIDs":["$5LRiBskVCROnL5WY:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$2O2DpHB37CuwwJOe:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"hello world","msgtype":"m.text"},"depth":0,"event_id":"$Z8ZJik7ghwzSYTH9:localhost","hashes":{"sha256":"ahN1T5aiSZCzllf0pqNWJkF+x2h2S3kic+40pQ1X6BE"},"origin":"localhost","origin_server_ts":1494411339207,"prev_events":[["$5LRiBskVCROnL5WY:localhost",{"sha256":"3jULNC9b9Q0AhvnDQqpjhbtYwmkioHzPzdTJZvn8vOI"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"ylEpahRwEfGpqk+UCv0IF8YAxmut7w7udgHy3sVDfdJhs/4uJ6EkFEsKLknpXRc1vTIy1etKCBQ63QbCmRC2Bw"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$Z8ZJik7ghwzSYTH9:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$5LRiBskVCROnL5WY:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 2"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"hello world 2","msgtype":"m.text"},"depth":0,"event_id":"$8382Ah682eL4hxjN:localhost","hashes":{"sha256":"hQElDGSYc6KOdylrbMMm3+LlvUiCKo6S9G9n58/qtns"},"origin":"localhost","origin_server_ts":1494411380282,"prev_events":[["$Z8ZJik7ghwzSYTH9:localhost",{"sha256":"FBDwP+2FeqDENe7AEa3iAFAVKl1/IVq43mCH0uPRn90"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"LFXi6jTG7qn9xzi4rhIiHbkLD+4AZ9Yg7UTS2gqm1gt2lXQsgTYH1wE4Fol2fq4lvGlQVpxhtEr2huAYSbT7DA"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$8382Ah682eL4hxjN:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$Z8ZJik7ghwzSYTH9:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 3"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"hello world 3","msgtype":"m.text"},"depth":0,"event_id":"$17SfHsvSeTQthSWF:localhost","hashes":{"sha256":"eS6VFQI0l2U8rA8U17jgSHr9lQ73SNSnlnZu+HD0IjE"},"origin":"localhost","origin_server_ts":1494411396560,"prev_events":[["$8382Ah682eL4hxjN:localhost",{"sha256":"c6I/PUY7WnvxQ+oUEp/w2HEEuD3g8Vq7QwPUOSUjuc8"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"dvu9bSHZmX+yZoEqHioK7YDMtLH9kol0DdFqc5aHsbhZe/fKRZpfJMrlf1iXQdXSCMhikvnboPAXN3guiZCUBQ"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$17SfHsvSeTQthSWF:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$8382Ah682eL4hxjN:localhost"}`, + // $ curl -XPUT -d '{"name":"Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"name":"Custom Room Name"},"depth":0,"event_id":"$j7KtuOzM0K15h3Kr:localhost","hashes":{"sha256":"QIKj5Klr50ugll4EjaNUATJmrru4CDp6TvGPv0v15bo"},"origin":"localhost","origin_server_ts":1494411482625,"prev_events":[["$17SfHsvSeTQthSWF:localhost",{"sha256":"iMTefewJ4W5sKQy7osQv4ilJAi7X0NsK791kqEUmYX0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"WU7lwSWUAk7bsyDnBs128PyXxPZZoD1sN4AiDcvk+W1mDezJbFvWHDWymclxWESlP7TDrFTZEumRWGGCakjyAg"}},"state_key":"","type":"m.room.name"},"VisibilityEventIDs":null,"LatestEventIDs":["$j7KtuOzM0K15h3Kr:localhost"],"AddsStateEventIDs":["$j7KtuOzM0K15h3Kr:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$17SfHsvSeTQthSWF:localhost"}`, + // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}]],"content":{"membership":"join"},"depth":0,"event_id":"$wPepDhIla765Odre:localhost","hashes":{"sha256":"KeKqWLvM+LTvyFbwx6y3Y4W5Pj6nBSFUQ6jpkSf1oTE"},"origin":"localhost","origin_server_ts":1494411534290,"prev_events":[["$j7KtuOzM0K15h3Kr:localhost",{"sha256":"oDrWG5/sy1Ea3hYDOSJZRuGKCcjaHQlDYPDn2gB0/L0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"oVtvjZbWFe+iJhoDvLcQKnFpSYQ94dOodM4gGsx26P6fs2sFJissYwSIqpoxlElCJnmBAgy5iv4JK/5x21R2CQ"}},"state_key":"@bob:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$wPepDhIla765Odre:localhost"],"AddsStateEventIDs":["$wPepDhIla765Odre:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$j7KtuOzM0K15h3Kr:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"body":"hello alice","msgtype":"m.text"},"depth":0,"event_id":"$RHNjeYUvXVZfb93t:localhost","hashes":{"sha256":"Ic1QLxTWFrWt1o31DS93ftrNHkunf4O6ubFvdD4ydNI"},"origin":"localhost","origin_server_ts":1494411593196,"prev_events":[["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"8BHHkiThWwiIZbXCegRjIKNVGIa2kqrZW8VuL7nASfJBORhZ9R9p34UsmhsxVwTs/2/dX7M2ogMB28gIGdLQCg"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$RHNjeYUvXVZfb93t:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$wPepDhIla765Odre:localhost"}`, + // $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"name":"A Different Custom Room Name"},"depth":0,"event_id":"$1xoUuqOFjFFJgwA5:localhost","hashes":{"sha256":"2pNnLhoHxNeSUpqxrd3c0kZUA4I+cdWZgYcJ8V3e2tk"},"origin":"localhost","origin_server_ts":1494411643348,"prev_events":[["$RHNjeYUvXVZfb93t:localhost",{"sha256":"LqFmTIzULgUDSf5xM3REObvnsRGLQliWBUf1hEDT4+w"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"gsY4B6TIBdVvLyFAaXw0xez9N5/Cn/ZaJ4z+j9gJU/ZR8j1t3OYlcVQN6uln9JwEU1k20AsGnIqvOaayd+bfCg"}},"state_key":"","type":"m.room.name"},"VisibilityEventIDs":null,"LatestEventIDs":["$1xoUuqOFjFFJgwA5:localhost"],"AddsStateEventIDs":["$1xoUuqOFjFFJgwA5:localhost"],"RemovesStateEventIDs":["$j7KtuOzM0K15h3Kr:localhost"],"LastSentEventID":"$RHNjeYUvXVZfb93t:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"hello bob","msgtype":"m.text"},"depth":0,"event_id":"$4NBTdIwDxq5fDGpv:localhost","hashes":{"sha256":"msCIESAya8kD7nLCopxkEqrgVuGfrlr9YBIADH5czTA"},"origin":"localhost","origin_server_ts":1494411674630,"prev_events":[["$1xoUuqOFjFFJgwA5:localhost",{"sha256":"ZXj+kY6sqQpf5vsNqvCMSvNoXXKDKxRE4R7+gZD9Tkk"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"bZRT3NxVlfBWw1PxSlKlgfnJixG+NI5H9QmUK2AjECg+l887BZJNCvAK0eD27N8e9V+c2glyXWYje2wexP2CBw"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$4NBTdIwDxq5fDGpv:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$1xoUuqOFjFFJgwA5:localhost"}`, + // $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"membership":"invite"},"depth":0,"event_id":"$zzLHVlHIWPrnE7DI:localhost","hashes":{"sha256":"LKk7tnYJAHsyffbi9CzfdP+TU4KQ5g6YTgYGKjJ7NxU"},"origin":"localhost","origin_server_ts":1494411709192,"prev_events":[["$4NBTdIwDxq5fDGpv:localhost",{"sha256":"EpqmxEoJP93Zb2Nt2fS95SJWTqqIutHm/Ne8OHqp6Ps"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"GdUzkC+7YKl1XDi7kYuD39yi2L/+nv+YrecIQHS+0BLDQqnEj+iRXfNBuZfTk6lUBCJCHXZlk7MnEIjvWDlZCg"}},"state_key":"@charlie:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$zzLHVlHIWPrnE7DI:localhost"],"AddsStateEventIDs":["$zzLHVlHIWPrnE7DI:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$4NBTdIwDxq5fDGpv:localhost"}`, + // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"content":{"membership":"join"},"depth":0,"event_id":"$uJVKyzZi8ZX0kOd9:localhost","hashes":{"sha256":"9ZZs/Cg0ewpBiCB6iFXXYlmW8koFiesCNGFrOLDTolE"},"origin":"localhost","origin_server_ts":1494411745015,"prev_events":[["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"+TM0gFPM/M3Ji2BjYuTUTgDyCOWlOq8aTMCxLg7EBvS62yPxJ558f13OWWTczUO5aRAt+PvXsMVM/bp8u6c8DQ"}},"state_key":"@charlie:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$uJVKyzZi8ZX0kOd9:localhost"],"AddsStateEventIDs":["$uJVKyzZi8ZX0kOd9:localhost"],"RemovesStateEventIDs":["$zzLHVlHIWPrnE7DI:localhost"],"LastSentEventID":"$zzLHVlHIWPrnE7DI:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"not charlie...","msgtype":"m.text"},"depth":0,"event_id":"$Ixfn5WT9ocWTYxfy:localhost","hashes":{"sha256":"hRChdyMQ3AY4jvrPpI8PEX6Taux83Qo5hdSeHlhPxGo"},"origin":"localhost","origin_server_ts":1494411792737,"prev_events":[["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"LC/Zqwu/XdqjmLdTOp/NQaFaE0niSAGgEpa39gCxsnsqEX80P7P5WDn/Kzx6rjWTnhIszrLsnoycqkXQT0Z4DQ"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$Ixfn5WT9ocWTYxfy:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$uJVKyzZi8ZX0kOd9:localhost"}`, + // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"content":{"membership":"leave"},"depth":0,"event_id":"$om1F4AI8tCYlHUSp:localhost","hashes":{"sha256":"7JVI0uCxSUyEqDJ+o36/zUIlIZkXVK/R6wkrZGvQXDE"},"origin":"localhost","origin_server_ts":1494411855278,"prev_events":[["$Ixfn5WT9ocWTYxfy:localhost",{"sha256":"hOoPIDQFvvNqQJzA5ggjoQi4v1BOELnhnmwU4UArDOY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"3sxoDLUPnKuDJgFgS3C647BbiXrozxhhxrZOlFP3KgJKzBYv/ht+Jd2V2iSZOvsv94wgRBf0A/lEcJRIqeLgDA"}},"state_key":"@charlie:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$om1F4AI8tCYlHUSp:localhost"],"AddsStateEventIDs":["$om1F4AI8tCYlHUSp:localhost"],"RemovesStateEventIDs":["$uJVKyzZi8ZX0kOd9:localhost"],"LastSentEventID":"$Ixfn5WT9ocWTYxfy:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"body":"why did you kick charlie","msgtype":"m.text"},"depth":0,"event_id":"$hgao5gTmr3r9TtK2:localhost","hashes":{"sha256":"Aa2ZCrvwjX5xhvkVqIOFUeEGqrnrQZjjNFiZRybjsPY"},"origin":"localhost","origin_server_ts":1494411912809,"prev_events":[["$om1F4AI8tCYlHUSp:localhost",{"sha256":"yVs+CW7AiJrJOYouL8xPIBrtIHAhnbxaegna8MxeCto"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"sGkpbEXGsvAuCvE3wb5E9H5fjCVKpRdWNt6csj1bCB9Fmg4Rg4mvj3TAJ+91DjO8IPsgSxDKdqqRYF0OtcynBA"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$hgao5gTmr3r9TtK2:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$om1F4AI8tCYlHUSp:localhost"}`, + // $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"name":"No Charlies"},"depth":0,"event_id":"$CY4XDoxjbns3a4Pc:localhost","hashes":{"sha256":"chk72pVkp3AGR2FtdC0mORBWS1b9ePnRN4WK3BP0BiI"},"origin":"localhost","origin_server_ts":1494411959114,"prev_events":[["$hgao5gTmr3r9TtK2:localhost",{"sha256":"/4/OG4Q2YalIeBtN76BEPIieBKA/3UFshR9T+WJip4o"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"mapvA3KJYgw5FmzJMhSFa/+JSuNyv2eKAkiGomAeBB7LQ1e9nK9XhW/Fp7a5Z2Sy2ENwHyd3ij7FEGiLOnSIAw"}},"state_key":"","type":"m.room.name"},"VisibilityEventIDs":null,"LatestEventIDs":["$CY4XDoxjbns3a4Pc:localhost"],"AddsStateEventIDs":["$CY4XDoxjbns3a4Pc:localhost"],"RemovesStateEventIDs":["$1xoUuqOFjFFJgwA5:localhost"],"LastSentEventID":"$hgao5gTmr3r9TtK2:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"whatever"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"body":"whatever","msgtype":"m.text"},"depth":0,"event_id":"$pl8VBHRPYDmsnDh4:localhost","hashes":{"sha256":"FYqY9+/cepwIxxjfFV3AjOFBXkTlyEI2jep87dUc+SU"},"origin":"localhost","origin_server_ts":1494411988548,"prev_events":[["$CY4XDoxjbns3a4Pc:localhost",{"sha256":"hCoV63fp8eiquVdEefsOqJtLmJhw4wTlRv+wNTS20Ac"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"sQKwRzE59eZyb8rDySo/pVwZXBh0nA5zx+kjEyXglxIQrTre+8Gj3R7Prni+RE3Dq7oWfKYV7QklTLURAaSICQ"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$pl8VBHRPYDmsnDh4:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$CY4XDoxjbns3a4Pc:localhost"}`, + // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"membership":"leave"},"depth":0,"event_id":"$acCW4IgnBo8YD3jw:localhost","hashes":{"sha256":"porP+E2yftBGjfS381+WpZeDM9gZHsM3UydlBcRKBLw"},"origin":"localhost","origin_server_ts":1494412037042,"prev_events":[["$pl8VBHRPYDmsnDh4:localhost",{"sha256":"b+qQ380JDFq7quVU9EbIJ2sbpUKM1LAUNX0ZZUoVMZw"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"kxbjTIC0/UR4cOYUAOTNiUc0SSVIF4BY6Rq6IEgYJemq4jcU2fYqum4mFxIQTDKKXMSRHEoNPDmYMFIJwkrsCg"}},"state_key":"@bob:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$acCW4IgnBo8YD3jw:localhost"],"AddsStateEventIDs":["$acCW4IgnBo8YD3jw:localhost"],"RemovesStateEventIDs":["$wPepDhIla765Odre:localhost"],"LastSentEventID":"$pl8VBHRPYDmsnDh4:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"im alone now"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"im alone now","msgtype":"m.text"},"depth":0,"event_id":"$nYdEXrvTDeb7DfkC:localhost","hashes":{"sha256":"qibC5NmlJpSRMBWSWxy1pv73FXymhPDXQFMmGosfsV0"},"origin":"localhost","origin_server_ts":1494412084668,"prev_events":[["$acCW4IgnBo8YD3jw:localhost",{"sha256":"8h3uXoE6pnI9iLnXI6493qJ0HeuRQfenRIu9PcgH72g"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"EHRoZznhXywhYeIn83o4FSFm3No/aOdLQPHQ68YGtNgESWwpuWLkkGVjoISjz3QgXQ06Fl3cHt7nlTaAHpCNAg"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$nYdEXrvTDeb7DfkC:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$acCW4IgnBo8YD3jw:localhost"}`, + // $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$acCW4IgnBo8YD3jw:localhost",{"sha256":"8h3uXoE6pnI9iLnXI6493qJ0HeuRQfenRIu9PcgH72g"}]],"content":{"membership":"invite"},"depth":0,"event_id":"$gKNfcXLlWvs2cFad:localhost","hashes":{"sha256":"iYDOUjYkaGSFbVp7TRVFvGJyGMEuBHMQrJ9XqwhzmPI"},"origin":"localhost","origin_server_ts":1494412135845,"prev_events":[["$nYdEXrvTDeb7DfkC:localhost",{"sha256":"83T5Q3+nDvtS0oJTEhHxIw02twBDa1A7QR2bHtnxv1Y"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"ofw009aMJMqVjww9eDXgeTjOQqSlJl/GN/AAb+6mZAPcUI8aVgRlXOSESfhu1ONEuV/yNUycxNXWfMwuvoWsDg"}},"state_key":"@bob:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$gKNfcXLlWvs2cFad:localhost"],"AddsStateEventIDs":["$gKNfcXLlWvs2cFad:localhost"],"RemovesStateEventIDs":["$acCW4IgnBo8YD3jw:localhost"],"LastSentEventID":"$nYdEXrvTDeb7DfkC:localhost"}`, + // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$gKNfcXLlWvs2cFad:localhost",{"sha256":"/TYIY+L9qjg516Bzl8sadu+Np21KkxE4KdPXALeJ9eE"}]],"content":{"membership":"leave"},"depth":0,"event_id":"$B2q9Tepb6Xc1Rku0:localhost","hashes":{"sha256":"RbHTVdceAEfTALQDZdGrOmakKeTYnChaKjlVuoNUdSY"},"origin":"localhost","origin_server_ts":1494412187614,"prev_events":[["$gKNfcXLlWvs2cFad:localhost",{"sha256":"/TYIY+L9qjg516Bzl8sadu+Np21KkxE4KdPXALeJ9eE"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"dNtUL86j2zUe5+DkfOkil5VujvFZg4FeTjbtcpeF+3E4SUChCAG3lyR6YOAIYBnjtD0/kqT7OcP3pM6vMEp1Aw"}},"state_key":"@bob:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$B2q9Tepb6Xc1Rku0:localhost"],"AddsStateEventIDs":["$B2q9Tepb6Xc1Rku0:localhost"],"RemovesStateEventIDs":["$gKNfcXLlWvs2cFad:localhost"],"LastSentEventID":"$gKNfcXLlWvs2cFad:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"so alone"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"so alone","msgtype":"m.text"},"depth":0,"event_id":"$W1nrYHQIbCTTSJOV:localhost","hashes":{"sha256":"uUKSa4U1coDoT3LUcNF25dt+UpUa2pLXzRJ3ljgxXZs"},"origin":"localhost","origin_server_ts":1494412229742,"prev_events":[["$B2q9Tepb6Xc1Rku0:localhost",{"sha256":"0CLru7nGPgyF9AWlZnarCElscSVrXl2MMY2atrz80Uc"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"YlBJyDnE34UhaCB9hirQN5OySfTDoqiBDnNvxomXjU94z4a8g2CLWKjApwd/q/j4HamCUtjgkjJ2um6hNjsVBA"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$W1nrYHQIbCTTSJOV:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$B2q9Tepb6Xc1Rku0:localhost"}`, + // $ curl -XPUT -d '{"name":"Everyone welcome"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"name":"Everyone welcome"},"depth":0,"event_id":"$nLzxoBC4A0QRvJ1k:localhost","hashes":{"sha256":"PExCybjaMW1TfgFr57MdIRYJ642FY2jnrdW/tpPOf1Y"},"origin":"localhost","origin_server_ts":1494412294551,"prev_events":[["$W1nrYHQIbCTTSJOV:localhost",{"sha256":"HXk/ACcsiaZ/z1f2aZSIhJF8Ih3BWeh1vp+cV/fwoE0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"RK09L8sQv78y69PNbOLaX8asq5kp51mbqUuct5gd7ZNmaHKnVds6ew06QEn+gHSDAxqQo2tpcfoajp+yMj1HBw"}},"state_key":"","type":"m.room.name"},"VisibilityEventIDs":null,"LatestEventIDs":["$nLzxoBC4A0QRvJ1k:localhost"],"AddsStateEventIDs":["$nLzxoBC4A0QRvJ1k:localhost"],"RemovesStateEventIDs":["$CY4XDoxjbns3a4Pc:localhost"],"LastSentEventID":"$W1nrYHQIbCTTSJOV:localhost"}`, + // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$om1F4AI8tCYlHUSp:localhost",{"sha256":"yVs+CW7AiJrJOYouL8xPIBrtIHAhnbxaegna8MxeCto"}]],"content":{"membership":"join"},"depth":0,"event_id":"$Zo6P8r9bczF6kctV:localhost","hashes":{"sha256":"R3J2iUWnGxVdmly8ah+Dgb5VbJ2i/e8BLaWM0z9eZKU"},"origin":"localhost","origin_server_ts":1494412338689,"prev_events":[["$nLzxoBC4A0QRvJ1k:localhost",{"sha256":"TDcFaArAXpxIJ1noSubcFqkLXiQTrc1Dw1+kgCtx3XY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"tVnjLVoJ9SLlMQIJSK/6zANWaEu8tVVkx3AEJiC3y5JmhPORb3PyG8eE+e/9hC4aJSQL8LGLaJNWXukMpb2SBg"}},"state_key":"@charlie:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$Zo6P8r9bczF6kctV:localhost"],"AddsStateEventIDs":["$Zo6P8r9bczF6kctV:localhost"],"RemovesStateEventIDs":["$om1F4AI8tCYlHUSp:localhost"],"LastSentEventID":"$nLzxoBC4A0QRvJ1k:localhost"}`, + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hiiiii"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@charlie:localhost" + `{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$Zo6P8r9bczF6kctV:localhost",{"sha256":"mnjt3WTYqwtuyl2Fca+0cgm6moHaNL+W9BqRJTQzdEY"}]],"content":{"body":"hiiiii","msgtype":"m.text"},"depth":0,"event_id":"$YAEvK8u2zkTsjf5P:localhost","hashes":{"sha256":"6hKy61h1tuHjYdfpq2MnaPtGEBAZOUz8FLTtxLwjK5A"},"origin":"localhost","origin_server_ts":1494412375465,"prev_events":[["$Zo6P8r9bczF6kctV:localhost",{"sha256":"mnjt3WTYqwtuyl2Fca+0cgm6moHaNL+W9BqRJTQzdEY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"BsSLaMM5U/YkyvBZ00J/+si9My+wAJZOcBhBeato0oHayiag7FW77ZpSTfADazPdNH62kjB0sdP9CN6vQA7yDg"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$YAEvK8u2zkTsjf5P:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$Zo6P8r9bczF6kctV:localhost"}`, +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/README.md b/src/github.com/matrix-org/dendrite/syncapi/README.md index 7ead4cd1a..7221b22d5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/README.md +++ b/src/github.com/matrix-org/dendrite/syncapi/README.md @@ -1,6 +1,13 @@ # Sync API Server -This server is responsible for servicing `/sync` requests. It gets its data from the room server output log. +This server is responsible for servicing `/sync` requests. It gets its data from the room server output log. Currently, the sync server will: + - Return a valid `/sync` response for the user represented by the provided `access_token`. + - Return a "complete sync" if no `since` value is provided, and return a valid `next_batch` token. This contains all rooms the user has been invited to or has joined. For joined rooms, this includes the complete current room state and the most recent 20 (hard-coded) events in the timeline. + - For "incremental syncs" (a `since` value is provided), as you get invited to, join, or leave rooms they will be reflected correctly in the `/sync` response. + - For very large state deltas, the `state` section of a room is correctly populated with the state of the room at the *start* of the timeline. + - When you join a room, the `/sync` which transitions your client to be "joined" will include the complete current room state as per the specification. + - Only wake up user streams it needs to wake up. + - Honours the `timeout` query parameter value. ## Internals @@ -59,3 +66,21 @@ are in `OutputRoomEvents` from the room server. This version of the sync server uses very simple indexing to calculate room state at various points. This is inefficient when a very old `since` value is provided, or the `full_state` is requested, as the state delta becomes very large. This is mitigated slightly with indexes, but better data structures could be used in the future. + +## Known Issues + +- `m.room.history_visibility` is not honoured: it is always treated as "shared". +- All ephemeral events are not implemented (presence, typing, receipts). +- Account data (both user and room) is not implemented. +- `to_device` messages are not implemented. +- Back-pagination via `prev_batch` is not implemented. +- The `limited` flag can lie. +- Filters are not honoured or implemented. The `limit` for each room is hard-coded to 20. +- The `full_state` query parameter is not implemented. +- The `set_presence` query parameter is not implemented. +- "Ignored" users are not ignored. +- Redacted events are still sent to clients. +- Invites over federation (if it existed) won't work as they aren't "real" events and so won't be in the right tables. +- `invite_state` is not implemented (for similar reasons to the above point). +- The current implementation scales badly when a very old `since` token is provided. +- The entire current room state can be re-sent to the client if they send a duplicate "join" event which should be a no-op. diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 4d703ab32..b8ec98d2c 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -28,15 +28,15 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) -// Server contains all the logic for running a sync server -type Server struct { +// OutputRoomEvent consumes events that originated in the room server. +type OutputRoomEvent struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase notifier *sync.Notifier } -// NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Server, error) { +// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -47,7 +47,7 @@ func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerData Consumer: kafkaConsumer, PartitionStore: store, } - s := &Server{ + s := &OutputRoomEvent{ roomServerConsumer: &consumer, db: store, notifier: n, @@ -58,14 +58,14 @@ func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerData } // Start consuming from room servers -func (s *Server) Start() error { +func (s *OutputRoomEvent) Start() error { return s.roomServerConsumer.Start() } // onMessage is called when the sync server receives a new event from the room server output log. // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. -func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { +func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputRoomEvent if err := json.Unmarshal(msg.Value, &output); err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index e8cc68517..b74514c16 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -61,11 +61,15 @@ const selectRoomIDsWithMembershipSQL = "" + const selectCurrentStateSQL = "" + "SELECT event_json FROM current_room_state WHERE room_id = $1" +const selectJoinedUsersSQL = "" + + "SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt + selectJoinedUsersStmt *sql.Stmt } func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { @@ -85,9 +89,34 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil { return } + if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { + return + } return } +// JoinedMemberLists returns a map of room ID to a list of joined user IDs. +func (s *currentRoomStateStatements) JoinedMemberLists() (map[string][]string, error) { + rows, err := s.selectJoinedUsersStmt.Query() + if err != nil { + return nil, err + } + defer rows.Close() + + result := make(map[string][]string) + for rows.Next() { + var roomID string + var userID string + if err := rows.Scan(&roomID, &userID); err != nil { + return nil, err + } + users := result[roomID] + users = append(users, userID) + result[roomID] = users + } + return result, nil +} + // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(txn *sql.Tx, userID, membership string) ([]string, error) { rows, err := txn.Stmt(s.selectRoomIDsWithMembershipStmt).Query(userID, membership) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 222b105c6..6196fa75a 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -50,30 +50,26 @@ const insertEventSQL = "" + "INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5) RETURNING id" const selectEventsSQL = "" + - "SELECT event_json FROM output_room_events WHERE event_id = ANY($1)" - -const selectEventsInRangeSQL = "" + - "SELECT event_json FROM output_room_events WHERE id > $1 AND id <= $2" + "SELECT id, event_json FROM output_room_events WHERE event_id = ANY($1)" const selectRecentEventsSQL = "" + - "SELECT event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4" + "SELECT id, event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4" const selectMaxIDSQL = "" + "SELECT MAX(id) FROM output_room_events" // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" + - " WHERE (id > $1 AND id < $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + + "SELECT id, event_json, add_state_ids, remove_state_ids FROM output_room_events" + + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " ORDER BY id ASC" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxIDStmt *sql.Stmt - selectEventsInRangeStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -90,9 +86,6 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil { return } - if s.selectEventsInRangeStmt, err = db.Prepare(selectEventsInRangeSQL); err != nil { - return - } if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { return } @@ -102,10 +95,10 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { return } -// StateBetween returns the state events between the two given stream positions, exclusive of both. +// StateBetween returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. -func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) { +func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]streamEvent, error) { rows, err := txn.Stmt(s.selectStateInRangeStmt).Query(oldPos, newPos) if err != nil { return nil, err @@ -115,18 +108,19 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty // - For each room ID, build up an array of event IDs which represents cumulative adds/removes // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID // if they aren't in the event ID cache. We don't handle state deletion yet. - eventIDToEvent := make(map[string]gomatrixserverlib.Event) + eventIDToEvent := make(map[string]streamEvent) // RoomID => A set (map[string]bool) of state event IDs which are between the two positions stateNeeded := make(map[string]map[string]bool) for rows.Next() { var ( + streamPos int64 eventBytes []byte addIDs pq.StringArray delIDs pq.StringArray ) - if err := rows.Scan(&eventBytes, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil { return nil, err } // Sanity check for deleted state and whine if we see it. We don't need to do anything @@ -157,7 +151,7 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = ev + eventIDToEvent[ev.EventID()] = streamEvent{ev, types.StreamPosition(streamPos)} } return s.fetchStateEvents(txn, stateNeeded, eventIDToEvent) @@ -165,8 +159,8 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty // fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. // Returns a map of room ID to list of events. -func (s *outputRoomEventsStatements) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet map[string]map[string]bool, eventIDToEvent map[string]gomatrixserverlib.Event) (map[string][]gomatrixserverlib.Event, error) { - stateBetween := make(map[string][]gomatrixserverlib.Event) +func (s *outputRoomEventsStatements) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet map[string]map[string]bool, eventIDToEvent map[string]streamEvent) (map[string][]streamEvent, error) { + stateBetween := make(map[string][]streamEvent) missingEvents := make(map[string][]string) for roomID, ids := range roomIDToEventIDSet { events := stateBetween[roomID] @@ -232,7 +226,7 @@ func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixser } // RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. -func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]gomatrixserverlib.Event, error) { +func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]streamEvent, error) { rows, err := s.selectRecentEventsStmt.Query(roomID, fromPos, toPos, limit) if err != nil { return nil, err @@ -249,7 +243,7 @@ func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID stri // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing // from the database. -func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) { +func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs)) if err != nil { return nil, err @@ -266,11 +260,14 @@ func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]g return result, nil } -func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { - var result []gomatrixserverlib.Event +func rowsToEvents(rows *sql.Rows) ([]streamEvent, error) { + var result []streamEvent for rows.Next() { - var eventBytes []byte - if err := rows.Scan(&eventBytes); err != nil { + var ( + streamPos int64 + eventBytes []byte + ) + if err := rows.Scan(&streamPos, &eventBytes); err != nil { return nil, err } // TODO: Handle redacted events @@ -278,12 +275,12 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { if err != nil { return nil, err } - result = append(result, ev) + result = append(result, streamEvent{ev, types.StreamPosition(streamPos)}) } return result, nil } -func reverseEvents(input []gomatrixserverlib.Event) (output []gomatrixserverlib.Event) { +func reverseEvents(input []streamEvent) (output []streamEvent) { for i := len(input) - 1; i >= 0; i-- { output = append(output, input[i]) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index fe9bc4f78..326d12c40 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -16,13 +16,30 @@ package storage import ( "database/sql" + "encoding/json" // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) +type stateDelta struct { + roomID string + stateEvents []gomatrixserverlib.Event + membership string + // The stream position of the latest membership event for this user, if applicable. + // Can be 0 if there is no membership event in this delta. + membershipPos types.StreamPosition +} + +// Same as gomatrixserverlib.Event but also has the stream position for this event. +type streamEvent struct { + gomatrixserverlib.Event + streamPosition types.StreamPosition +} + // SyncServerDatabase represents a sync server database type SyncServerDatabase struct { db *sql.DB @@ -53,6 +70,11 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { return &SyncServerDatabase{db, partitions, events, state}, nil } +// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. +func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error) { + return d.roomstate.JoinedMemberLists() +} + // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. @@ -86,7 +108,7 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve if err != nil { return err } - return d.roomstate.UpdateRoomState(txn, added, removeStateEventIDs) + return d.roomstate.UpdateRoomState(txn, streamEventsToEvents(added), removeStateEventIDs) }) return } @@ -111,53 +133,84 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) } // IncrementalSync returns all the data needed in order to create an incremental sync response. -func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) { - data = make(map[string]types.RoomData) +func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { - roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") + // Work out which rooms to return in the response. This is done by getting not only the currently + // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. + // This works out what the 'state' key should be for each room as well as which membership block + // to put the room into. + deltas, err := d.getStateDeltas(txn, fromPos, toPos, userID) if err != nil { return err } - state, err := d.events.StateBetween(txn, fromPos, toPos) - if err != nil { - return err - } - - for _, roomID := range roomIDs { - recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom) + res = types.NewResponse(toPos) + for _, delta := range deltas { + endPos := toPos + if delta.membershipPos > 0 && delta.membership == "leave" { + // make sure we don't leak recent events after the leave event. + // TODO: History visibility makes this somewhat complex to handle correctly. For example: + // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). + // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave + // in a single /sync request + // This is all "okay" assuming history_visibility == "shared" which it is by default. + endPos = delta.membershipPos + } + recentStreamEvents, err := d.events.RecentEventsInRoom(txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom) if err != nil { return err } - roomData := types.RoomData{ - State: state[roomID], - RecentEvents: recentEvents, + recentEvents := streamEventsToEvents(recentStreamEvents) + delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back + + switch delta.membership { + case "join": + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.roomID] = *jr + case "leave": + fallthrough // transitions to leave are the same as ban + case "ban": + // TODO: recentEvents may contain events that this user is not allowed to see because they are + // no longer in the room. + lr := types.NewLeaveResponse() + lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.roomID] = *lr } - data[roomID] = roomData } - return nil + + // TODO: This should be done in getStateDeltas + return d.addInvitesToResponse(txn, userID, res) }) return } -// CompleteSync returns all the data needed in order to create a complete sync response. -func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) { - data = make(map[string]types.RoomData) +// CompleteSync a complete /sync API response for the given user. +func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // a consistent view of the database throughout. This includes extracting the sync stream position. + // This does have the unfortunate side-effect that all the matrixy logic resides in this function, + // but it's better to not hide the fact that this is being done in a transaction. returnErr = runTransaction(d.db, func(txn *sql.Tx) error { // Get the current stream position which we will base the sync response on. id, err := d.events.MaxID(txn) if err != nil { return err } - pos = types.StreamPosition(id) + pos := types.StreamPosition(id) // Extract room state and recent events for all rooms the user is joined to. roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") if err != nil { return err } + + // Build up a /sync response. Add joined rooms. + res = types.NewResponse(pos) for _, roomID := range roomIDs { stateEvents, err := d.roomstate.CurrentState(txn, roomID) if err != nil { @@ -165,20 +218,151 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom } // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom) + recentStreamEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom) if err != nil { return err } - data[roomID] = types.RoomData{ - State: stateEvents, - RecentEvents: recentEvents, - } + recentEvents := streamEventsToEvents(recentStreamEvents) + + stateEvents = removeDuplicates(stateEvents, recentEvents) + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = true + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr } - return nil + + return d.addInvitesToResponse(txn, userID, res) }) return } +func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error { + // Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark. + roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "invite") + if err != nil { + return err + } + for _, roomID := range roomIDs { + ir := types.NewInviteResponse() + // TODO: invite_state. The state won't be in the current state table in cases where you get invited over federation + res.Rooms.Invite[roomID] = *ir + } + return nil +} + +func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) { + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 + // - Get membership list changes for this user in this sync response + // - For each room which has membership list changes: + // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). + // If it is, then we need to send the full room state down (and 'limited' is always true). + // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. + // - Get all CURRENTLY joined rooms, and add them to 'joined' block. + var deltas []stateDelta + + // get all the state events ever between these two positions + state, err := d.events.StateBetween(txn, fromPos, toPos) + if err != nil { + return nil, err + } + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. + // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, + // dupe join events will result in the entire room state coming down to the client again. This is added in + // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to + // the timeline. + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership == "join" { + // send full room state down instead of a delta + var allState []gomatrixserverlib.Event + allState, err = d.roomstate.CurrentState(txn, roomID) + if err != nil { + return nil, err + } + s := make([]streamEvent, len(allState)) + for i := 0; i < len(s); i++ { + s[i] = streamEvent{allState[i], types.StreamPosition(0)} + } + state[roomID] = s + continue // we'll add this room in when we do joined rooms + } + + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.streamPosition, + stateEvents: streamEventsToEvents(stateStreamEvents), + roomID: roomID, + }) + break + } + } + } + + // Add in currently joined rooms + joinedRoomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") + if err != nil { + return nil, err + } + for _, joinedRoomID := range joinedRoomIDs { + deltas = append(deltas, stateDelta{ + membership: "join", + stateEvents: streamEventsToEvents(state[joinedRoomID]), + roomID: joinedRoomID, + }) + } + + return deltas, nil +} + +func streamEventsToEvents(in []streamEvent) []gomatrixserverlib.Event { + out := make([]gomatrixserverlib.Event, len(in)) + for i := 0; i < len(in); i++ { + out[i] = in[i].Event + } + return out +} + +// There may be some overlap where events in stateEvents are already in recentEvents, so filter +// them out so we don't include them twice in the /sync response. They should be in recentEvents +// only, so clients get to the correct state once they have rolled forward. +func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event { + for _, recentEv := range recentEvents { + if recentEv.StateKey() == nil { + continue // not a state event + } + // TODO: This is a linear scan over all the current state events in this room. This will + // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY) + // then do a binary search to find matching events, similar to what roomserver does. + for j := 0; j < len(stateEvents); j++ { + if stateEvents[j].EventID() == recentEv.EventID() { + // overwrite the element to remove with the last element then pop the last element. + // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering + // (we don't care about the order of stateEvents) + stateEvents[j] = stateEvents[len(stateEvents)-1] + stateEvents = stateEvents[:len(stateEvents)-1] + break // there shouldn't be multiple events with the same event ID + } + } + } + return stateEvents +} + +// getMembershipFromEvent returns the value of content.membership iff the event is a state event +// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. +func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { + if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { + var memberContent events.MemberContent + if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { + return "" + } + return memberContent.Membership + } + return "" +} + func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { txn, err := db.Begin() if err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index cc986579f..1cc9c4e28 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -15,27 +15,41 @@ package sync import ( + "encoding/json" "sync" + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/events" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) -// Notifier will wake up sleeping requests in the request pool when there -// is some new data. It does not tell requests what that data is, only the -// stream position which they can use to get at it. +// Notifier will wake up sleeping requests when there is some new data. +// It does not tell requests what that data is, only the stream position which +// they can use to get at it. This is done to prevent races whereby we tell the caller +// the event, but the token has already advanced by the time they fetch it, resulting +// in missed events. type Notifier struct { - // The latest sync stream position: guarded by 'cond'. + // A map of RoomID => Set : Must only be accessed by the OnNewEvent goroutine + roomIDToJoinedUsers map[string]userIDSet + // Protects currPos and userStreams. + streamLock *sync.Mutex + // The latest sync stream position currPos types.StreamPosition - // A condition variable to notify all waiting goroutines of a new sync stream position - cond *sync.Cond + // A map of user_id => UserStream which can be used to wake a given user's /sync request. + userStreams map[string]*UserStream } // NewNotifier creates a new notifier set to the given stream position. +// In order for this to be of any use, the Notifier needs to be told all rooms and +// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). func NewNotifier(pos types.StreamPosition) *Notifier { return &Notifier{ - pos, - sync.NewCond(&sync.Mutex{}), + currPos: pos, + roomIDToJoinedUsers: make(map[string]userIDSet), + userStreams: make(map[string]*UserStream), + streamLock: &sync.Mutex{}, } } @@ -43,25 +57,157 @@ func NewNotifier(pos types.StreamPosition) *Notifier { // called from a single goroutine, to avoid races between updates which could set the // current position in the stream incorrectly. func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { - // update the current position in a guard and then notify all /sync streams - n.cond.L.Lock() + // update the current position then notify relevant /sync streams. + // This needs to be done PRIOR to waking up users as they will read this value. + n.streamLock.Lock() + defer n.streamLock.Unlock() n.currPos = pos - n.cond.L.Unlock() - n.cond.Broadcast() // notify ALL waiting goroutines + // Map this event's room_id to a list of joined users, and wake them up. + userIDs := n.joinedUsers(ev.RoomID()) + // If this is an invite, also add in the invitee to this list. + if ev.Type() == "m.room.member" && ev.StateKey() != nil { + userID := *ev.StateKey() + var memberContent events.MemberContent + if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { + log.WithError(err).WithField("event_id", ev.EventID()).Errorf( + "Notifier.OnNewEvent: Failed to unmarshal member event", + ) + } else { + // Keep the joined user map up-to-date + switch memberContent.Membership { + case "invite": + userIDs = append(userIDs, userID) + case "join": + n.addJoinedUser(ev.RoomID(), userID) + case "leave": + fallthrough + case "ban": + n.removeJoinedUser(ev.RoomID(), userID) + } + } + } + + for _, userID := range userIDs { + n.wakeupUser(userID, pos) + } } // WaitForEvents blocks until there are new events for this request. func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { - // In a guard, check if the /sync request should block, and block it until we get a new position - n.cond.L.Lock() + // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 + // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID + // - Incoming events wake requests for a matching room ID + // - Incoming events wake requests for a matching user ID (needed for invites) + + // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, + // but given we don't do /events, let's pretend it doesn't exist. + + // In a guard, check if the /sync request should block, and block it until we get woken up + n.streamLock.Lock() currentPos := n.currPos - for req.since == currentPos { - // we need to wait for a new event. - // TODO: This waits for ANY new event, we need to only wait for events which we care about. - n.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock - currentPos = n.currPos + + // TODO: We increment the stream position for any event, so it's possible that we return immediately + // with a pos which contains no new events for this user. We should probably re-wait for events + // automatically in this case. + if req.since != currentPos { + n.streamLock.Unlock() + return currentPos } - n.cond.L.Unlock() - return currentPos + + // wait to be woken up, and then re-check the stream position + req.log.WithField("user_id", req.userID).Info("Waiting for event") + + // give up the stream lock prior to waiting on the user lock + stream := n.fetchUserStream(req.userID, true) + n.streamLock.Unlock() + return stream.Wait(currentPos) +} + +// Load the membership states required to notify users correctly. +func (n *Notifier) Load(db *storage.SyncServerDatabase) error { + roomToUsers, err := db.AllJoinedUsersInRooms() + if err != nil { + return err + } + n.setUsersJoinedToRooms(roomToUsers) + return nil +} + +// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from +// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to +// OnNewEvent (eg on startup) to prevent racing. +func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { + // This is just the bulk form of addJoinedUser + for roomID, userIDs := range roomIDToUserIDs { + if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { + n.roomIDToJoinedUsers[roomID] = make(userIDSet) + } + for _, userID := range userIDs { + n.roomIDToJoinedUsers[roomID].add(userID) + } + } +} + +func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) { + stream := n.fetchUserStream(userID, false) + if stream == nil { + return + } + stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream +} + +// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, +// a stream will be made for this user if one doesn't exist and it will be returned. This +// function does not wait for data to be available on the stream. +func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream { + stream, ok := n.userStreams[userID] + if !ok { + // TODO: Unbounded growth of streams (1 per user) + stream = NewUserStream(userID) + n.userStreams[userID] = stream + } + return stream +} + +// Not thread-safe: must be called on the OnNewEvent goroutine only +func (n *Notifier) addJoinedUser(roomID, userID string) { + if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { + n.roomIDToJoinedUsers[roomID] = make(userIDSet) + } + n.roomIDToJoinedUsers[roomID].add(userID) +} + +// Not thread-safe: must be called on the OnNewEvent goroutine only +func (n *Notifier) removeJoinedUser(roomID, userID string) { + if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { + n.roomIDToJoinedUsers[roomID] = make(userIDSet) + } + n.roomIDToJoinedUsers[roomID].remove(userID) +} + +// Not thread-safe: must be called on the OnNewEvent goroutine only +func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { + if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { + return + } + return n.roomIDToJoinedUsers[roomID].values() +} + +// A string set, mainly existing for improving clarity of structs in this file. +type userIDSet map[string]bool + +func (s userIDSet) add(str string) { + s[str] = true +} + +func (s userIDSet) remove(str string) { + delete(s, str) +} + +func (s userIDSet) values() (vals []string) { + for str := range s { + vals = append(vals, str) + } + return } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go new file mode 100644 index 000000000..784faf578 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -0,0 +1,292 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +var ( + randomMessageEvent gomatrixserverlib.Event + aliceInviteBobEvent gomatrixserverlib.Event + bobLeaveEvent gomatrixserverlib.Event +) + +var ( + streamPositionVeryOld = types.StreamPosition(5) + streamPositionBefore = types.StreamPosition(11) + streamPositionAfter = types.StreamPosition(12) + streamPositionAfter2 = types.StreamPosition(13) + roomID = "!test:localhost" + alice = "@alice:localhost" + bob = "@bob:localhost" +) + +func init() { + var err error + randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.message", + "content": { + "body": "Hello World", + "msgtype": "m.text" + }, + "sender": "@noone:localhost", + "room_id": "`+roomID+`", + "origin_server_ts": 12345, + "event_id": "$randomMessageEvent:localhost" + }`), false) + if err != nil { + panic(err) + } + aliceInviteBobEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.member", + "state_key": "`+bob+`", + "content": { + "membership": "invite" + }, + "sender": "`+alice+`", + "room_id": "`+roomID+`", + "origin_server_ts": 12345, + "event_id": "$aliceInviteBobEvent:localhost" + }`), false) + if err != nil { + panic(err) + } + bobLeaveEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.member", + "state_key": "`+bob+`", + "content": { + "membership": "leave" + }, + "sender": "`+bob+`", + "room_id": "`+roomID+`", + "origin_server_ts": 12345, + "event_id": "$bobLeaveEvent:localhost" + }`), false) + if err != nil { + panic(err) + } +} + +// Test that the current position is returned if a request is already behind. +func TestImmediateNotification(t *testing.T) { + n := NewNotifier(streamPositionBefore) + pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionVeryOld)) + if err != nil { + t.Fatalf("TestImmediateNotification error: %s", err) + } + if pos != streamPositionBefore { + t.Fatalf("TestImmediateNotification want %d, got %d", streamPositionBefore, pos) + } +} + +// Test that new events to a joined room unblocks the request. +func TestNewEventAndJoinedToRoom(t *testing.T) { + n := NewNotifier(streamPositionBefore) + n.setUsersJoinedToRooms(map[string][]string{ + roomID: []string{alice, bob}, + }) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestNewEventAndJoinedToRoom error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", streamPositionAfter, pos) + } + wg.Done() + }() + + stream := n.fetchUserStream(bob, true) + waitForBlocking(stream, 1) + + n.OnNewEvent(&randomMessageEvent, streamPositionAfter) + + wg.Wait() +} + +// Test that an invite unblocks the request +func TestNewInviteEventForUser(t *testing.T) { + n := NewNotifier(streamPositionBefore) + n.setUsersJoinedToRooms(map[string][]string{ + roomID: []string{alice, bob}, + }) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestNewInviteEventForUser error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestNewInviteEventForUser want %d, got %d", streamPositionAfter, pos) + } + wg.Done() + }() + + stream := n.fetchUserStream(bob, true) + waitForBlocking(stream, 1) + + n.OnNewEvent(&aliceInviteBobEvent, streamPositionAfter) + + wg.Wait() +} + +// Test that all blocked requests get woken up on a new event. +func TestMultipleRequestWakeup(t *testing.T) { + n := NewNotifier(streamPositionBefore) + n.setUsersJoinedToRooms(map[string][]string{ + roomID: []string{alice, bob}, + }) + + var wg sync.WaitGroup + wg.Add(3) + poll := func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestMultipleRequestWakeup error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos) + } + wg.Done() + } + go poll() + go poll() + go poll() + + stream := n.fetchUserStream(bob, true) + waitForBlocking(stream, 3) + + n.OnNewEvent(&randomMessageEvent, streamPositionAfter) + + wg.Wait() + + numWaiting := stream.NumWaiting() + if numWaiting != 0 { + t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting) + } +} + +// Test that you stop getting woken up when you leave a room. +func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { + // listen as bob. Make bob leave room. Make alice send event to room. + // Make sure alice gets woken up only and not bob as well. + n := NewNotifier(streamPositionBefore) + n.setUsersJoinedToRooms(map[string][]string{ + roomID: []string{alice, bob}, + }) + + var leaveWG sync.WaitGroup + + // Make bob leave the room + leaveWG.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos) + } + leaveWG.Done() + }() + bobStream := n.fetchUserStream(bob, true) + waitForBlocking(bobStream, 1) + n.OnNewEvent(&bobLeaveEvent, streamPositionAfter) + leaveWG.Wait() + + // send an event into the room. Make sure alice gets it. Bob should not. + var aliceWG sync.WaitGroup + aliceStream := n.fetchUserStream(alice, true) + aliceWG.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter)) + if err != nil { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) + } + if pos != streamPositionAfter2 { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos) + } + aliceWG.Done() + }() + + go func() { + // this should timeout with an error (but the main goroutine won't wait for the timeout explicitly) + _, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter)) + if err == nil { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil") + } + }() + + waitForBlocking(aliceStream, 1) + waitForBlocking(bobStream, 1) + + n.OnNewEvent(&randomMessageEvent, streamPositionAfter2) + aliceWG.Wait() + + // it's possible that at this point alice has been informed and bob is about to be informed, so wait + // for a fraction of a second to account for this race + time.Sleep(1 * time.Millisecond) +} + +// same as Notifier.WaitForEvents but with a timeout. +func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { + done := make(chan types.StreamPosition, 1) + go func() { + newPos := n.WaitForEvents(req) + done <- newPos + close(done) + }() + select { + case <-time.After(5 * time.Second): + return types.StreamPosition(0), fmt.Errorf( + "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, + ) + case p := <-done: + return p, nil + } +} + +// Wait until something is Wait()ing on the user stream. +func waitForBlocking(s *UserStream, numBlocking int) { + for numBlocking != s.NumWaiting() { + // This is horrible but I don't want to add a signalling mechanism JUST for testing. + time.Sleep(1 * time.Microsecond) + } +} + +func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { + return syncRequest{ + userID: userID, + timeout: 1 * time.Minute, + since: since, + wantFullState: false, + limit: defaultTimelineLimit, + log: util.GetLogger(context.TODO()), + } +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index a44f8557f..5260a3639 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -15,10 +15,13 @@ package sync import ( - "github.com/matrix-org/dendrite/syncapi/types" "net/http" "strconv" "time" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/util" ) const defaultSyncTimeout = time.Duration(30) * time.Second @@ -31,6 +34,7 @@ type syncRequest struct { timeout time.Duration since types.StreamPosition wantFullState bool + log *log.Entry } func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) { @@ -48,6 +52,7 @@ func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) { since: since, wantFullState: wantFullState, limit: defaultTimelineLimit, // TODO: read from filter + log: util.GetLogger(req.Context()), }, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index eee117e76..8e9affb68 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -64,7 +63,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons // Fork off 2 goroutines: one to do the work, and one to serve as a timeout. // Whichever returns first is the one we will serve back to the client. - // TODO: Currently this means that cpu work is timed, which may not be what we want long term. timeoutChan := make(chan struct{}) timer := time.AfterFunc(syncReq.timeout, func() { close(timeoutChan) // signal that the timeout has expired @@ -72,8 +70,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons done := make(chan util.JSONResponse) go func() { - syncData, err := rp.currentSyncForUser(*syncReq) + currentPos := rp.notifier.WaitForEvents(*syncReq) + // We stop the timer BEFORE calculating the response so the cpu work + // done to calculate the response is not timed. This stops us from + // doing lots of work then timing out and sending back an empty response. timer.Stop() + syncData, err := rp.currentSyncForUser(*syncReq, currentPos) var res util.JSONResponse if err != nil { res = httputil.LogThenError(req, err) @@ -98,39 +100,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } } -func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { - currentPos := rp.notifier.WaitForEvents(req) - - if req.since == types.StreamPosition(0) { - pos, data, err := rp.db.CompleteSync(req.userID, req.limit) - if err != nil { - return nil, err - } - res := types.NewResponse(pos) - for roomID, d := range data { - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - return res, nil - } - +func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) { // TODO: handle ignored users - - data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) - if err != nil { - return nil, err + if req.since == types.StreamPosition(0) { + return rp.db.CompleteSync(req.userID, req.limit) } - - res := types.NewResponse(currentPos) - for roomID, d := range data { - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - return res, nil + return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go new file mode 100644 index 000000000..349b3e272 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -0,0 +1,79 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "sync" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +// UserStream represents a communication mechanism between the /sync request goroutine +// and the underlying sync server goroutines. Goroutines can Wait() for a stream position and +// goroutines can Broadcast(streamPosition) to other goroutines. +type UserStream struct { + UserID string + // Because this is a Cond, we can notify all waiting goroutines so this works + // across devices for the same user. Protects pos. + cond *sync.Cond + // The position to broadcast to callers of Wait(). + pos types.StreamPosition + // The number of goroutines blocked on Wait() - used for testing and metrics + numWaiting int +} + +// NewUserStream creates a new user stream +func NewUserStream(userID string) *UserStream { + return &UserStream{ + UserID: userID, + cond: sync.NewCond(&sync.Mutex{}), + } +} + +// Wait blocks until there is a new stream position for this user, which is then returned. +// waitAtPos should be the position the stream thinks it should be waiting at. +func (s *UserStream) Wait(waitAtPos types.StreamPosition) (pos types.StreamPosition) { + s.cond.L.Lock() + // Before we start blocking, we need to make sure that we didn't race with a call + // to Broadcast() between calling Wait() and actually sleeping. We check the last + // broadcast pos to see if it is newer than the pos we are meant to wait at. If it + // is newer, something has Broadcast to this stream more recently so return immediately. + if s.pos > waitAtPos { + pos = s.pos + s.cond.L.Unlock() + return + } + s.numWaiting++ + s.cond.Wait() + pos = s.pos + s.numWaiting-- + s.cond.L.Unlock() + return +} + +// Broadcast a new stream position for this user. +func (s *UserStream) Broadcast(pos types.StreamPosition) { + s.cond.L.Lock() + s.pos = pos + s.cond.L.Unlock() + s.cond.Broadcast() +} + +// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing. +func (s *UserStream) NumWaiting() int { + s.cond.L.Lock() + defer s.cond.L.Unlock() + return s.numWaiting +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/types/types.go b/src/github.com/matrix-org/dendrite/syncapi/types/types.go index 8c112936c..a1c3e3c76 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncapi/types/types.go @@ -28,12 +28,6 @@ func (sp StreamPosition) String() string { return strconv.FormatInt(int64(sp), 10) } -// RoomData represents the data for a room suitable for building a sync response from. -type RoomData struct { - State []gomatrixserverlib.Event - RecentEvents []gomatrixserverlib.Event -} - // Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync type Response struct { NextBatch string `json:"next_batch"` @@ -103,7 +97,7 @@ func NewJoinResponse() *JoinResponse { // InviteResponse represents a /sync response for a room which is under the 'invite' key. type InviteResponse struct { InviteState struct { - Events []gomatrixserverlib.ClientEvent + Events []gomatrixserverlib.ClientEvent `json:"events"` } `json:"invite_state"` }