diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 3bf2d19a7..7bba8e522 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -132,8 +132,8 @@ func (t *StreamingToken) String() string { logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset) logStrings = append(logStrings, logStr) } - // E.g s11_22_33|dl0-134|ab1-441 - return strings.Join(logStrings, "|") + // E.g s11_22_33.dl0-134.ab1-441 + return strings.Join(logStrings, ".") } // IsAfter returns true if ANY position in this token is greater than `other`. @@ -236,7 +236,7 @@ func newSyncTokenFromString(s string) (token *syncToken, categories []string, er switch t := SyncTokenType(s[:1]); t { case SyncTokenTypeStream, SyncTokenTypeTopology: token.Type = t - categories = strings.Split(s[1:], "|") + categories = strings.Split(s[1:], ".") positions = strings.Split(categories[0], "_") default: return nil, nil, ErrInvalidSyncTokenType diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 22d344330..7590ea522 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -11,7 +11,7 @@ func TestNewSyncTokenWithLogs(t *testing.T) { syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}}, logs: make(map[string]*LogPosition), }, - "s4_0|dl-0-123": &StreamingToken{ + "s4_0.dl-0-123": &StreamingToken{ syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}}, logs: map[string]*LogPosition{ "dl": &LogPosition{ @@ -20,7 +20,7 @@ func TestNewSyncTokenWithLogs(t *testing.T) { }, }, }, - "s4_0|dl-0-123|ab-1-14419482332": &StreamingToken{ + "s4_0.dl-0-123.ab-1-14419482332": &StreamingToken{ syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}}, logs: map[string]*LogPosition{ "ab": &LogPosition{