Merge branch 'master' into rob/media-upload

This commit is contained in:
Robert Swain 2017-05-18 12:47:41 +02:00
commit 1057e2e117
15 changed files with 1361 additions and 423 deletions

View file

@ -1,6 +1,6 @@
language: go
go:
- 1.7
- 1.8
sudo: false

View file

@ -78,12 +78,15 @@ func main() {
}
n := sync.NewNotifier(types.StreamPosition(pos))
server, err := consumers.NewServer(cfg, n, db)
if err != nil {
log.Panicf("startup: failed to create sync server: %s", err)
if err := n.Load(db); err != nil {
log.Panicf("startup: failed to set up notifier: %s", err)
}
if err = server.Start(); err != nil {
log.Panicf("startup: failed to start sync server")
consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
if err != nil {
log.Panicf("startup: failed to create room server consumer: %s", err)
}
if err = consumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer")
}
log.Info("Starting sync server on ", *bindAddr)

View file

@ -15,6 +15,7 @@
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
@ -26,6 +27,7 @@ import (
"time"
"github.com/matrix-org/dendrite/common/test"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
@ -91,6 +93,7 @@ func defaulting(value, defaultValue string) string {
}
var timeout time.Duration
var clientEventTestData []string
func init() {
var err error
@ -98,6 +101,10 @@ func init() {
if err != nil {
panic(err)
}
for _, s := range outputRoomEventTestData {
clientEventTestData = append(clientEventTestData, clientEventJSONForOutputRoomEvent(s))
}
}
// TODO: dupes roomserver integration tests. Factor out.
@ -125,6 +132,30 @@ func canonicalJSONInput(jsonData []string) []string {
return jsonData
}
// clientEventJSONForOutputRoomEvent parses the given output room event and extracts the 'Event' JSON. It is
// trimmed to the client format and then canonicalised and returned as a string.
// Panics if there are any problems.
func clientEventJSONForOutputRoomEvent(outputRoomEvent string) string {
var out api.OutputRoomEvent
if err := json.Unmarshal([]byte(outputRoomEvent), &out); err != nil {
panic("failed to unmarshal output room event: " + err.Error())
}
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(out.Event, false)
if err != nil {
panic("failed to convert event field in output room event to Event: " + err.Error())
}
clientEvs := gomatrixserverlib.ToClientEvents([]gomatrixserverlib.Event{ev}, gomatrixserverlib.FormatSync)
b, err := json.Marshal(clientEvs[0])
if err != nil {
panic("failed to marshal client event as json: " + err.Error())
}
jsonBytes, err := gomatrixserverlib.CanonicalJSON(b)
if err != nil {
panic("failed to turn event json into canonical json: " + err.Error())
}
return string(jsonBytes)
}
// doSyncRequest does a /sync request and returns an error if it fails or doesn't
// return the wanted string.
func doSyncRequest(syncServerURL, want string) error {
@ -156,10 +187,15 @@ func doSyncRequest(syncServerURL, want string) error {
// syncRequestUntilSuccess blocks and performs the same /sync request over and over until
// the response returns the wanted string, where it will close the given channel and return.
// It will keep track of the last error in `lastRequestErr`.
func syncRequestUntilSuccess(done chan error, want string, since string) {
func syncRequestUntilSuccess(done chan error, userID, since, want string) {
for {
sinceQuery := ""
if since != "" {
sinceQuery = "&since=" + since
}
err := doSyncRequest(
"http://"+syncserverAddr+"/api/_matrix/client/r0/sync?access_token=@alice:localhost&since="+since,
// low value timeout so polling with an up-to-date token returns quickly
"http://"+syncserverAddr+"/api/_matrix/client/r0/sync?timeout=100&access_token="+userID+sinceQuery,
want,
)
if err != nil {
@ -172,9 +208,10 @@ func syncRequestUntilSuccess(done chan error, want string, since string) {
}
}
// prepareSyncServer creates the database and config file needed for the sync server to run.
// It also prepares the CLI command to execute.
func prepareSyncServer() *exec.Cmd {
// startSyncServer creates the database and config file needed for the sync server to run and
// then starts the sync server. The Cmd being executed is returned. A channel is also returned,
// which will have any termination errors sent down it, followed immediately by the channel being closed.
func startSyncServer() (*exec.Cmd, chan error) {
if err := createDatabase(testDatabaseName); err != nil {
panic(err)
}
@ -192,23 +229,28 @@ func prepareSyncServer() *exec.Cmd {
)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
return cmd
if err := cmd.Start(); err != nil {
panic("failed to start sync server: " + err.Error())
}
syncServerCmdChan := make(chan error, 1)
go func() {
syncServerCmdChan <- cmd.Wait()
close(syncServerCmdChan)
}()
return cmd, syncServerCmdChan
}
func testSyncServer(input, want []string, since string) {
// Write the logs to kafka so the sync server has some data to work with.
// prepareKafka creates the topics which will be written to by the tests.
func prepareKafka() {
exe.DeleteTopic(inputTopic)
if err := exe.CreateTopic(inputTopic); err != nil {
panic(err)
}
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
panic(err)
}
}
cmd := prepareSyncServer()
if err := cmd.Start(); err != nil {
panic("failed to start sync server: " + err.Error())
}
func testSyncServer(syncServerCmdChan chan error, userID, since, want string) {
fmt.Printf("==TESTING== testSyncServer(%s,%s)\n", userID, since)
done := make(chan error, 1)
// We need to wait for the sync server to:
@ -220,314 +262,382 @@ func testSyncServer(input, want []string, since string) {
// We can't even wait for the first valid 200 OK response because it's possible to race
// with consuming the kafka logs (so the /sync response will be missing events and
// therefore fail the test).
go syncRequestUntilSuccess(done, want[0], since)
go syncRequestUntilSuccess(done, userID, since, canonicalJSONInput([]string{want})[0])
// wait for one of:
// - the test to pass (done channel is closed)
// - the sync server to exit with an error (error sent on syncServerCmdChan)
// - our test timeout to expire
// We don't need to clean up since the main() function handles that in the event we panic
var testPassed bool
// wait for the sync server to exit or our test timeout to expire
go func() {
done <- cmd.Wait()
}()
select {
case <-time.After(timeout):
if testPassed {
break
}
fmt.Printf("==TESTING== testSyncServer(%s,%s) TIMEOUT\n", userID, since)
if reqErr := getLastRequestError(); reqErr != nil {
fmt.Println("Last /sync request error:")
fmt.Println(reqErr)
}
if err := cmd.Process.Kill(); err != nil {
panic(err)
}
panic("dendrite-sync-api-server timed out")
case err, open := <-done:
cmd.Process.Kill() // ensure server is dead, only cleaning up so don't care about errors this returns.
if open { // channel is closed on success
case err := <-syncServerCmdChan:
if err != nil {
fmt.Println("=============================================================================================")
fmt.Println("sync server failed to run. If failing with 'pq: password authentication failed for user' try:")
fmt.Println(" export PGHOST=/var/run/postgresql\n")
fmt.Println("=============================================================================================")
panic(err)
}
case <-done:
testPassed = true
fmt.Printf("==TESTING== testSyncServer(%s,%s) PASSED\n", userID, since)
}
}
func writeToRoomServerLog(indexes ...int) {
var roomEvents []string
for _, i := range indexes {
roomEvents = append(roomEvents, outputRoomEventTestData[i])
}
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(roomEvents)); err != nil {
panic(err)
}
}
// Runs a battery of sync server tests against test data in testdata.go
// testdata.go has a list of OutputRoomEvents which will be fed into the kafka log which the sync server will consume.
// The tests will pause at various points in this list to conduct tests on the /sync responses before continuing.
// For ease of understanding, the curl commands used to create the OutputRoomEvents are listed along with each write to kafka.
func main() {
fmt.Println("==TESTING==", os.Args[0])
// room creation for @alice:localhost
input := []string{
`{
"Event": {
"auth_events": [],
"content": {
"creator": "@alice:localhost"
},
"depth": 1,
"event_id": "$rOaxKSu6K1s0nOsW:localhost",
"hashes": {
"sha256": "g1QC1jZauIcVw+HCGizUqlUaLSmAkEGwGmIcLac5TKk"
},
"origin": "localhost",
"origin_server_ts": 1493908927170,
"prev_events": [],
"room_id": "!gnrFfNAK7yGBWXFd:localhost",
"sender": "@alice:localhost",
"signatures": {
"localhost": {
"ed25519:something": "WCaImDmpkhNCCoUyRHcrV93SeJpJbq34yWbtjBgNNXVJaoiLSTys6t/gCvVqNYfX6Dt9c+z/sx5LikOLmLm1Dg"
}
},
"state_key": "",
"type": "m.room.create"
},
"VisibilityEventIDs": null,
"LatestEventIDs": ["$rOaxKSu6K1s0nOsW:localhost"],
"AddsStateEventIDs": ["$rOaxKSu6K1s0nOsW:localhost"],
"RemovesStateEventIDs": null,
"LastSentEventID": ""
}`,
`{
"Event": {
"auth_events": [
["$rOaxKSu6K1s0nOsW:localhost", {
"sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg"
}]
],
"content": {
"membership": "join"
},
"depth": 2,
"event_id": "$uEDYwFpBO936HTfM:localhost",
"hashes": {
"sha256": "y5AQAnnzremC678QTIFEi677wdbMwluPiweZnuvUmz0"
},
"origin": "localhost",
"origin_server_ts": 1493908927170,
"prev_events": [
["$rOaxKSu6K1s0nOsW:localhost", {
"sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg"
}]
],
"room_id": "!gnrFfNAK7yGBWXFd:localhost",
"sender": "@alice:localhost",
"signatures": {
"localhost": {
"ed25519:something": "5Pl8GkgcyUu2QY7T38OkuufVQQV13f0kl2PLFI2OILBIcy0XPf8hSaFclemYckoo2nRgffIzsHO/ZgqfoBu0BA"
}
},
"state_key": "@alice:localhost",
"type": "m.room.member"
},
"VisibilityEventIDs": null,
"LatestEventIDs": ["$uEDYwFpBO936HTfM:localhost"],
"AddsStateEventIDs": ["$uEDYwFpBO936HTfM:localhost"],
"RemovesStateEventIDs": null,
"LastSentEventID": "$rOaxKSu6K1s0nOsW:localhost"
}`,
`{
"Event": {
"auth_events": [
["$rOaxKSu6K1s0nOsW:localhost", {
"sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg"
}],
["$uEDYwFpBO936HTfM:localhost", {
"sha256": "3z+JL3VmTtVROucpsrEWkxNVzn8ZOP2I1jU362pQIUU"
}]
],
"content": {
"ban": 50,
"events": {
"m.room.avatar": 50,
"m.room.canonical_alias": 50,
"m.room.history_visibility": 100,
"m.room.name": 50,
"m.room.power_levels": 100
},
"events_default": 0,
"invite": 0,
"kick": 50,
"redact": 50,
"state_default": 50,
"users": {
"@alice:localhost": 100
},
"users_default": 0
},
"depth": 3,
"event_id": "$Axp7qdQXf0bz7zBy:localhost",
"hashes": {
"sha256": "oObDsGkeVtQgyVPauoLIqk+J+Jsz6HOol79uRMTRFFM"
},
"origin": "localhost",
"origin_server_ts": 1493908927171,
"prev_events": [
["$uEDYwFpBO936HTfM:localhost", {
"sha256": "3z+JL3VmTtVROucpsrEWkxNVzn8ZOP2I1jU362pQIUU"
}]
],
"room_id": "!gnrFfNAK7yGBWXFd:localhost",
"sender": "@alice:localhost",
"signatures": {
"localhost": {
"ed25519:something": "3kV1Wm2E1zUPQ8YUIC1x/8ks1SGvXE0olQ+b0BRMJm7fduY2fNcb/4A4aKbQLRtOwvCNUVuqQkkkdp1Zor1LCw"
}
},
"state_key": "",
"type": "m.room.power_levels"
},
"VisibilityEventIDs": null,
"LatestEventIDs": ["$Axp7qdQXf0bz7zBy:localhost"],
"AddsStateEventIDs": ["$Axp7qdQXf0bz7zBy:localhost"],
"RemovesStateEventIDs": null,
"LastSentEventID": "$uEDYwFpBO936HTfM:localhost"
}`,
`{
"Event": {
"auth_events": [
["$rOaxKSu6K1s0nOsW:localhost", {
"sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg"
}],
["$Axp7qdQXf0bz7zBy:localhost", {
"sha256": "5KIh9uRcgXuiYdO965JSfIOSGeMrasf8N9eEzxisErI"
}],
["$uEDYwFpBO936HTfM:localhost", {
"sha256": "3z+JL3VmTtVROucpsrEWkxNVzn8ZOP2I1jU362pQIUU"
}]
],
"content": {
"join_rule": "public"
},
"depth": 4,
"event_id": "$zCgCrw3aZwVaKm34:localhost",
"hashes": {
"sha256": "KmJ7wAUznMy74MhAB3iDsBdFAkGypWXamDDQeLVzp1w"
},
"origin": "localhost",
"origin_server_ts": 1493908927172,
"prev_events": [
["$Axp7qdQXf0bz7zBy:localhost", {
"sha256": "5KIh9uRcgXuiYdO965JSfIOSGeMrasf8N9eEzxisErI"
}]
],
"room_id": "!gnrFfNAK7yGBWXFd:localhost",
"sender": "@alice:localhost",
"signatures": {
"localhost": {
"ed25519:something": "BkqU/1QARxNWEDfgKenvrhhGd6nmNZYHugHB0kFqUSQRZo+RV/zThLA0FxMXfmbGqfJdi1wXmxIR3QIwvGuhCg"
}
},
"state_key": "",
"type": "m.room.join_rules"
},
"VisibilityEventIDs": null,
"LatestEventIDs": ["$zCgCrw3aZwVaKm34:localhost"],
"AddsStateEventIDs": ["$zCgCrw3aZwVaKm34:localhost"],
"RemovesStateEventIDs": null,
"LastSentEventID": "$Axp7qdQXf0bz7zBy:localhost"
}`,
`{
"Event": {
"auth_events": [
["$rOaxKSu6K1s0nOsW:localhost", {
"sha256": "XFb+VOx/74T3RPw2PXTY4AXDZaEy8uLCSFuHCK4XYHg"
}],
["$Axp7qdQXf0bz7zBy:localhost", {
"sha256": "5KIh9uRcgXuiYdO965JSfIOSGeMrasf8N9eEzxisErI"
}],
["$uEDYwFpBO936HTfM:localhost", {
"sha256": "3z+JL3VmTtVROucpsrEWkxNVzn8ZOP2I1jU362pQIUU"
}]
],
"content": {
"history_visibility": "joined"
},
"depth": 5,
"event_id": "$0NUtdnY7KWMhOR9E:localhost",
"hashes": {
"sha256": "9CBp3jcnGKzoKCVYRCFCoe0CJ8IfZZAOhudAoDr2jqU"
},
"origin": "localhost",
"origin_server_ts": 1493908927174,
"prev_events": [
["$zCgCrw3aZwVaKm34:localhost", {
"sha256": "8kNj8j5K6YFWpFa0CLy1pR5Lp9nao0X6TW2iUIya2Tc"
}]
],
"room_id": "!gnrFfNAK7yGBWXFd:localhost",
"sender": "@alice:localhost",
"signatures": {
"localhost": {
"ed25519:something": "92Dz7JXAxuc87L3+jMps0HC6Z4V5PhMZQIomI8Dod/im1bkfhYUPMOF5EWWMGMDSq+mSpJPVizWAIGa8bIFcDA"
}
},
"state_key": "",
"type": "m.room.history_visibility"
},
"VisibilityEventIDs": null,
"LatestEventIDs": ["$0NUtdnY7KWMhOR9E:localhost"],
"AddsStateEventIDs": ["$0NUtdnY7KWMhOR9E:localhost"],
"RemovesStateEventIDs": null,
"LastSentEventID": "$zCgCrw3aZwVaKm34:localhost"
}`,
}
since := "3"
want := []string{
`{
"next_batch": "5",
prepareKafka()
cmd, syncServerCmdChan := startSyncServer()
defer cmd.Process.Kill() // ensure server is dead, only cleaning up so don't care about errors this returns.
// $ curl -XPOST -d '{}' "http://localhost:8009/_matrix/client/r0/createRoom?access_token=@alice:localhost"
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@alice:localhost"
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 2"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost"
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 3"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
// $ curl -XPUT -d '{"name":"Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
writeToRoomServerLog(
i0StateRoomCreate, i1StateAliceJoin, i2StatePowerLevels, i3StateJoinRules, i4StateHistoryVisibility,
i5AliceMsg, i6AliceMsg, i7AliceMsg, i8StateAliceRoomName,
)
// Make sure initial sync works TODO: prev_batch
testSyncServer(syncServerCmdChan, "@alice:localhost", "", `{
"account_data": {
"events": []
},
"next_batch": "9",
"presence": {
"events": []
},
"rooms": {
"invite": {},
"join": {
"!gnrFfNAK7yGBWXFd:localhost": {
"state": {
"events": [{
"content": {
"join_rule": "public"
},
"event_id": "$zCgCrw3aZwVaKm34:localhost",
"origin_server_ts": 1493908927172,
"sender": "@alice:localhost",
"state_key": "",
"type": "m.room.join_rules"
}]
},
"timeline": {
"events": [{
"content": {
"join_rule": "public"
},
"event_id": "$zCgCrw3aZwVaKm34:localhost",
"origin_server_ts": 1493908927172,
"sender": "@alice:localhost",
"state_key": "",
"type": "m.room.join_rules"
}, {
"content": {
"history_visibility": "joined"
},
"event_id": "$0NUtdnY7KWMhOR9E:localhost",
"origin_server_ts": 1493908927174,
"sender": "@alice:localhost",
"state_key": "",
"type": "m.room.history_visibility"
}],
"limited": false,
"prev_batch": ""
"!PjrbIMW2cIiaYF4t:localhost": {
"account_data": {
"events": []
},
"ephemeral": {
"events": []
},
"state": {
"events": []
},
"timeline": {
"events": [`+
clientEventTestData[i0StateRoomCreate]+","+
clientEventTestData[i1StateAliceJoin]+","+
clientEventTestData[i2StatePowerLevels]+","+
clientEventTestData[i3StateJoinRules]+","+
clientEventTestData[i4StateHistoryVisibility]+","+
clientEventTestData[i5AliceMsg]+","+
clientEventTestData[i6AliceMsg]+","+
clientEventTestData[i7AliceMsg]+","+
clientEventTestData[i8StateAliceRoomName]+`],
"limited": true,
"prev_batch": ""
}
}
},
"leave": {}
}
}`)
// Make sure alice's rooms don't leak to bob
testSyncServer(syncServerCmdChan, "@bob:localhost", "", `{
"account_data": {
"events": []
},
"next_batch": "9",
"presence": {
"events": []
},
"rooms": {
"invite": {},
"join": {},
"leave": {}
}
}`)
// Make sure polling with an up-to-date token returns nothing new
testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{
"account_data": {
"events": []
},
"next_batch": "9",
"presence": {
"events": []
},
"rooms": {
"invite": {},
"join": {},
"leave": {}
}
}`)
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
writeToRoomServerLog(i9StateBobJoin)
// Make sure alice sees it TODO: prev_batch
testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{
"account_data": {
"events": []
},
"next_batch": "10",
"presence": {
"events": []
},
"rooms": {
"invite": {},
"join": {
"!PjrbIMW2cIiaYF4t:localhost": {
"account_data": {
"events": []
},
"ephemeral": {
"events": []
},
"state": {
"events": []
},
"timeline": {
"limited": false,
"prev_batch": "",
"events": [`+clientEventTestData[i9StateBobJoin]+`]
}
}
},
"leave": {}
}
}`)
// Make sure bob sees the room AND all the current room state TODO: history visibility
testSyncServer(syncServerCmdChan, "@bob:localhost", "9", `{
"account_data": {
"events": []
},
"next_batch": "10",
"presence": {
"events": []
},
"rooms": {
"invite": {},
"join": {
"!PjrbIMW2cIiaYF4t:localhost": {
"account_data": {
"events": []
},
"ephemeral": {
"events": []
},
"state": {
"events": [`+
clientEventTestData[i0StateRoomCreate]+","+
clientEventTestData[i1StateAliceJoin]+","+
clientEventTestData[i2StatePowerLevels]+","+
clientEventTestData[i3StateJoinRules]+","+
clientEventTestData[i4StateHistoryVisibility]+","+
clientEventTestData[i8StateAliceRoomName]+`]
},
"timeline": {
"limited": false,
"prev_batch": "",
"events": [`+
clientEventTestData[i9StateBobJoin]+`]
}
}
},
"leave": {}
}
}`)
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost"
writeToRoomServerLog(i10BobMsg)
// Make sure alice can see everything around the join point for bob TODO: prev_batch
testSyncServer(syncServerCmdChan, "@alice:localhost", "7", `{
"account_data": {
"events": []
},
"next_batch": "11",
"presence": {
"events": []
},
"rooms": {
"invite": {},
"join": {
"!PjrbIMW2cIiaYF4t:localhost": {
"account_data": {
"events": []
},
"ephemeral": {
"events": []
},
"state": {
"events": []
},
"timeline": {
"limited": false,
"prev_batch": "",
"events": [`+
clientEventTestData[i7AliceMsg]+","+
clientEventTestData[i8StateAliceRoomName]+","+
clientEventTestData[i9StateBobJoin]+","+
clientEventTestData[i10BobMsg]+`]
}
}
},
"leave": {}
}
}`)
// $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost"
// $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost"
writeToRoomServerLog(i11StateAliceRoomName, i12AliceMsg, i13StateBobInviteCharlie)
// Make sure charlie sees the invite both with and without a ?since= token
// TODO: Invite state should include the invite event and the room name.
charlieInviteData := `{
"account_data": {
"events": []
},
"next_batch": "14",
"presence": {
"events": []
},
"rooms": {
"invite": {
"!PjrbIMW2cIiaYF4t:localhost": {
"invite_state": {
"events": []
}
}
},
"invite": {},
"join": {},
"leave": {}
}
}`,
}
want = canonicalJSONInput(want)
testSyncServer(input, want, since)
}`
testSyncServer(syncServerCmdChan, "@charlie:localhost", "7", charlieInviteData)
testSyncServer(syncServerCmdChan, "@charlie:localhost", "", charlieInviteData)
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost"
// $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost"
// $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
writeToRoomServerLog(i14StateCharlieJoin, i15AliceMsg, i16StateAliceKickCharlie, i17BobMsg)
// Check transitions to leave work
testSyncServer(syncServerCmdChan, "@charlie:localhost", "15", `{
"account_data": {
"events": []
},
"next_batch": "18",
"presence": {
"events": []
},
"rooms": {
"invite": {},
"join": {},
"leave": {
"!PjrbIMW2cIiaYF4t:localhost": {
"state": {
"events": []
},
"timeline": {
"limited": false,
"prev_batch": "",
"events": [`+
clientEventTestData[i15AliceMsg]+","+
clientEventTestData[i16StateAliceKickCharlie]+`]
}
}
}
}
}`)
// Test joining and leaving the same room in a single /sync request puts the room in the 'leave' section.
// TODO: Use an earlier since value to assert that the /sync response doesn't leak messages
// from before charlie was joined to the room. Currently it does leak because RecentEvents doesn't
// take membership into account.
testSyncServer(syncServerCmdChan, "@charlie:localhost", "14", `{
"account_data": {
"events": []
},
"next_batch": "18",
"presence": {
"events": []
},
"rooms": {
"invite": {},
"join": {},
"leave": {
"!PjrbIMW2cIiaYF4t:localhost": {
"state": {
"events": []
},
"timeline": {
"limited": false,
"prev_batch": "",
"events": [`+
clientEventTestData[i14StateCharlieJoin]+","+
clientEventTestData[i15AliceMsg]+","+
clientEventTestData[i16StateAliceKickCharlie]+`]
}
}
}
}
}`)
// $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
writeToRoomServerLog(i18StateAliceRoomName)
// Check that users don't see state changes in rooms after they have left
testSyncServer(syncServerCmdChan, "@charlie:localhost", "17", `{
"account_data": {
"events": []
},
"next_batch": "19",
"presence": {
"events": []
},
"rooms": {
"invite": {},
"join": {},
"leave": {}
}
}`)
// $ curl -XPUT -d '{"msgtype":"m.text","body":"whatever"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
// $ curl -XPUT -d '{"msgtype":"m.text","body":"im alone now"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
// $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@alice:localhost"
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
// $ curl -XPUT -d '{"msgtype":"m.text","body":"so alone"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
// $ curl -XPUT -d '{"name":"Everyone welcome"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost"
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hiiiii"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@charlie:localhost"
}

View file

@ -0,0 +1,101 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
const (
i0StateRoomCreate = iota
i1StateAliceJoin
i2StatePowerLevels
i3StateJoinRules
i4StateHistoryVisibility
i5AliceMsg
i6AliceMsg
i7AliceMsg
i8StateAliceRoomName
i9StateBobJoin
i10BobMsg
i11StateAliceRoomName
i12AliceMsg
i13StateBobInviteCharlie
i14StateCharlieJoin
i15AliceMsg
i16StateAliceKickCharlie
i17BobMsg
i18StateAliceRoomName
i19BobMsg
i20StateBobLeave
i21AliceMsg
i22StateAliceInviteBob
i23StateBobRejectInvite
i24AliceMsg
i25StateAliceRoomName
i26StateCharlieJoin
i27CharlieMsg
)
var outputRoomEventTestData = []string{
// $ curl -XPOST -d '{}' "http://localhost:8009/_matrix/client/r0/createRoom?access_token=@alice:localhost"
`{"Event":{"auth_events":[],"content":{"creator":"@alice:localhost"},"depth":1,"event_id":"$xz0fUB8zNMTGFh1W:localhost","hashes":{"sha256":"KKkpxS8NoH0igBbL3J+nJ39MRlmA7QgW4BGL7Fv4ASI"},"origin":"localhost","origin_server_ts":1494411218382,"prev_events":[],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"uZG5Q/Hs2Z611gFlZPdwomomRJKf70xV2FQV+gLWM1XgzkLDRlRF3cBZc9y3CnHKnV/upTcXs7Op2/GmgD3UBw"}},"state_key":"","type":"m.room.create"},"VisibilityEventIDs":null,"LatestEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"AddsStateEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":""}`,
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}]],"content":{"membership":"join"},"depth":2,"event_id":"$QTen1vksfcRTpUCk:localhost","hashes":{"sha256":"tTukc9ab1fJfzgc5EMA/UD3swqfl/ic9Y9Zkt4fJo0Q"},"origin":"localhost","origin_server_ts":1494411218385,"prev_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"OPysDn/wT7yHeALXLTcEgR+iaKjv0p7VPuR/Mzvyg2IMAwPUjSOw8SQZlhSioWRtVPUp9VHbhIhJxQaPUg9yBQ"}},"state_key":"@alice:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$QTen1vksfcRTpUCk:localhost"],"AddsStateEventIDs":["$QTen1vksfcRTpUCk:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$xz0fUB8zNMTGFh1W:localhost"}`,
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}]],"content":{"ban":50,"events":{"m.room.avatar":50,"m.room.canonical_alias":50,"m.room.history_visibility":100,"m.room.name":50,"m.room.power_levels":100},"events_default":0,"invite":0,"kick":50,"redact":50,"state_default":50,"users":{"@alice:localhost":100},"users_default":0},"depth":3,"event_id":"$RWsxGlfPHAcijTgu:localhost","hashes":{"sha256":"ueZWiL/Q8bagRQGFktpnYJAJV6V6U3QKcUEmWYeyaaM"},"origin":"localhost","origin_server_ts":1494411218385,"prev_events":[["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"hZwWx3lyW61zMYmqLOxLTlfW2CnbjJQsZPLjZFa97TVG4ISz8CixMPsnVAIu5is29UCmiHyP8RvLecJjbLCtAQ"}},"state_key":"","type":"m.room.power_levels"},"VisibilityEventIDs":null,"LatestEventIDs":["$RWsxGlfPHAcijTgu:localhost"],"AddsStateEventIDs":["$RWsxGlfPHAcijTgu:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$QTen1vksfcRTpUCk:localhost"}`,
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}]],"content":{"join_rule":"public"},"depth":4,"event_id":"$2O2DpHB37CuwwJOe:localhost","hashes":{"sha256":"3P3HxAXI8gc094i020EoV/gissYiMVWv8+JAbrakM4E"},"origin":"localhost","origin_server_ts":1494411218386,"prev_events":[["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"L2yZoBbG/6TNsRHz+UtHY0SK4FgrdAYPR1l7RBWaNFbm+k/7kVhnoGlJ9yptpdLJjPMR2InqKXH8BBxRC83BCg"}},"state_key":"","type":"m.room.join_rules"},"VisibilityEventIDs":null,"LatestEventIDs":["$2O2DpHB37CuwwJOe:localhost"],"AddsStateEventIDs":["$2O2DpHB37CuwwJOe:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$RWsxGlfPHAcijTgu:localhost"}`,
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}]],"content":{"history_visibility":"joined"},"depth":5,"event_id":"$5LRiBskVCROnL5WY:localhost","hashes":{"sha256":"341alVufcKSVKLPr9WsJNTnW33QkBTn9eTfVWbyoa0o"},"origin":"localhost","origin_server_ts":1494411218387,"prev_events":[["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"kRyt68cstwYgK8NtYzf0V5CnAbqUO47ixCCWYzRCi0WNstEwUw4XW1GHc8BllQsXwSj+nNv9g/66zZgG0DtxCA"}},"state_key":"","type":"m.room.history_visibility"},"VisibilityEventIDs":null,"LatestEventIDs":["$5LRiBskVCROnL5WY:localhost"],"AddsStateEventIDs":["$5LRiBskVCROnL5WY:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$2O2DpHB37CuwwJOe:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"hello world","msgtype":"m.text"},"depth":0,"event_id":"$Z8ZJik7ghwzSYTH9:localhost","hashes":{"sha256":"ahN1T5aiSZCzllf0pqNWJkF+x2h2S3kic+40pQ1X6BE"},"origin":"localhost","origin_server_ts":1494411339207,"prev_events":[["$5LRiBskVCROnL5WY:localhost",{"sha256":"3jULNC9b9Q0AhvnDQqpjhbtYwmkioHzPzdTJZvn8vOI"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"ylEpahRwEfGpqk+UCv0IF8YAxmut7w7udgHy3sVDfdJhs/4uJ6EkFEsKLknpXRc1vTIy1etKCBQ63QbCmRC2Bw"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$Z8ZJik7ghwzSYTH9:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$5LRiBskVCROnL5WY:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 2"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"hello world 2","msgtype":"m.text"},"depth":0,"event_id":"$8382Ah682eL4hxjN:localhost","hashes":{"sha256":"hQElDGSYc6KOdylrbMMm3+LlvUiCKo6S9G9n58/qtns"},"origin":"localhost","origin_server_ts":1494411380282,"prev_events":[["$Z8ZJik7ghwzSYTH9:localhost",{"sha256":"FBDwP+2FeqDENe7AEa3iAFAVKl1/IVq43mCH0uPRn90"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"LFXi6jTG7qn9xzi4rhIiHbkLD+4AZ9Yg7UTS2gqm1gt2lXQsgTYH1wE4Fol2fq4lvGlQVpxhtEr2huAYSbT7DA"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$8382Ah682eL4hxjN:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$Z8ZJik7ghwzSYTH9:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 3"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"hello world 3","msgtype":"m.text"},"depth":0,"event_id":"$17SfHsvSeTQthSWF:localhost","hashes":{"sha256":"eS6VFQI0l2U8rA8U17jgSHr9lQ73SNSnlnZu+HD0IjE"},"origin":"localhost","origin_server_ts":1494411396560,"prev_events":[["$8382Ah682eL4hxjN:localhost",{"sha256":"c6I/PUY7WnvxQ+oUEp/w2HEEuD3g8Vq7QwPUOSUjuc8"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"dvu9bSHZmX+yZoEqHioK7YDMtLH9kol0DdFqc5aHsbhZe/fKRZpfJMrlf1iXQdXSCMhikvnboPAXN3guiZCUBQ"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$17SfHsvSeTQthSWF:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$8382Ah682eL4hxjN:localhost"}`,
// $ curl -XPUT -d '{"name":"Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"name":"Custom Room Name"},"depth":0,"event_id":"$j7KtuOzM0K15h3Kr:localhost","hashes":{"sha256":"QIKj5Klr50ugll4EjaNUATJmrru4CDp6TvGPv0v15bo"},"origin":"localhost","origin_server_ts":1494411482625,"prev_events":[["$17SfHsvSeTQthSWF:localhost",{"sha256":"iMTefewJ4W5sKQy7osQv4ilJAi7X0NsK791kqEUmYX0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"WU7lwSWUAk7bsyDnBs128PyXxPZZoD1sN4AiDcvk+W1mDezJbFvWHDWymclxWESlP7TDrFTZEumRWGGCakjyAg"}},"state_key":"","type":"m.room.name"},"VisibilityEventIDs":null,"LatestEventIDs":["$j7KtuOzM0K15h3Kr:localhost"],"AddsStateEventIDs":["$j7KtuOzM0K15h3Kr:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$17SfHsvSeTQthSWF:localhost"}`,
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}]],"content":{"membership":"join"},"depth":0,"event_id":"$wPepDhIla765Odre:localhost","hashes":{"sha256":"KeKqWLvM+LTvyFbwx6y3Y4W5Pj6nBSFUQ6jpkSf1oTE"},"origin":"localhost","origin_server_ts":1494411534290,"prev_events":[["$j7KtuOzM0K15h3Kr:localhost",{"sha256":"oDrWG5/sy1Ea3hYDOSJZRuGKCcjaHQlDYPDn2gB0/L0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"oVtvjZbWFe+iJhoDvLcQKnFpSYQ94dOodM4gGsx26P6fs2sFJissYwSIqpoxlElCJnmBAgy5iv4JK/5x21R2CQ"}},"state_key":"@bob:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$wPepDhIla765Odre:localhost"],"AddsStateEventIDs":["$wPepDhIla765Odre:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$j7KtuOzM0K15h3Kr:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"body":"hello alice","msgtype":"m.text"},"depth":0,"event_id":"$RHNjeYUvXVZfb93t:localhost","hashes":{"sha256":"Ic1QLxTWFrWt1o31DS93ftrNHkunf4O6ubFvdD4ydNI"},"origin":"localhost","origin_server_ts":1494411593196,"prev_events":[["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"8BHHkiThWwiIZbXCegRjIKNVGIa2kqrZW8VuL7nASfJBORhZ9R9p34UsmhsxVwTs/2/dX7M2ogMB28gIGdLQCg"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$RHNjeYUvXVZfb93t:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$wPepDhIla765Odre:localhost"}`,
// $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"name":"A Different Custom Room Name"},"depth":0,"event_id":"$1xoUuqOFjFFJgwA5:localhost","hashes":{"sha256":"2pNnLhoHxNeSUpqxrd3c0kZUA4I+cdWZgYcJ8V3e2tk"},"origin":"localhost","origin_server_ts":1494411643348,"prev_events":[["$RHNjeYUvXVZfb93t:localhost",{"sha256":"LqFmTIzULgUDSf5xM3REObvnsRGLQliWBUf1hEDT4+w"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"gsY4B6TIBdVvLyFAaXw0xez9N5/Cn/ZaJ4z+j9gJU/ZR8j1t3OYlcVQN6uln9JwEU1k20AsGnIqvOaayd+bfCg"}},"state_key":"","type":"m.room.name"},"VisibilityEventIDs":null,"LatestEventIDs":["$1xoUuqOFjFFJgwA5:localhost"],"AddsStateEventIDs":["$1xoUuqOFjFFJgwA5:localhost"],"RemovesStateEventIDs":["$j7KtuOzM0K15h3Kr:localhost"],"LastSentEventID":"$RHNjeYUvXVZfb93t:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"hello bob","msgtype":"m.text"},"depth":0,"event_id":"$4NBTdIwDxq5fDGpv:localhost","hashes":{"sha256":"msCIESAya8kD7nLCopxkEqrgVuGfrlr9YBIADH5czTA"},"origin":"localhost","origin_server_ts":1494411674630,"prev_events":[["$1xoUuqOFjFFJgwA5:localhost",{"sha256":"ZXj+kY6sqQpf5vsNqvCMSvNoXXKDKxRE4R7+gZD9Tkk"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"bZRT3NxVlfBWw1PxSlKlgfnJixG+NI5H9QmUK2AjECg+l887BZJNCvAK0eD27N8e9V+c2glyXWYje2wexP2CBw"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$4NBTdIwDxq5fDGpv:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$1xoUuqOFjFFJgwA5:localhost"}`,
// $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"membership":"invite"},"depth":0,"event_id":"$zzLHVlHIWPrnE7DI:localhost","hashes":{"sha256":"LKk7tnYJAHsyffbi9CzfdP+TU4KQ5g6YTgYGKjJ7NxU"},"origin":"localhost","origin_server_ts":1494411709192,"prev_events":[["$4NBTdIwDxq5fDGpv:localhost",{"sha256":"EpqmxEoJP93Zb2Nt2fS95SJWTqqIutHm/Ne8OHqp6Ps"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"GdUzkC+7YKl1XDi7kYuD39yi2L/+nv+YrecIQHS+0BLDQqnEj+iRXfNBuZfTk6lUBCJCHXZlk7MnEIjvWDlZCg"}},"state_key":"@charlie:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$zzLHVlHIWPrnE7DI:localhost"],"AddsStateEventIDs":["$zzLHVlHIWPrnE7DI:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":"$4NBTdIwDxq5fDGpv:localhost"}`,
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"content":{"membership":"join"},"depth":0,"event_id":"$uJVKyzZi8ZX0kOd9:localhost","hashes":{"sha256":"9ZZs/Cg0ewpBiCB6iFXXYlmW8koFiesCNGFrOLDTolE"},"origin":"localhost","origin_server_ts":1494411745015,"prev_events":[["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"+TM0gFPM/M3Ji2BjYuTUTgDyCOWlOq8aTMCxLg7EBvS62yPxJ558f13OWWTczUO5aRAt+PvXsMVM/bp8u6c8DQ"}},"state_key":"@charlie:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$uJVKyzZi8ZX0kOd9:localhost"],"AddsStateEventIDs":["$uJVKyzZi8ZX0kOd9:localhost"],"RemovesStateEventIDs":["$zzLHVlHIWPrnE7DI:localhost"],"LastSentEventID":"$zzLHVlHIWPrnE7DI:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"not charlie...","msgtype":"m.text"},"depth":0,"event_id":"$Ixfn5WT9ocWTYxfy:localhost","hashes":{"sha256":"hRChdyMQ3AY4jvrPpI8PEX6Taux83Qo5hdSeHlhPxGo"},"origin":"localhost","origin_server_ts":1494411792737,"prev_events":[["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"LC/Zqwu/XdqjmLdTOp/NQaFaE0niSAGgEpa39gCxsnsqEX80P7P5WDn/Kzx6rjWTnhIszrLsnoycqkXQT0Z4DQ"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$Ixfn5WT9ocWTYxfy:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$uJVKyzZi8ZX0kOd9:localhost"}`,
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"content":{"membership":"leave"},"depth":0,"event_id":"$om1F4AI8tCYlHUSp:localhost","hashes":{"sha256":"7JVI0uCxSUyEqDJ+o36/zUIlIZkXVK/R6wkrZGvQXDE"},"origin":"localhost","origin_server_ts":1494411855278,"prev_events":[["$Ixfn5WT9ocWTYxfy:localhost",{"sha256":"hOoPIDQFvvNqQJzA5ggjoQi4v1BOELnhnmwU4UArDOY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"3sxoDLUPnKuDJgFgS3C647BbiXrozxhhxrZOlFP3KgJKzBYv/ht+Jd2V2iSZOvsv94wgRBf0A/lEcJRIqeLgDA"}},"state_key":"@charlie:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$om1F4AI8tCYlHUSp:localhost"],"AddsStateEventIDs":["$om1F4AI8tCYlHUSp:localhost"],"RemovesStateEventIDs":["$uJVKyzZi8ZX0kOd9:localhost"],"LastSentEventID":"$Ixfn5WT9ocWTYxfy:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"body":"why did you kick charlie","msgtype":"m.text"},"depth":0,"event_id":"$hgao5gTmr3r9TtK2:localhost","hashes":{"sha256":"Aa2ZCrvwjX5xhvkVqIOFUeEGqrnrQZjjNFiZRybjsPY"},"origin":"localhost","origin_server_ts":1494411912809,"prev_events":[["$om1F4AI8tCYlHUSp:localhost",{"sha256":"yVs+CW7AiJrJOYouL8xPIBrtIHAhnbxaegna8MxeCto"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"sGkpbEXGsvAuCvE3wb5E9H5fjCVKpRdWNt6csj1bCB9Fmg4Rg4mvj3TAJ+91DjO8IPsgSxDKdqqRYF0OtcynBA"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$hgao5gTmr3r9TtK2:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$om1F4AI8tCYlHUSp:localhost"}`,
// $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"name":"No Charlies"},"depth":0,"event_id":"$CY4XDoxjbns3a4Pc:localhost","hashes":{"sha256":"chk72pVkp3AGR2FtdC0mORBWS1b9ePnRN4WK3BP0BiI"},"origin":"localhost","origin_server_ts":1494411959114,"prev_events":[["$hgao5gTmr3r9TtK2:localhost",{"sha256":"/4/OG4Q2YalIeBtN76BEPIieBKA/3UFshR9T+WJip4o"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"mapvA3KJYgw5FmzJMhSFa/+JSuNyv2eKAkiGomAeBB7LQ1e9nK9XhW/Fp7a5Z2Sy2ENwHyd3ij7FEGiLOnSIAw"}},"state_key":"","type":"m.room.name"},"VisibilityEventIDs":null,"LatestEventIDs":["$CY4XDoxjbns3a4Pc:localhost"],"AddsStateEventIDs":["$CY4XDoxjbns3a4Pc:localhost"],"RemovesStateEventIDs":["$1xoUuqOFjFFJgwA5:localhost"],"LastSentEventID":"$hgao5gTmr3r9TtK2:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"whatever"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"body":"whatever","msgtype":"m.text"},"depth":0,"event_id":"$pl8VBHRPYDmsnDh4:localhost","hashes":{"sha256":"FYqY9+/cepwIxxjfFV3AjOFBXkTlyEI2jep87dUc+SU"},"origin":"localhost","origin_server_ts":1494411988548,"prev_events":[["$CY4XDoxjbns3a4Pc:localhost",{"sha256":"hCoV63fp8eiquVdEefsOqJtLmJhw4wTlRv+wNTS20Ac"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"sQKwRzE59eZyb8rDySo/pVwZXBh0nA5zx+kjEyXglxIQrTre+8Gj3R7Prni+RE3Dq7oWfKYV7QklTLURAaSICQ"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$pl8VBHRPYDmsnDh4:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$CY4XDoxjbns3a4Pc:localhost"}`,
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"membership":"leave"},"depth":0,"event_id":"$acCW4IgnBo8YD3jw:localhost","hashes":{"sha256":"porP+E2yftBGjfS381+WpZeDM9gZHsM3UydlBcRKBLw"},"origin":"localhost","origin_server_ts":1494412037042,"prev_events":[["$pl8VBHRPYDmsnDh4:localhost",{"sha256":"b+qQ380JDFq7quVU9EbIJ2sbpUKM1LAUNX0ZZUoVMZw"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"kxbjTIC0/UR4cOYUAOTNiUc0SSVIF4BY6Rq6IEgYJemq4jcU2fYqum4mFxIQTDKKXMSRHEoNPDmYMFIJwkrsCg"}},"state_key":"@bob:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$acCW4IgnBo8YD3jw:localhost"],"AddsStateEventIDs":["$acCW4IgnBo8YD3jw:localhost"],"RemovesStateEventIDs":["$wPepDhIla765Odre:localhost"],"LastSentEventID":"$pl8VBHRPYDmsnDh4:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"im alone now"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"im alone now","msgtype":"m.text"},"depth":0,"event_id":"$nYdEXrvTDeb7DfkC:localhost","hashes":{"sha256":"qibC5NmlJpSRMBWSWxy1pv73FXymhPDXQFMmGosfsV0"},"origin":"localhost","origin_server_ts":1494412084668,"prev_events":[["$acCW4IgnBo8YD3jw:localhost",{"sha256":"8h3uXoE6pnI9iLnXI6493qJ0HeuRQfenRIu9PcgH72g"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"EHRoZznhXywhYeIn83o4FSFm3No/aOdLQPHQ68YGtNgESWwpuWLkkGVjoISjz3QgXQ06Fl3cHt7nlTaAHpCNAg"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$nYdEXrvTDeb7DfkC:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$acCW4IgnBo8YD3jw:localhost"}`,
// $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$acCW4IgnBo8YD3jw:localhost",{"sha256":"8h3uXoE6pnI9iLnXI6493qJ0HeuRQfenRIu9PcgH72g"}]],"content":{"membership":"invite"},"depth":0,"event_id":"$gKNfcXLlWvs2cFad:localhost","hashes":{"sha256":"iYDOUjYkaGSFbVp7TRVFvGJyGMEuBHMQrJ9XqwhzmPI"},"origin":"localhost","origin_server_ts":1494412135845,"prev_events":[["$nYdEXrvTDeb7DfkC:localhost",{"sha256":"83T5Q3+nDvtS0oJTEhHxIw02twBDa1A7QR2bHtnxv1Y"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"ofw009aMJMqVjww9eDXgeTjOQqSlJl/GN/AAb+6mZAPcUI8aVgRlXOSESfhu1ONEuV/yNUycxNXWfMwuvoWsDg"}},"state_key":"@bob:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$gKNfcXLlWvs2cFad:localhost"],"AddsStateEventIDs":["$gKNfcXLlWvs2cFad:localhost"],"RemovesStateEventIDs":["$acCW4IgnBo8YD3jw:localhost"],"LastSentEventID":"$nYdEXrvTDeb7DfkC:localhost"}`,
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$gKNfcXLlWvs2cFad:localhost",{"sha256":"/TYIY+L9qjg516Bzl8sadu+Np21KkxE4KdPXALeJ9eE"}]],"content":{"membership":"leave"},"depth":0,"event_id":"$B2q9Tepb6Xc1Rku0:localhost","hashes":{"sha256":"RbHTVdceAEfTALQDZdGrOmakKeTYnChaKjlVuoNUdSY"},"origin":"localhost","origin_server_ts":1494412187614,"prev_events":[["$gKNfcXLlWvs2cFad:localhost",{"sha256":"/TYIY+L9qjg516Bzl8sadu+Np21KkxE4KdPXALeJ9eE"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"dNtUL86j2zUe5+DkfOkil5VujvFZg4FeTjbtcpeF+3E4SUChCAG3lyR6YOAIYBnjtD0/kqT7OcP3pM6vMEp1Aw"}},"state_key":"@bob:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$B2q9Tepb6Xc1Rku0:localhost"],"AddsStateEventIDs":["$B2q9Tepb6Xc1Rku0:localhost"],"RemovesStateEventIDs":["$gKNfcXLlWvs2cFad:localhost"],"LastSentEventID":"$gKNfcXLlWvs2cFad:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"so alone"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"so alone","msgtype":"m.text"},"depth":0,"event_id":"$W1nrYHQIbCTTSJOV:localhost","hashes":{"sha256":"uUKSa4U1coDoT3LUcNF25dt+UpUa2pLXzRJ3ljgxXZs"},"origin":"localhost","origin_server_ts":1494412229742,"prev_events":[["$B2q9Tepb6Xc1Rku0:localhost",{"sha256":"0CLru7nGPgyF9AWlZnarCElscSVrXl2MMY2atrz80Uc"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"YlBJyDnE34UhaCB9hirQN5OySfTDoqiBDnNvxomXjU94z4a8g2CLWKjApwd/q/j4HamCUtjgkjJ2um6hNjsVBA"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$W1nrYHQIbCTTSJOV:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$B2q9Tepb6Xc1Rku0:localhost"}`,
// $ curl -XPUT -d '{"name":"Everyone welcome"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"name":"Everyone welcome"},"depth":0,"event_id":"$nLzxoBC4A0QRvJ1k:localhost","hashes":{"sha256":"PExCybjaMW1TfgFr57MdIRYJ642FY2jnrdW/tpPOf1Y"},"origin":"localhost","origin_server_ts":1494412294551,"prev_events":[["$W1nrYHQIbCTTSJOV:localhost",{"sha256":"HXk/ACcsiaZ/z1f2aZSIhJF8Ih3BWeh1vp+cV/fwoE0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"RK09L8sQv78y69PNbOLaX8asq5kp51mbqUuct5gd7ZNmaHKnVds6ew06QEn+gHSDAxqQo2tpcfoajp+yMj1HBw"}},"state_key":"","type":"m.room.name"},"VisibilityEventIDs":null,"LatestEventIDs":["$nLzxoBC4A0QRvJ1k:localhost"],"AddsStateEventIDs":["$nLzxoBC4A0QRvJ1k:localhost"],"RemovesStateEventIDs":["$CY4XDoxjbns3a4Pc:localhost"],"LastSentEventID":"$W1nrYHQIbCTTSJOV:localhost"}`,
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$om1F4AI8tCYlHUSp:localhost",{"sha256":"yVs+CW7AiJrJOYouL8xPIBrtIHAhnbxaegna8MxeCto"}]],"content":{"membership":"join"},"depth":0,"event_id":"$Zo6P8r9bczF6kctV:localhost","hashes":{"sha256":"R3J2iUWnGxVdmly8ah+Dgb5VbJ2i/e8BLaWM0z9eZKU"},"origin":"localhost","origin_server_ts":1494412338689,"prev_events":[["$nLzxoBC4A0QRvJ1k:localhost",{"sha256":"TDcFaArAXpxIJ1noSubcFqkLXiQTrc1Dw1+kgCtx3XY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"tVnjLVoJ9SLlMQIJSK/6zANWaEu8tVVkx3AEJiC3y5JmhPORb3PyG8eE+e/9hC4aJSQL8LGLaJNWXukMpb2SBg"}},"state_key":"@charlie:localhost","type":"m.room.member"},"VisibilityEventIDs":null,"LatestEventIDs":["$Zo6P8r9bczF6kctV:localhost"],"AddsStateEventIDs":["$Zo6P8r9bczF6kctV:localhost"],"RemovesStateEventIDs":["$om1F4AI8tCYlHUSp:localhost"],"LastSentEventID":"$nLzxoBC4A0QRvJ1k:localhost"}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hiiiii"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@charlie:localhost"
`{"Event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$Zo6P8r9bczF6kctV:localhost",{"sha256":"mnjt3WTYqwtuyl2Fca+0cgm6moHaNL+W9BqRJTQzdEY"}]],"content":{"body":"hiiiii","msgtype":"m.text"},"depth":0,"event_id":"$YAEvK8u2zkTsjf5P:localhost","hashes":{"sha256":"6hKy61h1tuHjYdfpq2MnaPtGEBAZOUz8FLTtxLwjK5A"},"origin":"localhost","origin_server_ts":1494412375465,"prev_events":[["$Zo6P8r9bczF6kctV:localhost",{"sha256":"mnjt3WTYqwtuyl2Fca+0cgm6moHaNL+W9BqRJTQzdEY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"BsSLaMM5U/YkyvBZ00J/+si9My+wAJZOcBhBeato0oHayiag7FW77ZpSTfADazPdNH62kjB0sdP9CN6vQA7yDg"}},"type":"m.room.message"},"VisibilityEventIDs":null,"LatestEventIDs":["$YAEvK8u2zkTsjf5P:localhost"],"AddsStateEventIDs":null,"RemovesStateEventIDs":null,"LastSentEventID":"$Zo6P8r9bczF6kctV:localhost"}`,
}

View file

@ -1,6 +1,13 @@
# Sync API Server
This server is responsible for servicing `/sync` requests. It gets its data from the room server output log.
This server is responsible for servicing `/sync` requests. It gets its data from the room server output log. Currently, the sync server will:
- Return a valid `/sync` response for the user represented by the provided `access_token`.
- Return a "complete sync" if no `since` value is provided, and return a valid `next_batch` token. This contains all rooms the user has been invited to or has joined. For joined rooms, this includes the complete current room state and the most recent 20 (hard-coded) events in the timeline.
- For "incremental syncs" (a `since` value is provided), as you get invited to, join, or leave rooms they will be reflected correctly in the `/sync` response.
- For very large state deltas, the `state` section of a room is correctly populated with the state of the room at the *start* of the timeline.
- When you join a room, the `/sync` which transitions your client to be "joined" will include the complete current room state as per the specification.
- Only wake up user streams it needs to wake up.
- Honours the `timeout` query parameter value.
## Internals
@ -59,3 +66,21 @@ are in `OutputRoomEvents` from the room server.
This version of the sync server uses very simple indexing to calculate room state at various points.
This is inefficient when a very old `since` value is provided, or the `full_state` is requested, as the state delta becomes
very large. This is mitigated slightly with indexes, but better data structures could be used in the future.
## Known Issues
- `m.room.history_visibility` is not honoured: it is always treated as "shared".
- All ephemeral events are not implemented (presence, typing, receipts).
- Account data (both user and room) is not implemented.
- `to_device` messages are not implemented.
- Back-pagination via `prev_batch` is not implemented.
- The `limited` flag can lie.
- Filters are not honoured or implemented. The `limit` for each room is hard-coded to 20.
- The `full_state` query parameter is not implemented.
- The `set_presence` query parameter is not implemented.
- "Ignored" users are not ignored.
- Redacted events are still sent to clients.
- Invites over federation (if it existed) won't work as they aren't "real" events and so won't be in the right tables.
- `invite_state` is not implemented (for similar reasons to the above point).
- The current implementation scales badly when a very old `since` token is provided.
- The entire current room state can be re-sent to the client if they send a duplicate "join" event which should be a no-op.

View file

@ -28,15 +28,15 @@ import (
sarama "gopkg.in/Shopify/sarama.v1"
)
// Server contains all the logic for running a sync server
type Server struct {
// OutputRoomEvent consumes events that originated in the room server.
type OutputRoomEvent struct {
roomServerConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase
notifier *sync.Notifier
}
// NewServer creates a new sync server. Call Start() to begin consuming from room servers.
func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Server, error) {
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
if err != nil {
return nil, err
@ -47,7 +47,7 @@ func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerData
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &Server{
s := &OutputRoomEvent{
roomServerConsumer: &consumer,
db: store,
notifier: n,
@ -58,14 +58,14 @@ func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerData
}
// Start consuming from room servers
func (s *Server) Start() error {
func (s *OutputRoomEvent) Start() error {
return s.roomServerConsumer.Start()
}
// onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputRoomEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {

View file

@ -61,11 +61,15 @@ const selectRoomIDsWithMembershipSQL = "" +
const selectCurrentStateSQL = "" +
"SELECT event_json FROM current_room_state WHERE room_id = $1"
const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
type currentRoomStateStatements struct {
upsertRoomStateStmt *sql.Stmt
deleteRoomStateByEventIDStmt *sql.Stmt
selectRoomIDsWithMembershipStmt *sql.Stmt
selectCurrentStateStmt *sql.Stmt
selectJoinedUsersStmt *sql.Stmt
}
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
@ -85,9 +89,34 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
return
}
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
return
}
return
}
// JoinedMemberLists returns a map of room ID to a list of joined user IDs.
func (s *currentRoomStateStatements) JoinedMemberLists() (map[string][]string, error) {
rows, err := s.selectJoinedUsersStmt.Query()
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[string][]string)
for rows.Next() {
var roomID string
var userID string
if err := rows.Scan(&roomID, &userID); err != nil {
return nil, err
}
users := result[roomID]
users = append(users, userID)
result[roomID] = users
}
return result, nil
}
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(txn *sql.Tx, userID, membership string) ([]string, error) {
rows, err := txn.Stmt(s.selectRoomIDsWithMembershipStmt).Query(userID, membership)

View file

@ -50,30 +50,26 @@ const insertEventSQL = "" +
"INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5) RETURNING id"
const selectEventsSQL = "" +
"SELECT event_json FROM output_room_events WHERE event_id = ANY($1)"
const selectEventsInRangeSQL = "" +
"SELECT event_json FROM output_room_events WHERE id > $1 AND id <= $2"
"SELECT id, event_json FROM output_room_events WHERE event_id = ANY($1)"
const selectRecentEventsSQL = "" +
"SELECT event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4"
"SELECT id, event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4"
const selectMaxIDSQL = "" +
"SELECT MAX(id) FROM output_room_events"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" +
" WHERE (id > $1 AND id < $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
"SELECT id, event_json, add_state_ids, remove_state_ids FROM output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" ORDER BY id ASC"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxIDStmt *sql.Stmt
selectEventsInRangeStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
}
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
@ -90,9 +86,6 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil {
return
}
if s.selectEventsInRangeStmt, err = db.Prepare(selectEventsInRangeSQL); err != nil {
return
}
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
return
}
@ -102,10 +95,10 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
return
}
// StateBetween returns the state events between the two given stream positions, exclusive of both.
// StateBetween returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) {
func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]streamEvent, error) {
rows, err := txn.Stmt(s.selectStateInRangeStmt).Query(oldPos, newPos)
if err != nil {
return nil, err
@ -115,18 +108,19 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty
// - For each room ID, build up an array of event IDs which represents cumulative adds/removes
// For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
// if they aren't in the event ID cache. We don't handle state deletion yet.
eventIDToEvent := make(map[string]gomatrixserverlib.Event)
eventIDToEvent := make(map[string]streamEvent)
// RoomID => A set (map[string]bool) of state event IDs which are between the two positions
stateNeeded := make(map[string]map[string]bool)
for rows.Next() {
var (
streamPos int64
eventBytes []byte
addIDs pq.StringArray
delIDs pq.StringArray
)
if err := rows.Scan(&eventBytes, &addIDs, &delIDs); err != nil {
if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil {
return nil, err
}
// Sanity check for deleted state and whine if we see it. We don't need to do anything
@ -157,7 +151,7 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty
}
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = ev
eventIDToEvent[ev.EventID()] = streamEvent{ev, types.StreamPosition(streamPos)}
}
return s.fetchStateEvents(txn, stateNeeded, eventIDToEvent)
@ -165,8 +159,8 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
// Returns a map of room ID to list of events.
func (s *outputRoomEventsStatements) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet map[string]map[string]bool, eventIDToEvent map[string]gomatrixserverlib.Event) (map[string][]gomatrixserverlib.Event, error) {
stateBetween := make(map[string][]gomatrixserverlib.Event)
func (s *outputRoomEventsStatements) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet map[string]map[string]bool, eventIDToEvent map[string]streamEvent) (map[string][]streamEvent, error) {
stateBetween := make(map[string][]streamEvent)
missingEvents := make(map[string][]string)
for roomID, ids := range roomIDToEventIDSet {
events := stateBetween[roomID]
@ -232,7 +226,7 @@ func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixser
}
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]gomatrixserverlib.Event, error) {
func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]streamEvent, error) {
rows, err := s.selectRecentEventsStmt.Query(roomID, fromPos, toPos, limit)
if err != nil {
return nil, err
@ -249,7 +243,7 @@ func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID stri
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
// from the database.
func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) {
func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs))
if err != nil {
return nil, err
@ -266,11 +260,14 @@ func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]g
return result, nil
}
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
var result []gomatrixserverlib.Event
func rowsToEvents(rows *sql.Rows) ([]streamEvent, error) {
var result []streamEvent
for rows.Next() {
var eventBytes []byte
if err := rows.Scan(&eventBytes); err != nil {
var (
streamPos int64
eventBytes []byte
)
if err := rows.Scan(&streamPos, &eventBytes); err != nil {
return nil, err
}
// TODO: Handle redacted events
@ -278,12 +275,12 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
if err != nil {
return nil, err
}
result = append(result, ev)
result = append(result, streamEvent{ev, types.StreamPosition(streamPos)})
}
return result, nil
}
func reverseEvents(input []gomatrixserverlib.Event) (output []gomatrixserverlib.Event) {
func reverseEvents(input []streamEvent) (output []streamEvent) {
for i := len(input) - 1; i >= 0; i-- {
output = append(output, input[i])
}

View file

@ -16,13 +16,30 @@ package storage
import (
"database/sql"
"encoding/json"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/clientapi/events"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type stateDelta struct {
roomID string
stateEvents []gomatrixserverlib.Event
membership string
// The stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
membershipPos types.StreamPosition
}
// Same as gomatrixserverlib.Event but also has the stream position for this event.
type streamEvent struct {
gomatrixserverlib.Event
streamPosition types.StreamPosition
}
// SyncServerDatabase represents a sync server database
type SyncServerDatabase struct {
db *sql.DB
@ -53,6 +70,11 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
return &SyncServerDatabase{db, partitions, events, state}, nil
}
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error) {
return d.roomstate.JoinedMemberLists()
}
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event.
@ -86,7 +108,7 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve
if err != nil {
return err
}
return d.roomstate.UpdateRoomState(txn, added, removeStateEventIDs)
return d.roomstate.UpdateRoomState(txn, streamEventsToEvents(added), removeStateEventIDs)
})
return
}
@ -111,53 +133,84 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error)
}
// IncrementalSync returns all the data needed in order to create an incremental sync response.
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) {
data = make(map[string]types.RoomData)
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
// Work out which rooms to return in the response. This is done by getting not only the currently
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
// This works out what the 'state' key should be for each room as well as which membership block
// to put the room into.
deltas, err := d.getStateDeltas(txn, fromPos, toPos, userID)
if err != nil {
return err
}
state, err := d.events.StateBetween(txn, fromPos, toPos)
if err != nil {
return err
}
for _, roomID := range roomIDs {
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom)
res = types.NewResponse(toPos)
for _, delta := range deltas {
endPos := toPos
if delta.membershipPos > 0 && delta.membership == "leave" {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
endPos = delta.membershipPos
}
recentStreamEvents, err := d.events.RecentEventsInRoom(txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom)
if err != nil {
return err
}
roomData := types.RoomData{
State: state[roomID],
RecentEvents: recentEvents,
recentEvents := streamEventsToEvents(recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
switch delta.membership {
case "join":
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case "leave":
fallthrough // transitions to leave are the same as ban
case "ban":
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}
data[roomID] = roomData
}
return nil
// TODO: This should be done in getStateDeltas
return d.addInvitesToResponse(txn, userID, res)
})
return
}
// CompleteSync returns all the data needed in order to create a complete sync response.
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) {
data = make(map[string]types.RoomData)
// CompleteSync a complete /sync API response for the given user.
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
// a consistent view of the database throughout. This includes extracting the sync stream position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction.
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
// Get the current stream position which we will base the sync response on.
id, err := d.events.MaxID(txn)
if err != nil {
return err
}
pos = types.StreamPosition(id)
pos := types.StreamPosition(id)
// Extract room state and recent events for all rooms the user is joined to.
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
if err != nil {
return err
}
// Build up a /sync response. Add joined rooms.
res = types.NewResponse(pos)
for _, roomID := range roomIDs {
stateEvents, err := d.roomstate.CurrentState(txn, roomID)
if err != nil {
@ -165,20 +218,151 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom)
recentStreamEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom)
if err != nil {
return err
}
data[roomID] = types.RoomData{
State: stateEvents,
RecentEvents: recentEvents,
}
recentEvents := streamEventsToEvents(recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
return nil
return d.addInvitesToResponse(txn, userID, res)
})
return
}
func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error {
// Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark.
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "invite")
if err != nil {
return err
}
for _, roomID := range roomIDs {
ir := types.NewInviteResponse()
// TODO: invite_state. The state won't be in the current state table in cases where you get invited over federation
res.Rooms.Invite[roomID] = *ir
}
return nil
}
func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response
// - For each room which has membership list changes:
// * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
// If it is, then we need to send the full room state down (and 'limited' is always true).
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
var deltas []stateDelta
// get all the state events ever between these two positions
state, err := d.events.StateBetween(txn, fromPos, toPos)
if err != nil {
return nil, err
}
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
// dupe join events will result in the entire room state coming down to the client again. This is added in
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
// the timeline.
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership == "join" {
// send full room state down instead of a delta
var allState []gomatrixserverlib.Event
allState, err = d.roomstate.CurrentState(txn, roomID)
if err != nil {
return nil, err
}
s := make([]streamEvent, len(allState))
for i := 0; i < len(s); i++ {
s[i] = streamEvent{allState[i], types.StreamPosition(0)}
}
state[roomID] = s
continue // we'll add this room in when we do joined rooms
}
deltas = append(deltas, stateDelta{
membership: membership,
membershipPos: ev.streamPosition,
stateEvents: streamEventsToEvents(stateStreamEvents),
roomID: roomID,
})
break
}
}
}
// Add in currently joined rooms
joinedRoomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
if err != nil {
return nil, err
}
for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, stateDelta{
membership: "join",
stateEvents: streamEventsToEvents(state[joinedRoomID]),
roomID: joinedRoomID,
})
}
return deltas, nil
}
func streamEventsToEvents(in []streamEvent) []gomatrixserverlib.Event {
out := make([]gomatrixserverlib.Event, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[i].Event
}
return out
}
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
// them out so we don't include them twice in the /sync response. They should be in recentEvents
// only, so clients get to the correct state once they have rolled forward.
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
}
// TODO: This is a linear scan over all the current state events in this room. This will
// be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
// then do a binary search to find matching events, similar to what roomserver does.
for j := 0; j < len(stateEvents); j++ {
if stateEvents[j].EventID() == recentEv.EventID() {
// overwrite the element to remove with the last element then pop the last element.
// This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
// (we don't care about the order of stateEvents)
stateEvents[j] = stateEvents[len(stateEvents)-1]
stateEvents = stateEvents[:len(stateEvents)-1]
break // there shouldn't be multiple events with the same event ID
}
}
}
return stateEvents
}
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
var memberContent events.MemberContent
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
return ""
}
return memberContent.Membership
}
return ""
}
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
txn, err := db.Begin()
if err != nil {

View file

@ -15,27 +15,41 @@
package sync
import (
"encoding/json"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/events"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// Notifier will wake up sleeping requests in the request pool when there
// is some new data. It does not tell requests what that data is, only the
// stream position which they can use to get at it.
// Notifier will wake up sleeping requests when there is some new data.
// It does not tell requests what that data is, only the stream position which
// they can use to get at it. This is done to prevent races whereby we tell the caller
// the event, but the token has already advanced by the time they fetch it, resulting
// in missed events.
type Notifier struct {
// The latest sync stream position: guarded by 'cond'.
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
roomIDToJoinedUsers map[string]userIDSet
// Protects currPos and userStreams.
streamLock *sync.Mutex
// The latest sync stream position
currPos types.StreamPosition
// A condition variable to notify all waiting goroutines of a new sync stream position
cond *sync.Cond
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
userStreams map[string]*UserStream
}
// NewNotifier creates a new notifier set to the given stream position.
// In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
func NewNotifier(pos types.StreamPosition) *Notifier {
return &Notifier{
pos,
sync.NewCond(&sync.Mutex{}),
currPos: pos,
roomIDToJoinedUsers: make(map[string]userIDSet),
userStreams: make(map[string]*UserStream),
streamLock: &sync.Mutex{},
}
}
@ -43,25 +57,157 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
// called from a single goroutine, to avoid races between updates which could set the
// current position in the stream incorrectly.
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
// update the current position in a guard and then notify all /sync streams
n.cond.L.Lock()
// update the current position then notify relevant /sync streams.
// This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos = pos
n.cond.L.Unlock()
n.cond.Broadcast() // notify ALL waiting goroutines
// Map this event's room_id to a list of joined users, and wake them up.
userIDs := n.joinedUsers(ev.RoomID())
// If this is an invite, also add in the invitee to this list.
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
userID := *ev.StateKey()
var memberContent events.MemberContent
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
"Notifier.OnNewEvent: Failed to unmarshal member event",
)
} else {
// Keep the joined user map up-to-date
switch memberContent.Membership {
case "invite":
userIDs = append(userIDs, userID)
case "join":
n.addJoinedUser(ev.RoomID(), userID)
case "leave":
fallthrough
case "ban":
n.removeJoinedUser(ev.RoomID(), userID)
}
}
}
for _, userID := range userIDs {
n.wakeupUser(userID, pos)
}
}
// WaitForEvents blocks until there are new events for this request.
func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
// In a guard, check if the /sync request should block, and block it until we get a new position
n.cond.L.Lock()
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
// - Incoming events wake requests for a matching room ID
// - Incoming events wake requests for a matching user ID (needed for invites)
// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
// but given we don't do /events, let's pretend it doesn't exist.
// In a guard, check if the /sync request should block, and block it until we get woken up
n.streamLock.Lock()
currentPos := n.currPos
for req.since == currentPos {
// we need to wait for a new event.
// TODO: This waits for ANY new event, we need to only wait for events which we care about.
n.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock
currentPos = n.currPos
// TODO: We increment the stream position for any event, so it's possible that we return immediately
// with a pos which contains no new events for this user. We should probably re-wait for events
// automatically in this case.
if req.since != currentPos {
n.streamLock.Unlock()
return currentPos
}
n.cond.L.Unlock()
return currentPos
// wait to be woken up, and then re-check the stream position
req.log.WithField("user_id", req.userID).Info("Waiting for event")
// give up the stream lock prior to waiting on the user lock
stream := n.fetchUserStream(req.userID, true)
n.streamLock.Unlock()
return stream.Wait(currentPos)
}
// Load the membership states required to notify users correctly.
func (n *Notifier) Load(db *storage.SyncServerDatabase) error {
roomToUsers, err := db.AllJoinedUsersInRooms()
if err != nil {
return err
}
n.setUsersJoinedToRooms(roomToUsers)
return nil
}
// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
// OnNewEvent (eg on startup) to prevent racing.
func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
// This is just the bulk form of addJoinedUser
for roomID, userIDs := range roomIDToUserIDs {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
for _, userID := range userIDs {
n.roomIDToJoinedUsers[roomID].add(userID)
}
}
}
func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) {
stream := n.fetchUserStream(userID, false)
if stream == nil {
return
}
stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream
}
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
// a stream will be made for this user if one doesn't exist and it will be returned. This
// function does not wait for data to be available on the stream.
func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream {
stream, ok := n.userStreams[userID]
if !ok {
// TODO: Unbounded growth of streams (1 per user)
stream = NewUserStream(userID)
n.userStreams[userID] = stream
}
return stream
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addJoinedUser(roomID, userID string) {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
n.roomIDToJoinedUsers[roomID].add(userID)
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) removeJoinedUser(roomID, userID string) {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
n.roomIDToJoinedUsers[roomID].remove(userID)
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
return
}
return n.roomIDToJoinedUsers[roomID].values()
}
// A string set, mainly existing for improving clarity of structs in this file.
type userIDSet map[string]bool
func (s userIDSet) add(str string) {
s[str] = true
}
func (s userIDSet) remove(str string) {
delete(s, str)
}
func (s userIDSet) values() (vals []string) {
for str := range s {
vals = append(vals, str)
}
return
}

