Update sync responses
This commit is contained in:
parent
9c03b0a4fa
commit
9eff386e21
|
|
@ -134,7 +134,7 @@ func DeviceListCatchup(
|
||||||
Partition: queryRes.Partition,
|
Partition: queryRes.Partition,
|
||||||
Offset: queryRes.Offset,
|
Offset: queryRes.Offset,
|
||||||
})
|
})
|
||||||
res.NextBatch = to.String()
|
res.NextBatch = res.NextBatch.WithUpdates(to)
|
||||||
|
|
||||||
return hasNew, nil
|
return hasNew, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -686,8 +686,7 @@ func (d *Database) IncrementalSync(
|
||||||
numRecentEventsPerRoom int,
|
numRecentEventsPerRoom int,
|
||||||
wantFullState bool,
|
wantFullState bool,
|
||||||
) (*types.Response, error) {
|
) (*types.Response, error) {
|
||||||
nextBatchPos := fromPos.WithUpdates(toPos)
|
res.NextBatch = res.NextBatch.WithUpdates(toPos)
|
||||||
res.NextBatch = nextBatchPos.String()
|
|
||||||
|
|
||||||
var joinedRoomIDs []string
|
var joinedRoomIDs []string
|
||||||
var err error
|
var err error
|
||||||
|
|
@ -779,7 +778,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
|
||||||
To: toPos.PDUPosition,
|
To: toPos.PDUPosition,
|
||||||
}
|
}
|
||||||
|
|
||||||
res.NextBatch = toPos.String()
|
res.NextBatch = res.NextBatch.WithUpdates(toPos)
|
||||||
|
|
||||||
// Extract room state and recent events for all rooms the user is joined to.
|
// Extract room state and recent events for all rooms the user is joined to.
|
||||||
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
|
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
|
||||||
|
|
@ -879,19 +878,18 @@ func (d *Database) getJoinResponseForCompleteSync(
|
||||||
|
|
||||||
// Retrieve the backward topology position, i.e. the position of the
|
// Retrieve the backward topology position, i.e. the position of the
|
||||||
// oldest event in the room's topology.
|
// oldest event in the room's topology.
|
||||||
var prevBatchStr string
|
var prevBatch types.TopologyToken
|
||||||
if len(recentStreamEvents) > 0 {
|
if len(recentStreamEvents) > 0 {
|
||||||
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
||||||
backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
|
backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
prevBatch := types.TopologyToken{
|
prevBatch = types.TopologyToken{
|
||||||
Depth: backwardTopologyPos,
|
Depth: backwardTopologyPos,
|
||||||
PDUPosition: backwardStreamPos,
|
PDUPosition: backwardStreamPos,
|
||||||
}
|
}
|
||||||
prevBatch.Decrement()
|
prevBatch.Decrement()
|
||||||
prevBatchStr = prevBatch.String()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't include a device here as we don't need to send down
|
// We don't include a device here as we don't need to send down
|
||||||
|
|
@ -900,7 +898,7 @@ func (d *Database) getJoinResponseForCompleteSync(
|
||||||
recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents)
|
recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents)
|
||||||
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
||||||
jr = types.NewJoinResponse()
|
jr = types.NewJoinResponse()
|
||||||
jr.Timeline.PrevBatch = prevBatchStr
|
jr.Timeline.PrevBatch = prevBatch
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
jr.Timeline.Limited = limited
|
jr.Timeline.Limited = limited
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
|
@ -1028,7 +1026,7 @@ func (d *Database) addRoomDeltaToResponse(
|
||||||
case gomatrixserverlib.Join:
|
case gomatrixserverlib.Join:
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
|
|
||||||
jr.Timeline.PrevBatch = prevBatch.String()
|
jr.Timeline.PrevBatch = prevBatch
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
jr.Timeline.Limited = limited
|
jr.Timeline.Limited = limited
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
|
@ -1036,7 +1034,7 @@ func (d *Database) addRoomDeltaToResponse(
|
||||||
case gomatrixserverlib.Peek:
|
case gomatrixserverlib.Peek:
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
|
|
||||||
jr.Timeline.PrevBatch = prevBatch.String()
|
jr.Timeline.PrevBatch = prevBatch
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
jr.Timeline.Limited = limited
|
jr.Timeline.Limited = limited
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
|
@ -1047,7 +1045,7 @@ func (d *Database) addRoomDeltaToResponse(
|
||||||
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
||||||
// no longer in the room.
|
// no longer in the room.
|
||||||
lr := types.NewLeaveResponse()
|
lr := types.NewLeaveResponse()
|
||||||
lr.Timeline.PrevBatch = prevBatch.String()
|
lr.Timeline.PrevBatch = prevBatch
|
||||||
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||||
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
|
|
||||||
|
|
@ -228,7 +228,7 @@ func TestSyncResponse(t *testing.T) {
|
||||||
ReceiptPosition: latest.ReceiptPosition,
|
ReceiptPosition: latest.ReceiptPosition,
|
||||||
SendToDevicePosition: latest.SendToDevicePosition,
|
SendToDevicePosition: latest.SendToDevicePosition,
|
||||||
}
|
}
|
||||||
if res.NextBatch != next.String() {
|
if res.NextBatch.String() != next.String() {
|
||||||
st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String())
|
st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String())
|
||||||
}
|
}
|
||||||
roomRes, ok := res.Rooms.Join[testRoomID]
|
roomRes, ok := res.Rooms.Join[testRoomID]
|
||||||
|
|
@ -266,7 +266,7 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) {
|
||||||
// returns the last event "Message 10"
|
// returns the last event "Message 10"
|
||||||
assertEventsEqual(t, "IncrementalSync Timeline", false, roomRes.Timeline.Events, reversed(events[len(events)-1:]))
|
assertEventsEqual(t, "IncrementalSync Timeline", false, roomRes.Timeline.Events, reversed(events[len(events)-1:]))
|
||||||
|
|
||||||
prev := roomRes.Timeline.PrevBatch
|
prev := roomRes.Timeline.PrevBatch.String()
|
||||||
if prev == "" {
|
if prev == "" {
|
||||||
t.Fatalf("IncrementalSync expected prev_batch token")
|
t.Fatalf("IncrementalSync expected prev_batch token")
|
||||||
}
|
}
|
||||||
|
|
@ -666,12 +666,8 @@ func TestInviteBehaviour(t *testing.T) {
|
||||||
assertInvitedToRooms(t, res, []string{inviteRoom2})
|
assertInvitedToRooms(t, res, []string{inviteRoom2})
|
||||||
|
|
||||||
// a sync after we have received both invites should result in a leave for the retired room
|
// a sync after we have received both invites should result in a leave for the retired room
|
||||||
beforeRetireTok, err := types.NewStreamTokenFromString(beforeRetireRes.NextBatch)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("NewStreamTokenFromString cannot parse next batch '%s' : %s", beforeRetireRes.NextBatch, err)
|
|
||||||
}
|
|
||||||
res = types.NewResponse()
|
res = types.NewResponse()
|
||||||
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, beforeRetireTok, latest, 0, false)
|
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, beforeRetireRes.NextBatch, latest, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("IncrementalSync failed: %s", err)
|
t.Fatalf("IncrementalSync failed: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -295,13 +295,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
|
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
|
||||||
}
|
}
|
||||||
|
res.NextBatch.SendToDevicePosition++
|
||||||
// Get the next_batch from the sync response and increase the
|
|
||||||
// EDU counter.
|
|
||||||
if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil {
|
|
||||||
pos.SendToDevicePosition++
|
|
||||||
res.NextBatch = pos.String()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, err
|
return res, err
|
||||||
|
|
|
||||||
|
|
@ -113,6 +113,17 @@ type StreamingToken struct {
|
||||||
Logs map[string]*LogPosition
|
Logs map[string]*LogPosition
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This will be used as a fallback by json.Marshal.
|
||||||
|
func (t *StreamingToken) MarshalText() ([]byte, error) {
|
||||||
|
return []byte(t.String()), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will be used as a fallback by json.Unmarshal.
|
||||||
|
func (t *StreamingToken) UnmarshalText(text []byte) (err error) {
|
||||||
|
*t, err = NewStreamTokenFromString(string(text))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
|
func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
|
||||||
if t.Logs == nil {
|
if t.Logs == nil {
|
||||||
t.Logs = make(map[string]*LogPosition)
|
t.Logs = make(map[string]*LogPosition)
|
||||||
|
|
@ -205,6 +216,17 @@ type TopologyToken struct {
|
||||||
PDUPosition StreamPosition
|
PDUPosition StreamPosition
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This will be used as a fallback by json.Marshal.
|
||||||
|
func (t *TopologyToken) MarshalText() ([]byte, error) {
|
||||||
|
return []byte(t.String()), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will be used as a fallback by json.Unmarshal.
|
||||||
|
func (t *TopologyToken) UnmarshalText(text []byte) (err error) {
|
||||||
|
*t, err = NewTopologyTokenFromString(string(text))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TopologyToken) StreamToken() StreamingToken {
|
func (t *TopologyToken) StreamToken() StreamingToken {
|
||||||
return StreamingToken{
|
return StreamingToken{
|
||||||
PDUPosition: t.PDUPosition,
|
PDUPosition: t.PDUPosition,
|
||||||
|
|
@ -331,7 +353,7 @@ type PrevEventRef struct {
|
||||||
|
|
||||||
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
||||||
type Response struct {
|
type Response struct {
|
||||||
NextBatch string `json:"next_batch"`
|
NextBatch StreamingToken `json:"next_batch"`
|
||||||
AccountData struct {
|
AccountData struct {
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
} `json:"account_data,omitempty"`
|
} `json:"account_data,omitempty"`
|
||||||
|
|
@ -395,7 +417,7 @@ type JoinResponse struct {
|
||||||
Timeline struct {
|
Timeline struct {
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
Limited bool `json:"limited"`
|
Limited bool `json:"limited"`
|
||||||
PrevBatch string `json:"prev_batch"`
|
PrevBatch TopologyToken `json:"prev_batch"`
|
||||||
} `json:"timeline"`
|
} `json:"timeline"`
|
||||||
Ephemeral struct {
|
Ephemeral struct {
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
|
|
@ -453,7 +475,7 @@ type LeaveResponse struct {
|
||||||
Timeline struct {
|
Timeline struct {
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
Limited bool `json:"limited"`
|
Limited bool `json:"limited"`
|
||||||
PrevBatch string `json:"prev_batch"`
|
PrevBatch TopologyToken `json:"prev_batch"`
|
||||||
} `json:"timeline"`
|
} `json:"timeline"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue