Send-to-device/sync tweaks (#2630)
* Always delete send to device messages * Omit empty to_device * Tweak /sync response to omit empty values
This commit is contained in:
parent
03ddd98f5e
commit
e930959e49
|
@ -39,13 +39,6 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
|
||||||
return from
|
return from
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(events) > 0 {
|
|
||||||
// Clean up old send-to-device messages from before this stream position.
|
|
||||||
if err := p.DB.CleanSendToDeviceUpdates(req.Context, req.Device.UserID, req.Device.ID, from); err != nil {
|
|
||||||
req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed")
|
|
||||||
return from
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the updates into the sync response.
|
// Add the updates into the sync response.
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
// skip ignored user events
|
// skip ignored user events
|
||||||
|
@ -54,7 +47,6 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
|
||||||
}
|
}
|
||||||
req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)
|
req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return lastPos
|
return lastPos
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,11 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -35,10 +40,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/streams"
|
"github.com/matrix-org/dendrite/syncapi/streams"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"github.com/matrix-org/util"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// RequestPool manages HTTP long-poll connections for /sync
|
// RequestPool manages HTTP long-poll connections for /sync
|
||||||
|
@ -251,6 +252,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
waitingSyncRequests.Inc()
|
waitingSyncRequests.Inc()
|
||||||
defer waitingSyncRequests.Dec()
|
defer waitingSyncRequests.Dec()
|
||||||
|
|
||||||
|
// Clean up old send-to-device messages from before this stream position.
|
||||||
|
// This is needed to avoid sending the same message multiple times
|
||||||
|
if err = rp.db.CleanSendToDeviceUpdates(syncReq.Context, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Since.SendToDevicePosition); err != nil {
|
||||||
|
syncReq.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed")
|
||||||
|
}
|
||||||
|
|
||||||
// loop until we get some data
|
// loop until we get some data
|
||||||
for {
|
for {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
|
@ -21,9 +21,10 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -330,23 +331,23 @@ type Response struct {
|
||||||
NextBatch StreamingToken `json:"next_batch"`
|
NextBatch StreamingToken `json:"next_batch"`
|
||||||
AccountData struct {
|
AccountData struct {
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
|
Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
|
||||||
} `json:"account_data"`
|
} `json:"account_data,omitempty"`
|
||||||
Presence struct {
|
Presence struct {
|
||||||
Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
|
Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
|
||||||
} `json:"presence"`
|
} `json:"presence,omitempty"`
|
||||||
Rooms struct {
|
Rooms struct {
|
||||||
Join map[string]JoinResponse `json:"join"`
|
Join map[string]JoinResponse `json:"join,omitempty"`
|
||||||
Peek map[string]JoinResponse `json:"peek"`
|
Peek map[string]JoinResponse `json:"peek,omitempty"`
|
||||||
Invite map[string]InviteResponse `json:"invite"`
|
Invite map[string]InviteResponse `json:"invite,omitempty"`
|
||||||
Leave map[string]LeaveResponse `json:"leave"`
|
Leave map[string]LeaveResponse `json:"leave,omitempty"`
|
||||||
} `json:"rooms"`
|
} `json:"rooms,omitempty"`
|
||||||
ToDevice struct {
|
ToDevice struct {
|
||||||
Events []gomatrixserverlib.SendToDeviceEvent `json:"events"`
|
Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"`
|
||||||
} `json:"to_device"`
|
} `json:"to_device,omitempty"`
|
||||||
DeviceLists struct {
|
DeviceLists struct {
|
||||||
Changed []string `json:"changed,omitempty"`
|
Changed []string `json:"changed,omitempty"`
|
||||||
Left []string `json:"left,omitempty"`
|
Left []string `json:"left,omitempty"`
|
||||||
} `json:"device_lists"`
|
} `json:"device_lists,omitempty"`
|
||||||
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
|
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue