mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 06:53:09 -06:00
Merge branch 'master' into kegan/notify-relevant-requests
This commit is contained in:
commit
474e836b8b
|
|
@ -392,7 +392,6 @@ func main() {
|
|||
}
|
||||
|
||||
// Make sure alice sees it TODO: prev_batch
|
||||
// TODO: Make sure bob sees it AND all the current room state
|
||||
testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{
|
||||
"account_data": {
|
||||
"events": []
|
||||
|
|
@ -425,6 +424,46 @@ func main() {
|
|||
}
|
||||
}`)
|
||||
|
||||
// Make sure bob sees the room AND all the current room state TODO: history visibility
|
||||
testSyncServer(syncServerCmdChan, "@bob:localhost", "9", `{
|
||||
"account_data": {
|
||||
"events": []
|
||||
},
|
||||
"next_batch": "10",
|
||||
"presence": {
|
||||
"events": []
|
||||
},
|
||||
"rooms": {
|
||||
"invite": {},
|
||||
"join": {
|
||||
"!PjrbIMW2cIiaYF4t:localhost": {
|
||||
"account_data": {
|
||||
"events": []
|
||||
},
|
||||
"ephemeral": {
|
||||
"events": []
|
||||
},
|
||||
"state": {
|
||||
"events": [`+
|
||||
clientEventTestData[0]+","+
|
||||
clientEventTestData[1]+","+
|
||||
clientEventTestData[2]+","+
|
||||
clientEventTestData[3]+","+
|
||||
clientEventTestData[4]+","+
|
||||
clientEventTestData[8]+`]
|
||||
},
|
||||
"timeline": {
|
||||
"limited": false,
|
||||
"prev_batch": "",
|
||||
"events": [`+
|
||||
clientEventTestData[9]+`]
|
||||
}
|
||||
}
|
||||
},
|
||||
"leave": {}
|
||||
}
|
||||
}`)
|
||||
|
||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost"
|
||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput([]string{outputRoomEventTestData[10]})); err != nil {
|
||||
panic(err)
|
||||
|
|
@ -469,6 +508,35 @@ func main() {
|
|||
// $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
|
||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost"
|
||||
// $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost"
|
||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(outputRoomEventTestData[11:14])); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Make sure charlie sees the invite both with and without a ?since= token
|
||||
// TODO: Invite state should include the invite event and the room name.
|
||||
charlieInviteData := `{
|
||||
"account_data": {
|
||||
"events": []
|
||||
},
|
||||
"next_batch": "14",
|
||||
"presence": {
|
||||
"events": []
|
||||
},
|
||||
"rooms": {
|
||||
"invite": {
|
||||
"!PjrbIMW2cIiaYF4t:localhost": {
|
||||
"invite_state": {
|
||||
"events": []
|
||||
}
|
||||
}
|
||||
},
|
||||
"join": {},
|
||||
"leave": {}
|
||||
}
|
||||
}`
|
||||
testSyncServer(syncServerCmdChan, "@charlie:localhost", "7", charlieInviteData)
|
||||
testSyncServer(syncServerCmdChan, "@charlie:localhost", "", charlieInviteData)
|
||||
|
||||
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost"
|
||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
|
||||
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost"
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ const selectMaxIDSQL = "" +
|
|||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||
const selectStateInRangeSQL = "" +
|
||||
"SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" +
|
||||
" WHERE (id > $1 AND id < $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||
" ORDER BY id ASC"
|
||||
|
||||
type outputRoomEventsStatements struct {
|
||||
|
|
@ -102,7 +102,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// StateBetween returns the state events between the two given stream positions, exclusive of both.
|
||||
// StateBetween returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos.
|
||||
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
|
||||
// two positions, only the most recent state is returned.
|
||||
func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) {
|
||||
|
|
|
|||
|
|
@ -16,8 +16,10 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
// Import the postgres database driver.
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/clientapi/events"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -116,8 +118,7 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error)
|
|||
}
|
||||
|
||||
// IncrementalSync returns all the data needed in order to create an incremental sync response.
|
||||
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) {
|
||||
data = make(map[string]types.RoomData)
|
||||
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
||||
if err != nil {
|
||||
|
|
@ -129,41 +130,84 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types
|
|||
return err
|
||||
}
|
||||
|
||||
res = types.NewResponse(toPos)
|
||||
|
||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||
// - Get membership list changes for this user in this sync response
|
||||
// - For each room which has membership list changes:
|
||||
// * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
|
||||
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
||||
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
||||
// * TODO Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block.
|
||||
|
||||
// work out which rooms transitioned to 'joined' between the 2 stream positions and add full state where needed.
|
||||
for roomID, stateEvents := range state {
|
||||
for _, ev := range stateEvents {
|
||||
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
|
||||
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
|
||||
// dupe join events will result in the entire room state coming down to the client again. This is added in
|
||||
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
|
||||
// the timeline.
|
||||
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
|
||||
var memberContent events.MemberContent
|
||||
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
|
||||
return err
|
||||
}
|
||||
if memberContent.Membership != "join" {
|
||||
continue
|
||||
}
|
||||
|
||||
allState, err := d.roomstate.CurrentState(txn, roomID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
state[roomID] = allState
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
for _, roomID := range roomIDs {
|
||||
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
state[roomID] = removeDuplicates(state[roomID], recentEvents)
|
||||
roomData := types.RoomData{
|
||||
State: state[roomID],
|
||||
RecentEvents: recentEvents,
|
||||
}
|
||||
data[roomID] = roomData
|
||||
|
||||
jr := types.NewJoinResponse()
|
||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||
jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync)
|
||||
res.Rooms.Join[roomID] = *jr
|
||||
}
|
||||
return nil
|
||||
|
||||
return d.addInvitesToResponse(txn, userID, res)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// CompleteSync returns all the data needed in order to create a complete sync response.
|
||||
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) {
|
||||
data = make(map[string]types.RoomData)
|
||||
// CompleteSync a complete /sync API response for the given user.
|
||||
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
||||
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
||||
// a consistent view of the database throughout. This includes extracting the sync stream position.
|
||||
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
|
||||
// but it's better to not hide the fact that this is being done in a transaction.
|
||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||
// Get the current stream position which we will base the sync response on.
|
||||
id, err := d.events.MaxID(txn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pos = types.StreamPosition(id)
|
||||
pos := types.StreamPosition(id)
|
||||
|
||||
// Extract room state and recent events for all rooms the user is joined to.
|
||||
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build up a /sync response. Add joined rooms.
|
||||
res = types.NewResponse(pos)
|
||||
for _, roomID := range roomIDs {
|
||||
stateEvents, err := d.roomstate.CurrentState(txn, roomID)
|
||||
if err != nil {
|
||||
|
|
@ -177,17 +221,32 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
|
|||
}
|
||||
|
||||
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
||||
|
||||
data[roomID] = types.RoomData{
|
||||
State: stateEvents,
|
||||
RecentEvents: recentEvents,
|
||||
}
|
||||
jr := types.NewJoinResponse()
|
||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||
jr.Timeline.Limited = true
|
||||
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
|
||||
res.Rooms.Join[roomID] = *jr
|
||||
}
|
||||
return nil
|
||||
|
||||
return d.addInvitesToResponse(txn, userID, res)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error {
|
||||
// Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark.
|
||||
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "invite")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, roomID := range roomIDs {
|
||||
ir := types.NewInviteResponse()
|
||||
// TODO: invite_state. The state won't be in the current state table in cases where you get invited over federation
|
||||
res.Rooms.Invite[roomID] = *ir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
||||
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
||||
// only, so clients get to the correct state once they have rolled forward.
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
|
|
@ -64,7 +63,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
|||
|
||||
// Fork off 2 goroutines: one to do the work, and one to serve as a timeout.
|
||||
// Whichever returns first is the one we will serve back to the client.
|
||||
// TODO: Currently this means that cpu work is timed, which may not be what we want long term.
|
||||
timeoutChan := make(chan struct{})
|
||||
timer := time.AfterFunc(syncReq.timeout, func() {
|
||||
close(timeoutChan) // signal that the timeout has expired
|
||||
|
|
@ -72,8 +70,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
|||
|
||||
done := make(chan util.JSONResponse)
|
||||
go func() {
|
||||
syncData, err := rp.currentSyncForUser(*syncReq)
|
||||
currentPos := rp.notifier.WaitForEvents(*syncReq)
|
||||
// We stop the timer BEFORE calculating the response so the cpu work
|
||||
// done to calculate the response is not timed. This stops us from
|
||||
// doing lots of work then timing out and sending back an empty response.
|
||||
timer.Stop()
|
||||
syncData, err := rp.currentSyncForUser(*syncReq, currentPos)
|
||||
var res util.JSONResponse
|
||||
if err != nil {
|
||||
res = httputil.LogThenError(req, err)
|
||||
|
|
@ -98,39 +100,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
|||
}
|
||||
}
|
||||
|
||||
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
|
||||
currentPos := rp.notifier.WaitForEvents(req)
|
||||
|
||||
if req.since == types.StreamPosition(0) {
|
||||
pos, data, err := rp.db.CompleteSync(req.userID, req.limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := types.NewResponse(pos)
|
||||
for roomID, d := range data {
|
||||
jr := types.NewJoinResponse()
|
||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
|
||||
jr.Timeline.Limited = true
|
||||
jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync)
|
||||
res.Rooms.Join[roomID] = *jr
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) {
|
||||
// TODO: handle ignored users
|
||||
|
||||
data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if req.since == types.StreamPosition(0) {
|
||||
return rp.db.CompleteSync(req.userID, req.limit)
|
||||
}
|
||||
|
||||
res := types.NewResponse(currentPos)
|
||||
for roomID, d := range data {
|
||||
jr := types.NewJoinResponse()
|
||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
|
||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||
jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync)
|
||||
res.Rooms.Join[roomID] = *jr
|
||||
}
|
||||
return res, nil
|
||||
return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,12 +28,6 @@ func (sp StreamPosition) String() string {
|
|||
return strconv.FormatInt(int64(sp), 10)
|
||||
}
|
||||
|
||||
// RoomData represents the data for a room suitable for building a sync response from.
|
||||
type RoomData struct {
|
||||
State []gomatrixserverlib.Event
|
||||
RecentEvents []gomatrixserverlib.Event
|
||||
}
|
||||
|
||||
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
||||
type Response struct {
|
||||
NextBatch string `json:"next_batch"`
|
||||
|
|
@ -103,7 +97,7 @@ func NewJoinResponse() *JoinResponse {
|
|||
// InviteResponse represents a /sync response for a room which is under the 'invite' key.
|
||||
type InviteResponse struct {
|
||||
InviteState struct {
|
||||
Events []gomatrixserverlib.ClientEvent
|
||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||
} `json:"invite_state"`
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue