diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index ad6290e3f..f7cf96d94 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -91,7 +91,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0)) + s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0, nil)) return nil } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 487018031..06a8928da 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -106,7 +106,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) s.notifier.OnNewSendToDevice( output.UserID, []string{output.DeviceID}, - types.NewStreamToken(0, streamPos), + types.NewStreamToken(0, streamPos, nil), ) return nil diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 12b1efbc0..0a9a9c0cd 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -65,7 +65,7 @@ func (s *OutputTypingEventConsumer) Start() error { s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { s.notifier.OnNewEvent( nil, roomID, nil, - types.NewStreamToken(0, types.StreamPosition(latestSyncPosition)), + types.NewStreamToken(0, types.StreamPosition(latestSyncPosition), nil), ) }) @@ -94,6 +94,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) } - s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos)) + s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos, nil)) return nil } diff --git a/syncapi/consumers/keychange_test.go b/syncapi/consumers/keychange_test.go index f8e965700..3ecb3f583 100644 --- a/syncapi/consumers/keychange_test.go +++ b/syncapi/consumers/keychange_test.go @@ -14,6 +14,7 @@ import ( var ( syncingUser = "@alice:localhost" + emptyToken = types.NewStreamToken(0, 0, nil) ) type mockKeyAPI struct{} @@ -167,7 +168,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -190,7 +191,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -213,7 +214,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -235,7 +236,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -294,7 +295,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { jr.Timeline.Events = roomTimelineEvents syncResponse.Rooms.Join[roomID] = jr - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -322,7 +323,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -407,7 +408,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { lr.Timeline.Events = roomEvents syncResponse.Rooms.Leave[roomID] = lr - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index c65027168..da4a5366c 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -158,7 +158,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( }).Panicf("roomserver output log: write event failure") return nil } - s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0)) + s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) return nil } @@ -176,7 +176,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( }).Panicf("roomserver output log: write invite failure") return nil } - s.notifier.OnNewEvent(&msg.Event, "", nil, types.NewStreamToken(pduPos, 0)) + s.notifier.OnNewEvent(&msg.Event, "", nil, types.NewStreamToken(pduPos, 0, nil)) return nil } @@ -194,7 +194,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( } // Notify any active sync requests that the invite has been retired. // Invites share the same stream counter as PDUs - s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0)) + s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0, nil)) return nil } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e1312671b..dd5b838ce 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -434,7 +434,7 @@ func (d *Database) syncPositionTx( if maxInviteID > maxEventID { maxEventID = maxInviteID } - sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition())) + sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), nil) return } @@ -731,7 +731,7 @@ func (d *Database) CompleteSync( // Use a zero value SyncPosition for fromPos so all EDU states are added. err = d.addEDUDeltaToResponse( - types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res, + types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res, ) if err != nil { return nil, err diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 474d3222b..1f679def3 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -163,7 +163,7 @@ func TestSyncResponse(t *testing.T) { Name: "IncrementalSync penultimate", DoSync: func() (*types.Response, error) { from := types.NewStreamToken( // pretend we are at the penultimate event - positions[len(positions)-2], types.StreamPosition(0), + positions[len(positions)-2], types.StreamPosition(0), nil, ) res := types.NewResponse() return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false) @@ -176,7 +176,7 @@ func TestSyncResponse(t *testing.T) { Name: "IncrementalSync limited", DoSync: func() (*types.Response, error) { from := types.NewStreamToken( // pretend we are 10 events behind - positions[len(positions)-11], types.StreamPosition(0), + positions[len(positions)-11], types.StreamPosition(0), nil, ) res := types.NewResponse() // limit is set to 5 @@ -219,7 +219,7 @@ func TestSyncResponse(t *testing.T) { if err != nil { st.Fatalf("failed to do sync: %s", err) } - next := types.NewStreamToken(latest.PDUPosition(), latest.EDUPosition()) + next := types.NewStreamToken(latest.PDUPosition(), latest.EDUPosition(), nil) if res.NextBatch != next.String() { st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String()) } @@ -243,7 +243,7 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) { t.Fatalf("failed to get SyncPosition: %s", err) } from := types.NewStreamToken( - positions[len(positions)-2], types.StreamPosition(0), + positions[len(positions)-2], types.StreamPosition(0), nil, ) res := types.NewResponse() @@ -288,7 +288,7 @@ func TestGetEventsInRangeWithStreamToken(t *testing.T) { t.Fatalf("failed to get SyncPosition: %s", err) } // head towards the beginning of time - to := types.NewStreamToken(0, 0) + to := types.NewStreamToken(0, 0, nil) // backpaginate 5 messages starting at the latest position. paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true) @@ -531,14 +531,14 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point there should be no messages. We haven't sent anything // yet. - events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0)) + events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, nil)) if err != nil { t.Fatal(err) } if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 { t.Fatal("first call should have no updates") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, nil)) if err != nil { return } @@ -556,14 +556,14 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point we should get exactly one message. We're sending the sync position // that we were given from the update and the send-to-device update will be updated // in the database to reflect that this was the sync position we sent the message at. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil)) if err != nil { t.Fatal(err) } if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 { t.Fatal("second call should have one update") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil)) if err != nil { return } @@ -571,35 +571,35 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point we should still have one message because we haven't progressed the // sync position yet. This is equivalent to the client failing to /sync and retrying // with the same position. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil)) if err != nil { t.Fatal(err) } if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 { t.Fatal("third call should have one update still") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil)) if err != nil { return } // At this point we should now have no updates, because we've progressed the sync // position. Therefore the update from before will not be sent again. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+1)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+1, nil)) if err != nil { t.Fatal(err) } if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 { t.Fatal("fourth call should have no updates") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos+1)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos+1, nil)) if err != nil { return } // At this point we should still have no updates, because no new updates have been // sent. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+2)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+2, nil)) if err != nil { t.Fatal(err) } @@ -636,7 +636,7 @@ func TestInviteBehaviour(t *testing.T) { } // both invite events should appear in a new sync beforeRetireRes := types.NewResponse() - beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0), latest, 0, false) + beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false) if err != nil { t.Fatalf("IncrementalSync failed: %s", err) } @@ -651,7 +651,7 @@ func TestInviteBehaviour(t *testing.T) { t.Fatalf("failed to get SyncPosition: %s", err) } res := types.NewResponse() - res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0), latest, 0, false) + res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false) if err != nil { t.Fatalf("IncrementalSync failed: %s", err) } diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go index f2a368ec2..5a4c7b31b 100644 --- a/syncapi/sync/notifier_test.go +++ b/syncapi/sync/notifier_test.go @@ -32,11 +32,11 @@ var ( randomMessageEvent gomatrixserverlib.HeaderedEvent aliceInviteBobEvent gomatrixserverlib.HeaderedEvent bobLeaveEvent gomatrixserverlib.HeaderedEvent - syncPositionVeryOld = types.NewStreamToken(5, 0) - syncPositionBefore = types.NewStreamToken(11, 0) - syncPositionAfter = types.NewStreamToken(12, 0) - syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition(), 1) - syncPositionAfter2 = types.NewStreamToken(13, 0) + syncPositionVeryOld = types.NewStreamToken(5, 0, nil) + syncPositionBefore = types.NewStreamToken(11, 0, nil) + syncPositionAfter = types.NewStreamToken(12, 0, nil) + syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition(), 1, nil) + syncPositionAfter2 = types.NewStreamToken(13, 0, nil) ) var ( diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 41b18aa10..0996729e6 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -65,7 +65,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat since = &tok } if since == nil { - tok := types.NewStreamToken(0, 0) + tok := types.NewStreamToken(0, 0, nil) since = &tok } timelineLimit := DefaultTimelineLimit diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 196d446a2..bf6a9e01f 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -138,7 +138,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) { res = types.NewResponse() - since := types.NewStreamToken(0, 0) + since := types.NewStreamToken(0, 0, nil) if req.since != nil { since = *req.since } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 7dc022811..7bba8e522 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -39,6 +39,23 @@ var ( // StreamPosition represents the offset in the sync stream a client is at. type StreamPosition int64 +// LogPosition represents the offset in a Kafka log a client is at. +type LogPosition struct { + Partition int32 + Offset int64 +} + +// IsAfter returns true if this position is after `lp`. +func (p *LogPosition) IsAfter(lp *LogPosition) bool { + if lp == nil { + return false + } + if p.Partition != lp.Partition { + return false + } + return p.Offset > lp.Offset +} + // StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event. type StreamEvent struct { gomatrixserverlib.HeaderedEvent @@ -90,6 +107,15 @@ const ( type StreamingToken struct { syncToken + logs map[string]*LogPosition +} + +func (t *StreamingToken) Log(name string) *LogPosition { + l, ok := t.logs[name] + if !ok { + return nil + } + return l } func (t *StreamingToken) PDUPosition() StreamPosition { @@ -99,7 +125,15 @@ func (t *StreamingToken) EDUPosition() StreamPosition { return t.Positions[1] } func (t *StreamingToken) String() string { - return t.syncToken.String() + logStrings := []string{ + t.syncToken.String(), + } + for name, lp := range t.logs { + logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset) + logStrings = append(logStrings, logStr) + } + // E.g s11_22_33.dl0-134.ab1-441 + return strings.Join(logStrings, ".") } // IsAfter returns true if ANY position in this token is greater than `other`. @@ -109,12 +143,22 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true } } + for name := range t.logs { + otherLog := other.Log(name) + if otherLog == nil { + continue + } + if t.logs[name].IsAfter(otherLog) { + return true + } + } return false } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. // If the latter StreamingToken contains a field that is not 0, it is considered an update, // and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called. +// If the other token has a log, they will replace any existing log on this token. func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) { ret.Type = t.Type ret.Positions = make([]StreamPosition, len(t.Positions)) @@ -125,6 +169,13 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) } ret.Positions[i] = other.Positions[i] } + for name := range t.logs { + otherLog := other.Log(name) + if otherLog == nil { + continue + } + t.logs[name] = otherLog + } return ret } @@ -139,7 +190,7 @@ func (t *TopologyToken) PDUPosition() StreamPosition { return t.Positions[1] } func (t *TopologyToken) StreamToken() StreamingToken { - return NewStreamToken(t.PDUPosition(), 0) + return NewStreamToken(t.PDUPosition(), 0, nil) } func (t *TopologyToken) String() string { return t.syncToken.String() @@ -174,9 +225,9 @@ func (t *TopologyToken) Decrement() { // error if the token couldn't be parsed into an int64, or if the token type // isn't a known type (returns ErrInvalidSyncTokenType in the latter // case). -func newSyncTokenFromString(s string) (token *syncToken, err error) { +func newSyncTokenFromString(s string) (token *syncToken, categories []string, err error) { if len(s) == 0 { - return nil, ErrInvalidSyncTokenLen + return nil, nil, ErrInvalidSyncTokenLen } token = new(syncToken) @@ -185,16 +236,17 @@ func newSyncTokenFromString(s string) (token *syncToken, err error) { switch t := SyncTokenType(s[:1]); t { case SyncTokenTypeStream, SyncTokenTypeTopology: token.Type = t - positions = strings.Split(s[1:], "_") + categories = strings.Split(s[1:], ".") + positions = strings.Split(categories[0], "_") default: - return nil, ErrInvalidSyncTokenType + return nil, nil, ErrInvalidSyncTokenType } for _, pos := range positions { if posInt, err := strconv.ParseInt(pos, 10, 64); err != nil { - return nil, err + return nil, nil, err } else if posInt < 0 { - return nil, errors.New("negative position not allowed") + return nil, nil, errors.New("negative position not allowed") } else { token.Positions = append(token.Positions, StreamPosition(posInt)) } @@ -215,7 +267,7 @@ func NewTopologyToken(depth, streamPos StreamPosition) TopologyToken { } } func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) { - t, err := newSyncTokenFromString(tok) + t, _, err := newSyncTokenFromString(tok) if err != nil { return } @@ -233,16 +285,20 @@ func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) { } // NewStreamToken creates a new sync token for /sync -func NewStreamToken(pduPos, eduPos StreamPosition) StreamingToken { +func NewStreamToken(pduPos, eduPos StreamPosition, logs map[string]*LogPosition) StreamingToken { + if logs == nil { + logs = make(map[string]*LogPosition) + } return StreamingToken{ syncToken: syncToken{ Type: SyncTokenTypeStream, Positions: []StreamPosition{pduPos, eduPos}, }, + logs: logs, } } func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { - t, err := newSyncTokenFromString(tok) + t, categories, err := newSyncTokenFromString(tok) if err != nil { return } @@ -254,8 +310,35 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { err = fmt.Errorf("token %s wrong number of values, got %d want at least 2", tok, len(t.Positions)) return } + logs := make(map[string]*LogPosition) + if len(categories) > 1 { + // dl-0-1234 + // $log_name-$partition-$offset + for _, logStr := range categories[1:] { + segments := strings.Split(logStr, "-") + if len(segments) != 3 { + err = fmt.Errorf("token %s - invalid log: %s", tok, logStr) + return + } + var partition int64 + partition, err = strconv.ParseInt(segments[1], 10, 32) + if err != nil { + return + } + var offset int64 + offset, err = strconv.ParseInt(segments[2], 10, 64) + if err != nil { + return + } + logs[segments[0]] = &LogPosition{ + Partition: int32(partition), + Offset: offset, + } + } + } return StreamingToken{ syncToken: *t, + logs: logs, }, nil } diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 1e27a8e32..7590ea522 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -1,11 +1,61 @@ package types -import "testing" +import ( + "reflect" + "testing" +) + +func TestNewSyncTokenWithLogs(t *testing.T) { + tests := map[string]*StreamingToken{ + "s4_0": &StreamingToken{ + syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}}, + logs: make(map[string]*LogPosition), + }, + "s4_0.dl-0-123": &StreamingToken{ + syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}}, + logs: map[string]*LogPosition{ + "dl": &LogPosition{ + Partition: 0, + Offset: 123, + }, + }, + }, + "s4_0.dl-0-123.ab-1-14419482332": &StreamingToken{ + syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}}, + logs: map[string]*LogPosition{ + "ab": &LogPosition{ + Partition: 1, + Offset: 14419482332, + }, + "dl": &LogPosition{ + Partition: 0, + Offset: 123, + }, + }, + }, + } + for tok, want := range tests { + got, err := NewStreamTokenFromString(tok) + if err != nil { + if want == nil { + continue // error expected + } + t.Errorf("%s errored: %s", tok, err) + continue + } + if !reflect.DeepEqual(got, *want) { + t.Errorf("%s mismatch: got %v want %v", tok, got, want) + } + if got.String() != tok { + t.Errorf("%s reserialisation mismatch: got %s want %s", tok, got.String(), tok) + } + } +} func TestNewSyncTokenFromString(t *testing.T) { shouldPass := map[string]syncToken{ - "s4_0": NewStreamToken(4, 0).syncToken, - "s3_1": NewStreamToken(3, 1).syncToken, + "s4_0": NewStreamToken(4, 0, nil).syncToken, + "s3_1": NewStreamToken(3, 1, nil).syncToken, "t3_1": NewTopologyToken(3, 1).syncToken, } @@ -21,7 +71,7 @@ func TestNewSyncTokenFromString(t *testing.T) { } for test, expected := range shouldPass { - result, err := newSyncTokenFromString(test) + result, _, err := newSyncTokenFromString(test) if err != nil { t.Error(err) } @@ -31,7 +81,7 @@ func TestNewSyncTokenFromString(t *testing.T) { } for _, test := range shouldFail { - if _, err := newSyncTokenFromString(test); err == nil { + if _, _, err := newSyncTokenFromString(test); err == nil { t.Errorf("input '%v' should have errored but didn't", test) } }