Tweaks, restore StreamingToken.MarshalText which somehow went missing?
This commit is contained in:
parent
7200691610
commit
c35e46521f
|
@ -335,7 +335,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) {
|
||||||
return types.StreamingToken{}, fmt.Errorf(
|
return types.StreamingToken{}, fmt.Errorf(
|
||||||
"waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since,
|
"waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since,
|
||||||
)
|
)
|
||||||
case <-listener.GetNotifyChannel(*req.since):
|
case <-listener.GetNotifyChannel(req.since):
|
||||||
p := listener.GetSyncPosition()
|
p := listener.GetSyncPosition()
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
@ -365,7 +365,7 @@ func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) syn
|
||||||
ID: deviceID,
|
ID: deviceID,
|
||||||
},
|
},
|
||||||
timeout: 1 * time.Minute,
|
timeout: 1 * time.Minute,
|
||||||
since: &since,
|
since: since,
|
||||||
wantFullState: false,
|
wantFullState: false,
|
||||||
limit: DefaultTimelineLimit,
|
limit: DefaultTimelineLimit,
|
||||||
log: util.GetLogger(context.TODO()),
|
log: util.GetLogger(context.TODO()),
|
||||||
|
|
|
@ -46,7 +46,7 @@ type syncRequest struct {
|
||||||
device userapi.Device
|
device userapi.Device
|
||||||
limit int
|
limit int
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
since *types.StreamingToken // nil means that no since token was supplied
|
since types.StreamingToken // nil means that no since token was supplied
|
||||||
wantFullState bool
|
wantFullState bool
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
}
|
}
|
||||||
|
@ -55,17 +55,13 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
||||||
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
||||||
fullState := req.URL.Query().Get("full_state")
|
fullState := req.URL.Query().Get("full_state")
|
||||||
wantFullState := fullState != "" && fullState != "false"
|
wantFullState := fullState != "" && fullState != "false"
|
||||||
var since *types.StreamingToken
|
since, sinceStr := types.StreamingToken{}, req.URL.Query().Get("since")
|
||||||
sinceStr := req.URL.Query().Get("since")
|
|
||||||
if sinceStr != "" {
|
if sinceStr != "" {
|
||||||
tok, err := types.NewStreamTokenFromString(sinceStr)
|
var err error
|
||||||
|
since, err = types.NewStreamTokenFromString(sinceStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
since = &tok
|
|
||||||
}
|
|
||||||
if since == nil {
|
|
||||||
since = &types.StreamingToken{}
|
|
||||||
}
|
}
|
||||||
timelineLimit := DefaultTimelineLimit
|
timelineLimit := DefaultTimelineLimit
|
||||||
// TODO: read from stored filters too
|
// TODO: read from stored filters too
|
||||||
|
|
|
@ -185,7 +185,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
// respond with, so we skip the return an go back to waiting for content to
|
// respond with, so we skip the return an go back to waiting for content to
|
||||||
// be sent down or the request timing out.
|
// be sent down or the request timing out.
|
||||||
var hasTimedOut bool
|
var hasTimedOut bool
|
||||||
sincePos := *syncReq.since
|
sincePos := syncReq.since
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
// Wait for notifier to wake us up
|
// Wait for notifier to wake us up
|
||||||
|
@ -279,7 +279,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
res := types.NewResponse()
|
res := types.NewResponse()
|
||||||
|
|
||||||
// See if we have any new tasks to do for the send-to-device messaging.
|
// See if we have any new tasks to do for the send-to-device messaging.
|
||||||
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, *req.since)
|
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, req.since)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
|
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -291,7 +291,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
|
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
|
res, err = rp.db.IncrementalSync(req.ctx, res, req.device, req.since, latestPos, req.limit, req.wantFullState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
|
return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("rp.appendAccountData: %w", err)
|
return res, fmt.Errorf("rp.appendAccountData: %w", err)
|
||||||
}
|
}
|
||||||
res, err = rp.appendDeviceLists(res, req.device.UserID, *req.since, latestPos)
|
res, err = rp.appendDeviceLists(res, req.device.UserID, req.since, latestPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
|
return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -316,7 +316,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
// Then add the updates into the sync response.
|
// Then add the updates into the sync response.
|
||||||
if len(updates) > 0 || len(deletions) > 0 {
|
if len(updates) > 0 || len(deletions) > 0 {
|
||||||
// Handle the updates and deletions in the database.
|
// Handle the updates and deletions in the database.
|
||||||
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, *req.since)
|
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.since)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
|
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,6 +116,17 @@ type StreamingToken struct {
|
||||||
DeviceListPosition LogPosition
|
DeviceListPosition LogPosition
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This will be used as a fallback by json.Marshal.
|
||||||
|
func (s StreamingToken) MarshalText() ([]byte, error) {
|
||||||
|
return []byte(s.String()), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will be used as a fallback by json.Unmarshal.
|
||||||
|
func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
|
||||||
|
*s, err = NewStreamTokenFromString(string(text))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (t StreamingToken) String() string {
|
func (t StreamingToken) String() string {
|
||||||
posStr := fmt.Sprintf(
|
posStr := fmt.Sprintf(
|
||||||
"s%d_%d_%d_%d",
|
"s%d_%d_%d_%d",
|
||||||
|
|
Loading…
Reference in a new issue