From e524b10fb9e262a0263b5de9d51aeeb3615096de Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 10 Dec 2020 18:37:52 +0000 Subject: [PATCH] Remove New functions for streaming tokens --- syncapi/consumers/clientapi.go | 2 +- syncapi/consumers/eduserver_receipts.go | 2 +- syncapi/consumers/eduserver_sendtodevice.go | 2 +- syncapi/consumers/eduserver_typing.go | 6 ++- syncapi/consumers/roomserver.go | 12 ++--- syncapi/routing/messages.go | 4 +- syncapi/storage/shared/syncserver.go | 30 +++++------ syncapi/storage/storage_test.go | 59 +++++++++++---------- syncapi/sync/notifier_test.go | 8 +-- syncapi/sync/request.go | 3 +- syncapi/types/types.go | 32 ++--------- syncapi/types/types_test.go | 8 +-- 12 files changed, 74 insertions(+), 94 deletions(-) diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 9b9b46a44..9883c6b03 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -92,7 +92,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0, 0, 0, nil)) + s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos}) return nil } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 4c131c055..5c286cf08 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -88,7 +88,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro return err } // update stream position - s.notifier.OnNewReceipt(types.NewStreamToken(0, 0, streamPos, 0, nil)) + s.notifier.OnNewReceipt(types.StreamingToken{ReceiptPosition: streamPos}) return nil } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 97ff74290..0c3f52cd3 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -107,7 +107,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) s.notifier.OnNewSendToDevice( output.UserID, []string{output.DeviceID}, - types.NewStreamToken(0, 0, 0, streamPos, nil), + types.StreamingToken{SendToDevicePosition: streamPos}, ) return nil diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 18741ea7b..885e7fd1f 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -66,7 +66,9 @@ func (s *OutputTypingEventConsumer) Start() error { s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { s.notifier.OnNewEvent( nil, roomID, nil, - types.NewStreamToken(0, types.StreamPosition(latestSyncPosition), 0, 0, nil), + types.StreamingToken{ + TypingPosition: types.StreamPosition(latestSyncPosition), + }, ) }) @@ -95,6 +97,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) } - s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos, 0, 0, nil)) + s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.StreamingToken{TypingPosition: typingPos}) return nil } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index d5b01617c..be84a2816 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -181,7 +181,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( return err } - s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, 0, 0, nil)) + s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos}) return nil } @@ -220,7 +220,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( return err } - s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, 0, 0, nil)) + s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos}) return nil } @@ -269,7 +269,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( }).Panicf("roomserver output log: write invite failure") return nil } - s.notifier.OnNewEvent(msg.Event, "", nil, types.NewStreamToken(pduPos, 0, 0, 0, nil)) + s.notifier.OnNewEvent(msg.Event, "", nil, types.StreamingToken{PDUPosition: pduPos}) return nil } @@ -287,7 +287,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( } // Notify any active sync requests that the invite has been retired. // Invites share the same stream counter as PDUs - s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0, 0, 0, nil)) + s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.StreamingToken{PDUPosition: sp}) return nil } @@ -307,7 +307,7 @@ func (s *OutputRoomEventConsumer) onNewPeek( // we need to wake up the users who might need to now be peeking into this room, // so we send in a dummy event to trigger a wakeup - s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, 0, 0, nil)) + s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp}) return nil } @@ -327,7 +327,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek( // we need to wake up the users who might need to now be peeking into this room, // so we send in a dummy event to trigger a wakeup - s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, 0, 0, nil)) + s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp}) return nil } diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 8c7477fff..865203a9b 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -381,7 +381,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st if r.backwardOrdering && events[len(events)-1].Type() == gomatrixserverlib.MRoomCreate { // We've hit the beginning of the room so there's really nowhere else // to go. This seems to fix Riot iOS from looping on /messages endlessly. - end = types.NewTopologyToken(0, 0) + end = types.TopologyToken{} } else { end, err = r.db.EventPositionInTopology( r.ctx, events[len(events)-1].EventID(), @@ -565,7 +565,7 @@ func setToDefault( if backwardOrdering { // go 1 earlier than the first event so we correctly fetch the earliest event // this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound. - to = types.NewTopologyToken(0, 0) + to = types.TopologyToken{} } else { to, err = db.MaxTopologicalPosition(ctx, roomID) } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 143c8884b..c0ae3d7a9 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -440,9 +440,9 @@ func (d *Database) MaxTopologicalPosition( ) (types.TopologyToken, error) { depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID) if err != nil { - return types.NewTopologyToken(0, 0), err + return types.TopologyToken{}, err } - return types.NewTopologyToken(depth, streamPos), nil + return types.TopologyToken{Depth: depth, PDUPosition: streamPos}, nil } func (d *Database) EventPositionInTopology( @@ -450,9 +450,9 @@ func (d *Database) EventPositionInTopology( ) (types.TopologyToken, error) { depth, stream, err := d.Topology.SelectPositionInTopology(ctx, nil, eventID) if err != nil { - return types.NewTopologyToken(0, 0), err + return types.TopologyToken{}, err } - return types.NewTopologyToken(depth, stream), nil + return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil } func (d *Database) syncPositionTx( @@ -484,13 +484,10 @@ func (d *Database) syncPositionTx( maxEventID = maxPeekID } // TODO: complete these positions - sp = types.NewStreamToken( - types.StreamPosition(maxEventID), - types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), - 0, - 0, - nil, - ) + sp = types.StreamingToken{ + PDUPosition: types.StreamPosition(maxEventID), + TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), + } return } @@ -889,7 +886,10 @@ func (d *Database) getJoinResponseForCompleteSync( if err != nil { return } - prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) + prevBatch := types.TopologyToken{ + Depth: backwardTopologyPos, + PDUPosition: backwardStreamPos, + } prevBatch.Decrement() prevBatchStr = prevBatch.String() } @@ -922,7 +922,7 @@ func (d *Database) CompleteSync( // Use a zero value SyncPosition for fromPos so all EDU states are added. err = d.addEDUDeltaToResponse( - types.NewStreamToken(0, 0, 0, 0, nil), toPos, joinedRoomIDs, res, + types.StreamingToken{}, toPos, joinedRoomIDs, res, ) if err != nil { return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err) @@ -972,7 +972,7 @@ func (d *Database) getBackwardTopologyPos( ctx context.Context, txn *sql.Tx, events []types.StreamEvent, ) (types.TopologyToken, error) { - zeroToken := types.NewTopologyToken(0, 0) + zeroToken := types.TopologyToken{} if len(events) == 0 { return zeroToken, nil } @@ -980,7 +980,7 @@ func (d *Database) getBackwardTopologyPos( if err != nil { return zeroToken, err } - tok := types.NewTopologyToken(pos, spos) + tok := types.TopologyToken{Depth: pos, PDUPosition: spos} tok.Decrement() return tok, nil } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 6bf826cf0..8387543f5 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -165,9 +165,9 @@ func TestSyncResponse(t *testing.T) { { Name: "IncrementalSync penultimate", DoSync: func() (*types.Response, error) { - from := types.NewStreamToken( // pretend we are at the penultimate event - positions[len(positions)-2], 0, 0, 0, nil, - ) + from := types.StreamingToken{ // pretend we are at the penultimate event + PDUPosition: positions[len(positions)-2], + } res := types.NewResponse() return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false) }, @@ -178,9 +178,9 @@ func TestSyncResponse(t *testing.T) { { Name: "IncrementalSync limited", DoSync: func() (*types.Response, error) { - from := types.NewStreamToken( // pretend we are 10 events behind - positions[len(positions)-11], 0, 0, 0, nil, - ) + from := types.StreamingToken{ // pretend we are 10 events behind + PDUPosition: positions[len(positions)-11], + } res := types.NewResponse() // limit is set to 5 return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false) @@ -222,7 +222,12 @@ func TestSyncResponse(t *testing.T) { if err != nil { st.Fatalf("failed to do sync: %s", err) } - next := types.NewStreamToken(latest.PDUPosition, latest.TypingPosition, latest.ReceiptPosition, latest.SendToDevicePosition, nil) + next := types.StreamingToken{ + PDUPosition: latest.PDUPosition, + TypingPosition: latest.TypingPosition, + ReceiptPosition: latest.ReceiptPosition, + SendToDevicePosition: latest.SendToDevicePosition, + } if res.NextBatch != next.String() { st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String()) } @@ -245,9 +250,9 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) { if err != nil { t.Fatalf("failed to get SyncPosition: %s", err) } - from := types.NewStreamToken( - positions[len(positions)-2], 0, 0, 0, nil, - ) + from := types.StreamingToken{ + PDUPosition: positions[len(positions)-2], + } res := types.NewResponse() res, err = db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false) @@ -271,7 +276,7 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) { } // backpaginate 5 messages starting at the latest position. // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &prevBatchToken, &to, testRoomID, 5, true) if err != nil { t.Fatalf("GetEventsInRange returned an error: %s", err) @@ -291,7 +296,7 @@ func TestGetEventsInRangeWithStreamToken(t *testing.T) { t.Fatalf("failed to get SyncPosition: %s", err) } // head towards the beginning of time - to := types.NewStreamToken(0, 0, 0, 0, nil) + to := types.StreamingToken{} // backpaginate 5 messages starting at the latest position. paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true) @@ -313,7 +318,7 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) { t.Fatalf("failed to get MaxTopologicalPosition: %s", err) } // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} // backpaginate 5 messages starting at the latest position. paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &from, &to, testRoomID, 5, true) @@ -382,7 +387,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) { t.Fatalf("failed to get EventPositionInTopology for event: %s", err) } // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} testCases := []struct { Name string @@ -458,7 +463,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) { t.Fatalf("failed to get MaxTopologicalPosition: %s", err) } // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} // Query using room B as room A was inserted first and hence A will have lower stream positions but identical depths, // allowing this bug to surface. @@ -508,7 +513,7 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) { } // head towards the beginning of time - to := types.NewTopologyToken(0, 0) + to := types.TopologyToken{} // starting at `from`, backpaginate to the beginning of time, asserting as we go. chunkSize = 3 @@ -534,14 +539,14 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point there should be no messages. We haven't sent anything // yet. - events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, 0, nil)) + events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{}) if err != nil { t.Fatal(err) } if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 { t.Fatal("first call should have no updates") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, 0, nil)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{}) if err != nil { return } @@ -559,14 +564,14 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point we should get exactly one message. We're sending the sync position // that we were given from the update and the send-to-device update will be updated // in the database to reflect that this was the sync position we sent the message at. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos, nil)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos}) if err != nil { t.Fatal(err) } if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 { t.Fatal("second call should have one update") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, streamPos, nil)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos}) if err != nil { return } @@ -574,35 +579,35 @@ func TestSendToDeviceBehaviour(t *testing.T) { // At this point we should still have one message because we haven't progressed the // sync position yet. This is equivalent to the client failing to /sync and retrying // with the same position. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos, nil)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos}) if err != nil { t.Fatal(err) } if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 { t.Fatal("third call should have one update still") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, streamPos, nil)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos}) if err != nil { return } // At this point we should now have no updates, because we've progressed the sync // position. Therefore the update from before will not be sent again. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos+1, nil)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 1}) if err != nil { t.Fatal(err) } if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 { t.Fatal("fourth call should have no updates") } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, streamPos+1, nil)) + err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos + 1}) if err != nil { return } // At this point we should still have no updates, because no new updates have been // sent. - events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos+2, nil)) + events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 2}) if err != nil { t.Fatal(err) } @@ -639,7 +644,7 @@ func TestInviteBehaviour(t *testing.T) { } // both invite events should appear in a new sync beforeRetireRes := types.NewResponse() - beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0, 0, 0, nil), latest, 0, false) + beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.StreamingToken{}, latest, 0, false) if err != nil { t.Fatalf("IncrementalSync failed: %s", err) } @@ -654,7 +659,7 @@ func TestInviteBehaviour(t *testing.T) { t.Fatalf("failed to get SyncPosition: %s", err) } res := types.NewResponse() - res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0, 0, 0, nil), latest, 0, false) + res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.StreamingToken{}, latest, 0, false) if err != nil { t.Fatalf("IncrementalSync failed: %s", err) } diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go index d62e432a4..39124214a 100644 --- a/syncapi/sync/notifier_test.go +++ b/syncapi/sync/notifier_test.go @@ -32,11 +32,11 @@ var ( randomMessageEvent gomatrixserverlib.HeaderedEvent aliceInviteBobEvent gomatrixserverlib.HeaderedEvent bobLeaveEvent gomatrixserverlib.HeaderedEvent - syncPositionVeryOld = types.NewStreamToken(5, 0, 0, 0, nil) - syncPositionBefore = types.NewStreamToken(11, 0, 0, 0, nil) - syncPositionAfter = types.NewStreamToken(12, 0, 0, 0, nil) + syncPositionVeryOld = types.StreamingToken{PDUPosition: 5} + syncPositionBefore = types.StreamingToken{PDUPosition: 11} + syncPositionAfter = types.StreamingToken{PDUPosition: 12} //syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition, 1, 0, 0, nil) - syncPositionAfter2 = types.NewStreamToken(13, 0, 0, 0, nil) + syncPositionAfter2 = types.StreamingToken{PDUPosition: 13} ) var ( diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index e0405141f..d5cf143d9 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -65,8 +65,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat since = &tok } if since == nil { - tok := types.NewStreamToken(0, 0, 0, 0, nil) - since = &tok + since = &types.StreamingToken{} } timelineLimit := DefaultTimelineLimit // TODO: read from stored filters too diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 21de9f33a..7ccf6384c 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -206,7 +206,9 @@ type TopologyToken struct { } func (t *TopologyToken) StreamToken() StreamingToken { - return NewStreamToken(t.PDUPosition, 0, 0, 0, nil) + return StreamingToken{ + PDUPosition: t.PDUPosition, + } } func (t TopologyToken) String() string { @@ -235,17 +237,6 @@ func (t *TopologyToken) Decrement() { t.PDUPosition = pduPos } -// NewTopologyToken creates a new sync token for /messages -func NewTopologyToken(depth, pduPos StreamPosition) TopologyToken { - if depth < 0 { - depth = 1 - } - return TopologyToken{ - Depth: depth, - PDUPosition: pduPos, - } -} - func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) { if len(tok) < 1 { err = fmt.Errorf("empty topology token") @@ -275,23 +266,6 @@ func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) { return } -// NewStreamToken creates a new sync token for /sync -func NewStreamToken( - pduPos, typingPos, receiptPos, sendToDevicePos StreamPosition, - logs map[string]*LogPosition, -) StreamingToken { - if logs == nil { - logs = make(map[string]*LogPosition) - } - return StreamingToken{ - PDUPosition: pduPos, - TypingPosition: typingPos, - ReceiptPosition: receiptPos, - SendToDevicePosition: sendToDevicePos, - logs: logs, - } -} - func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { if len(tok) < 1 { err = fmt.Errorf("empty stream token") diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 419d46ec0..70aa227b2 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -58,10 +58,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) { func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0": NewStreamToken(4, 0, 0, 0, nil).String(), - "s3_1_0_0": NewStreamToken(3, 1, 0, 0, nil).String(), - "s3_1_2_3": NewStreamToken(3, 1, 2, 3, nil).String(), - "t3_1": NewTopologyToken(3, 1).String(), + "s4_0_0_0": StreamingToken{4, 0, 0, 0, nil}.String(), + "s3_1_0_0": StreamingToken{3, 1, 0, 0, nil}.String(), + "s3_1_2_3": StreamingToken{3, 1, 2, 3, nil}.String(), + "t3_1": TopologyToken{3, 1}.String(), } for a, b := range shouldPass {