View file

@ -0,0 +1,292 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
var (
randomMessageEvent gomatrixserverlib.Event
aliceInviteBobEvent gomatrixserverlib.Event
bobLeaveEvent gomatrixserverlib.Event
)
var (
streamPositionVeryOld = types.StreamPosition(5)
streamPositionBefore = types.StreamPosition(11)
streamPositionAfter = types.StreamPosition(12)
streamPositionAfter2 = types.StreamPosition(13)
roomID = "!test:localhost"
alice = "@alice:localhost"
bob = "@bob:localhost"
)
func init() {
var err error
randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
"type": "m.room.message",
"content": {
"body": "Hello World",
"msgtype": "m.text"
},
"sender": "@noone:localhost",
"room_id": "`+roomID+`",
"origin_server_ts": 12345,
"event_id": "$randomMessageEvent:localhost"
}`), false)
if err != nil {
panic(err)
}
aliceInviteBobEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
"type": "m.room.member",
"state_key": "`+bob+`",
"content": {
"membership": "invite"
},
"sender": "`+alice+`",
"room_id": "`+roomID+`",
"origin_server_ts": 12345,
"event_id": "$aliceInviteBobEvent:localhost"
}`), false)
if err != nil {
panic(err)
}
bobLeaveEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
"type": "m.room.member",
"state_key": "`+bob+`",
"content": {
"membership": "leave"
},
"sender": "`+bob+`",
"room_id": "`+roomID+`",
"origin_server_ts": 12345,
"event_id": "$bobLeaveEvent:localhost"
}`), false)
if err != nil {
panic(err)
}
}
// Test that the current position is returned if a request is already behind.
func TestImmediateNotification(t *testing.T) {
n := NewNotifier(streamPositionBefore)
pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionVeryOld))
if err != nil {
t.Fatalf("TestImmediateNotification error: %s", err)
}
if pos != streamPositionBefore {
t.Fatalf("TestImmediateNotification want %d, got %d", streamPositionBefore, pos)
}
}
// Test that new events to a joined room unblocks the request.
func TestNewEventAndJoinedToRoom(t *testing.T) {
n := NewNotifier(streamPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: []string{alice, bob},
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
}
if pos != streamPositionAfter {
t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", streamPositionAfter, pos)
}
wg.Done()
}()
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
n.OnNewEvent(&randomMessageEvent, streamPositionAfter)
wg.Wait()
}
// Test that an invite unblocks the request
func TestNewInviteEventForUser(t *testing.T) {
n := NewNotifier(streamPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: []string{alice, bob},
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
if err != nil {
t.Errorf("TestNewInviteEventForUser error: %s", err)
}
if pos != streamPositionAfter {
t.Errorf("TestNewInviteEventForUser want %d, got %d", streamPositionAfter, pos)
}
wg.Done()
}()
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
n.OnNewEvent(&aliceInviteBobEvent, streamPositionAfter)
wg.Wait()
}
// Test that all blocked requests get woken up on a new event.
func TestMultipleRequestWakeup(t *testing.T) {
n := NewNotifier(streamPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: []string{alice, bob},
})
var wg sync.WaitGroup
wg.Add(3)
poll := func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
if err != nil {
t.Errorf("TestMultipleRequestWakeup error: %s", err)
}
if pos != streamPositionAfter {
t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos)
}
wg.Done()
}
go poll()
go poll()
go poll()
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 3)
n.OnNewEvent(&randomMessageEvent, streamPositionAfter)
wg.Wait()
numWaiting := stream.NumWaiting()
if numWaiting != 0 {
t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting)
}
}
// Test that you stop getting woken up when you leave a room.
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// listen as bob. Make bob leave room. Make alice send event to room.
// Make sure alice gets woken up only and not bob as well.
n := NewNotifier(streamPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: []string{alice, bob},
})
var leaveWG sync.WaitGroup
// Make bob leave the room
leaveWG.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
if pos != streamPositionAfter {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos)
}
leaveWG.Done()
}()
bobStream := n.fetchUserStream(bob, true)
waitForBlocking(bobStream, 1)
n.OnNewEvent(&bobLeaveEvent, streamPositionAfter)
leaveWG.Wait()
// send an event into the room. Make sure alice gets it. Bob should not.
var aliceWG sync.WaitGroup
aliceStream := n.fetchUserStream(alice, true)
aliceWG.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
if pos != streamPositionAfter2 {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos)
}
aliceWG.Done()
}()
go func() {
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
_, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter))
if err == nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
}
}()
waitForBlocking(aliceStream, 1)
waitForBlocking(bobStream, 1)
n.OnNewEvent(&randomMessageEvent, streamPositionAfter2)
aliceWG.Wait()
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
// for a fraction of a second to account for this race
time.Sleep(1 * time.Millisecond)
}
// same as Notifier.WaitForEvents but with a timeout.
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
done := make(chan types.StreamPosition, 1)
go func() {
newPos := n.WaitForEvents(req)
done <- newPos
close(done)
}()
select {
case <-time.After(5 * time.Second):
return types.StreamPosition(0), fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
)
case p := <-done:
return p, nil
}
}
// Wait until something is Wait()ing on the user stream.
func waitForBlocking(s *UserStream, numBlocking int) {
for numBlocking != s.NumWaiting() {
// This is horrible but I don't want to add a signalling mechanism JUST for testing.
time.Sleep(1 * time.Microsecond)
}
}
func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
return syncRequest{
userID: userID,
timeout: 1 * time.Minute,
since: since,
wantFullState: false,
limit: defaultTimelineLimit,
log: util.GetLogger(context.TODO()),
}
}

View file

@ -15,10 +15,13 @@
package sync
import (
"github.com/matrix-org/dendrite/syncapi/types"
"net/http"
"strconv"
"time"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/util"
)
const defaultSyncTimeout = time.Duration(30) * time.Second
@ -31,6 +34,7 @@ type syncRequest struct {
timeout time.Duration
since types.StreamPosition
wantFullState bool
log *log.Entry
}
func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) {
@ -48,6 +52,7 @@ func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) {
since: since,
wantFullState: wantFullState,
limit: defaultTimelineLimit, // TODO: read from filter
log: util.GetLogger(req.Context()),
}, nil
}

View file

@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -64,7 +63,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
// Fork off 2 goroutines: one to do the work, and one to serve as a timeout.
// Whichever returns first is the one we will serve back to the client.
// TODO: Currently this means that cpu work is timed, which may not be what we want long term.
timeoutChan := make(chan struct{})
timer := time.AfterFunc(syncReq.timeout, func() {
close(timeoutChan) // signal that the timeout has expired
@ -72,8 +70,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
done := make(chan util.JSONResponse)
go func() {
syncData, err := rp.currentSyncForUser(*syncReq)
currentPos := rp.notifier.WaitForEvents(*syncReq)
// We stop the timer BEFORE calculating the response so the cpu work
// done to calculate the response is not timed. This stops us from
// doing lots of work then timing out and sending back an empty response.
timer.Stop()
syncData, err := rp.currentSyncForUser(*syncReq, currentPos)
var res util.JSONResponse
if err != nil {
res = httputil.LogThenError(req, err)
@ -98,39 +100,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
}
}
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
currentPos := rp.notifier.WaitForEvents(req)
if req.since == types.StreamPosition(0) {
pos, data, err := rp.db.CompleteSync(req.userID, req.limit)
if err != nil {
return nil, err
}
res := types.NewResponse(pos)
for roomID, d := range data {
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
return res, nil
}
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) {
// TODO: handle ignored users
data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
if err != nil {
return nil, err
if req.since == types.StreamPosition(0) {
return rp.db.CompleteSync(req.userID, req.limit)
}
res := types.NewResponse(currentPos)
for roomID, d := range data {
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
return res, nil
return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
}

View file

@ -0,0 +1,79 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"sync"
"github.com/matrix-org/dendrite/syncapi/types"
)
// UserStream represents a communication mechanism between the /sync request goroutine
// and the underlying sync server goroutines. Goroutines can Wait() for a stream position and
// goroutines can Broadcast(streamPosition) to other goroutines.
type UserStream struct {
UserID string
// Because this is a Cond, we can notify all waiting goroutines so this works
// across devices for the same user. Protects pos.
cond *sync.Cond
// The position to broadcast to callers of Wait().
pos types.StreamPosition
// The number of goroutines blocked on Wait() - used for testing and metrics
numWaiting int
}
// NewUserStream creates a new user stream
func NewUserStream(userID string) *UserStream {
return &UserStream{
UserID: userID,
cond: sync.NewCond(&sync.Mutex{}),
}
}
// Wait blocks until there is a new stream position for this user, which is then returned.
// waitAtPos should be the position the stream thinks it should be waiting at.
func (s *UserStream) Wait(waitAtPos types.StreamPosition) (pos types.StreamPosition) {
s.cond.L.Lock()
// Before we start blocking, we need to make sure that we didn't race with a call
// to Broadcast() between calling Wait() and actually sleeping. We check the last
// broadcast pos to see if it is newer than the pos we are meant to wait at. If it
// is newer, something has Broadcast to this stream more recently so return immediately.
if s.pos > waitAtPos {
pos = s.pos
s.cond.L.Unlock()
return
}
s.numWaiting++
s.cond.Wait()
pos = s.pos
s.numWaiting--
s.cond.L.Unlock()
return
}
// Broadcast a new stream position for this user.
func (s *UserStream) Broadcast(pos types.StreamPosition) {
s.cond.L.Lock()
s.pos = pos
s.cond.L.Unlock()
s.cond.Broadcast()
}
// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing.
func (s *UserStream) NumWaiting() int {
s.cond.L.Lock()
defer s.cond.L.Unlock()
return s.numWaiting
}

View file

@ -28,12 +28,6 @@ func (sp StreamPosition) String() string {
return strconv.FormatInt(int64(sp), 10)
}
// RoomData represents the data for a room suitable for building a sync response from.
type RoomData struct {
State []gomatrixserverlib.Event
RecentEvents []gomatrixserverlib.Event
}
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
type Response struct {
NextBatch string `json:"next_batch"`
@ -103,7 +97,7 @@ func NewJoinResponse() *JoinResponse {
// InviteResponse represents a /sync response for a room which is under the 'invite' key.
type InviteResponse struct {
InviteState struct {
Events []gomatrixserverlib.ClientEvent
Events []gomatrixserverlib.ClientEvent `json:"events"`
} `json:"invite_state"`
}