mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-20 13:23:22 -06:00
Comments
This commit is contained in:
parent
ded130f548
commit
0177ea04c4
|
|
@ -137,22 +137,32 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
|
||||||
res = types.NewResponse()
|
res = types.NewResponse()
|
||||||
|
|
||||||
|
// See if we have any new tasks to do for the send-to-device messaging.
|
||||||
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, latestPos)
|
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, latestPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Before we return the sync response, make sure that we take action on
|
||||||
|
// any send-to-device database updates or deletions that we need to do.
|
||||||
|
// Then add the updates into the sync response.
|
||||||
defer func() {
|
defer func() {
|
||||||
if len(updates) > 0 || len(deletions) > 0 {
|
if len(updates) > 0 || len(deletions) > 0 {
|
||||||
|
// Handle the updates and deletions in the database.
|
||||||
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, latestPos)
|
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, latestPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
|
// Add the updates into the sync response.
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
|
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the next_batch from the sync response and increase the
|
||||||
|
// EDU counter.
|
||||||
if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil {
|
if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil {
|
||||||
pos.Positions[1]++
|
pos.Positions[1]++
|
||||||
res.NextBatch = pos.String()
|
res.NextBatch = pos.String()
|
||||||
|
|
@ -160,6 +170,17 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if len(events) > 0 {
|
||||||
|
// This is a bit of a hack until we can do something better with the sync API
|
||||||
|
// than this mess. If we have pending send-to-device updates then we want to
|
||||||
|
// deliver them pretty quickly. We still want the next step to run so that the
|
||||||
|
// sync tokens are updated properly. Set a short timeout on the next step so
|
||||||
|
// that we return faster.
|
||||||
|
ctx, cancel := context.WithTimeout(req.ctx, time.Second)
|
||||||
|
defer cancel()
|
||||||
|
req.ctx = ctx
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: handle ignored users
|
// TODO: handle ignored users
|
||||||
if req.since == nil {
|
if req.since == nil {
|
||||||
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
|
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue