Remove New functions for streaming tokens

This commit is contained in:
Neil Alexander 2020-12-10 18:37:52 +00:00
parent ef5b4000dd
commit e524b10fb9
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
12 changed files with 74 additions and 94 deletions

View file

@ -92,7 +92,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0, 0, 0, nil))
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos})
return nil
}

View file

@ -88,7 +88,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
return err
}
// update stream position
s.notifier.OnNewReceipt(types.NewStreamToken(0, 0, streamPos, 0, nil))
s.notifier.OnNewReceipt(types.StreamingToken{ReceiptPosition: streamPos})
return nil
}

View file

@ -107,7 +107,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},
types.NewStreamToken(0, 0, 0, streamPos, nil),
types.StreamingToken{SendToDevicePosition: streamPos},
)
return nil

View file

@ -66,7 +66,9 @@ func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewEvent(
nil, roomID, nil,
types.NewStreamToken(0, types.StreamPosition(latestSyncPosition), 0, 0, nil),
types.StreamingToken{
TypingPosition: types.StreamPosition(latestSyncPosition),
},
)
})
@ -95,6 +97,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}
s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos, 0, 0, nil))
s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.StreamingToken{TypingPosition: typingPos})
return nil
}

View file

@ -181,7 +181,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err
}
s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, 0, 0, nil))
s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@ -220,7 +220,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err
}
s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, 0, 0, nil))
s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@ -269,7 +269,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
s.notifier.OnNewEvent(msg.Event, "", nil, types.NewStreamToken(pduPos, 0, 0, 0, nil))
s.notifier.OnNewEvent(msg.Event, "", nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@ -287,7 +287,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}
// Notify any active sync requests that the invite has been retired.
// Invites share the same stream counter as PDUs
s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0, 0, 0, nil))
s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.StreamingToken{PDUPosition: sp})
return nil
}
@ -307,7 +307,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, 0, 0, nil))
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
return nil
}
@ -327,7 +327,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, 0, 0, nil))
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
return nil
}

View file

@ -381,7 +381,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
if r.backwardOrdering && events[len(events)-1].Type() == gomatrixserverlib.MRoomCreate {
// We've hit the beginning of the room so there's really nowhere else
// to go. This seems to fix Riot iOS from looping on /messages endlessly.
end = types.NewTopologyToken(0, 0)
end = types.TopologyToken{}
} else {
end, err = r.db.EventPositionInTopology(
r.ctx, events[len(events)-1].EventID(),
@ -565,7 +565,7 @@ func setToDefault(
if backwardOrdering {
// go 1 earlier than the first event so we correctly fetch the earliest event
// this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound.
to = types.NewTopologyToken(0, 0)
to = types.TopologyToken{}
} else {
to, err = db.MaxTopologicalPosition(ctx, roomID)
}

View file

@ -440,9 +440,9 @@ func (d *Database) MaxTopologicalPosition(
) (types.TopologyToken, error) {
depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID)
if err != nil {
return types.NewTopologyToken(0, 0), err
return types.TopologyToken{}, err
}
return types.NewTopologyToken(depth, streamPos), nil
return types.TopologyToken{Depth: depth, PDUPosition: streamPos}, nil
}
func (d *Database) EventPositionInTopology(
@ -450,9 +450,9 @@ func (d *Database) EventPositionInTopology(
) (types.TopologyToken, error) {
depth, stream, err := d.Topology.SelectPositionInTopology(ctx, nil, eventID)
if err != nil {
return types.NewTopologyToken(0, 0), err
return types.TopologyToken{}, err
}
return types.NewTopologyToken(depth, stream), nil
return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil
}
func (d *Database) syncPositionTx(
@ -484,13 +484,10 @@ func (d *Database) syncPositionTx(
maxEventID = maxPeekID
}
// TODO: complete these positions
sp = types.NewStreamToken(
types.StreamPosition(maxEventID),
types.StreamPosition(d.EDUCache.GetLatestSyncPosition()),
0,
0,
nil,
)
sp = types.StreamingToken{
PDUPosition: types.StreamPosition(maxEventID),
TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()),
}
return
}
@ -889,7 +886,10 @@ func (d *Database) getJoinResponseForCompleteSync(
if err != nil {
return
}
prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
prevBatch := types.TopologyToken{
Depth: backwardTopologyPos,
PDUPosition: backwardStreamPos,
}
prevBatch.Decrement()
prevBatchStr = prevBatch.String()
}
@ -922,7 +922,7 @@ func (d *Database) CompleteSync(
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
types.NewStreamToken(0, 0, 0, 0, nil), toPos, joinedRoomIDs, res,
types.StreamingToken{}, toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
@ -972,7 +972,7 @@ func (d *Database) getBackwardTopologyPos(
ctx context.Context, txn *sql.Tx,
events []types.StreamEvent,
) (types.TopologyToken, error) {
zeroToken := types.NewTopologyToken(0, 0)
zeroToken := types.TopologyToken{}
if len(events) == 0 {
return zeroToken, nil
}
@ -980,7 +980,7 @@ func (d *Database) getBackwardTopologyPos(
if err != nil {
return zeroToken, err
}
tok := types.NewTopologyToken(pos, spos)
tok := types.TopologyToken{Depth: pos, PDUPosition: spos}
tok.Decrement()
return tok, nil
}

View file

@ -165,9 +165,9 @@ func TestSyncResponse(t *testing.T) {
{
Name: "IncrementalSync penultimate",
DoSync: func() (*types.Response, error) {
from := types.NewStreamToken( // pretend we are at the penultimate event
positions[len(positions)-2], 0, 0, 0, nil,
)
from := types.StreamingToken{ // pretend we are at the penultimate event
PDUPosition: positions[len(positions)-2],
}
res := types.NewResponse()
return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
},
@ -178,9 +178,9 @@ func TestSyncResponse(t *testing.T) {
{
Name: "IncrementalSync limited",
DoSync: func() (*types.Response, error) {
from := types.NewStreamToken( // pretend we are 10 events behind
positions[len(positions)-11], 0, 0, 0, nil,
)
from := types.StreamingToken{ // pretend we are 10 events behind
PDUPosition: positions[len(positions)-11],
}
res := types.NewResponse()
// limit is set to 5
return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
@ -222,7 +222,12 @@ func TestSyncResponse(t *testing.T) {
if err != nil {
st.Fatalf("failed to do sync: %s", err)
}
next := types.NewStreamToken(latest.PDUPosition, latest.TypingPosition, latest.ReceiptPosition, latest.SendToDevicePosition, nil)
next := types.StreamingToken{
PDUPosition: latest.PDUPosition,
TypingPosition: latest.TypingPosition,
ReceiptPosition: latest.ReceiptPosition,
SendToDevicePosition: latest.SendToDevicePosition,
}
if res.NextBatch != next.String() {
st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String())
}
@ -245,9 +250,9 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) {
if err != nil {
t.Fatalf("failed to get SyncPosition: %s", err)
}
from := types.NewStreamToken(
positions[len(positions)-2], 0, 0, 0, nil,
)
from := types.StreamingToken{
PDUPosition: positions[len(positions)-2],
}
res := types.NewResponse()
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
@ -271,7 +276,7 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) {
}
// backpaginate 5 messages starting at the latest position.
// head towards the beginning of time
to := types.NewTopologyToken(0, 0)
to := types.TopologyToken{}
paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &prevBatchToken, &to, testRoomID, 5, true)
if err != nil {
t.Fatalf("GetEventsInRange returned an error: %s", err)
@ -291,7 +296,7 @@ func TestGetEventsInRangeWithStreamToken(t *testing.T) {
t.Fatalf("failed to get SyncPosition: %s", err)
}
// head towards the beginning of time
to := types.NewStreamToken(0, 0, 0, 0, nil)
to := types.StreamingToken{}
// backpaginate 5 messages starting at the latest position.
paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true)
@ -313,7 +318,7 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
}
// head towards the beginning of time
to := types.NewTopologyToken(0, 0)
to := types.TopologyToken{}
// backpaginate 5 messages starting at the latest position.
paginatedEvents, err := db.GetEventsInTopologicalRange(ctx, &from, &to, testRoomID, 5, true)
@ -382,7 +387,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
t.Fatalf("failed to get EventPositionInTopology for event: %s", err)
}
// head towards the beginning of time
to := types.NewTopologyToken(0, 0)
to := types.TopologyToken{}
testCases := []struct {
Name string
@ -458,7 +463,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
}
// head towards the beginning of time
to := types.NewTopologyToken(0, 0)
to := types.TopologyToken{}
// Query using room B as room A was inserted first and hence A will have lower stream positions but identical depths,
// allowing this bug to surface.
@ -508,7 +513,7 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) {
}
// head towards the beginning of time
to := types.NewTopologyToken(0, 0)
to := types.TopologyToken{}
// starting at `from`, backpaginate to the beginning of time, asserting as we go.
chunkSize = 3
@ -534,14 +539,14 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point there should be no messages. We haven't sent anything
// yet.
events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, 0, nil))
events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{})
if err != nil {
t.Fatal(err)
}
if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 {
t.Fatal("first call should have no updates")
}
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, 0, nil))
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{})
if err != nil {
return
}
@ -559,14 +564,14 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point we should get exactly one message. We're sending the sync position
// that we were given from the update and the send-to-device update will be updated
// in the database to reflect that this was the sync position we sent the message at.
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos, nil))
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos})
if err != nil {
t.Fatal(err)
}
if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 {
t.Fatal("second call should have one update")
}
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, streamPos, nil))
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos})
if err != nil {
return
}
@ -574,35 +579,35 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point we should still have one message because we haven't progressed the
// sync position yet. This is equivalent to the client failing to /sync and retrying
// with the same position.
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos, nil))
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos})
if err != nil {
t.Fatal(err)
}
if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 {
t.Fatal("third call should have one update still")
}
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, streamPos, nil))
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos})
if err != nil {
return
}
// At this point we should now have no updates, because we've progressed the sync
// position. Therefore the update from before will not be sent again.
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos+1, nil))
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 1})
if err != nil {
t.Fatal(err)
}
if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 {
t.Fatal("fourth call should have no updates")
}
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, streamPos+1, nil))
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos + 1})
if err != nil {
return
}
// At this point we should still have no updates, because no new updates have been
// sent.
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos+2, nil))
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 2})
if err != nil {
t.Fatal(err)
}
@ -639,7 +644,7 @@ func TestInviteBehaviour(t *testing.T) {
}
// both invite events should appear in a new sync
beforeRetireRes := types.NewResponse()
beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0, 0, 0, nil), latest, 0, false)
beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.StreamingToken{}, latest, 0, false)
if err != nil {
t.Fatalf("IncrementalSync failed: %s", err)
}
@ -654,7 +659,7 @@ func TestInviteBehaviour(t *testing.T) {
t.Fatalf("failed to get SyncPosition: %s", err)
}
res := types.NewResponse()
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0, 0, 0, nil), latest, 0, false)
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.StreamingToken{}, latest, 0, false)
if err != nil {
t.Fatalf("IncrementalSync failed: %s", err)
}

View file

@ -32,11 +32,11 @@ var (
randomMessageEvent gomatrixserverlib.HeaderedEvent
aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
bobLeaveEvent gomatrixserverlib.HeaderedEvent
syncPositionVeryOld = types.NewStreamToken(5, 0, 0, 0, nil)
syncPositionBefore = types.NewStreamToken(11, 0, 0, 0, nil)
syncPositionAfter = types.NewStreamToken(12, 0, 0, 0, nil)
syncPositionVeryOld = types.StreamingToken{PDUPosition: 5}
syncPositionBefore = types.StreamingToken{PDUPosition: 11}
syncPositionAfter = types.StreamingToken{PDUPosition: 12}
//syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition, 1, 0, 0, nil)
syncPositionAfter2 = types.NewStreamToken(13, 0, 0, 0, nil)
syncPositionAfter2 = types.StreamingToken{PDUPosition: 13}
)
var (

View file

@ -65,8 +65,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
since = &tok
}
if since == nil {
tok := types.NewStreamToken(0, 0, 0, 0, nil)
since = &tok
since = &types.StreamingToken{}
}
timelineLimit := DefaultTimelineLimit
// TODO: read from stored filters too

View file

@ -206,7 +206,9 @@ type TopologyToken struct {
}
func (t *TopologyToken) StreamToken() StreamingToken {
return NewStreamToken(t.PDUPosition, 0, 0, 0, nil)
return StreamingToken{
PDUPosition: t.PDUPosition,
}
}
func (t TopologyToken) String() string {
@ -235,17 +237,6 @@ func (t *TopologyToken) Decrement() {
t.PDUPosition = pduPos
}
// NewTopologyToken creates a new sync token for /messages
func NewTopologyToken(depth, pduPos StreamPosition) TopologyToken {
if depth < 0 {
depth = 1
}
return TopologyToken{
Depth: depth,
PDUPosition: pduPos,
}
}
func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
if len(tok) < 1 {
err = fmt.Errorf("empty topology token")
@ -275,23 +266,6 @@ func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
return
}
// NewStreamToken creates a new sync token for /sync
func NewStreamToken(
pduPos, typingPos, receiptPos, sendToDevicePos StreamPosition,
logs map[string]*LogPosition,
) StreamingToken {
if logs == nil {
logs = make(map[string]*LogPosition)
}
return StreamingToken{
PDUPosition: pduPos,
TypingPosition: typingPos,
ReceiptPosition: receiptPos,
SendToDevicePosition: sendToDevicePos,
logs: logs,
}
}
func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
if len(tok) < 1 {
err = fmt.Errorf("empty stream token")

View file

@ -58,10 +58,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
"s4_0_0_0": NewStreamToken(4, 0, 0, 0, nil).String(),
"s3_1_0_0": NewStreamToken(3, 1, 0, 0, nil).String(),
"s3_1_2_3": NewStreamToken(3, 1, 2, 3, nil).String(),
"t3_1": NewTopologyToken(3, 1).String(),
"s4_0_0_0": StreamingToken{4, 0, 0, 0, nil}.String(),
"s3_1_0_0": StreamingToken{3, 1, 0, 0, nil}.String(),
"s3_1_2_3": StreamingToken{3, 1, 2, 3, nil}.String(),
"t3_1": TopologyToken{3, 1}.String(),
}
for a, b := range shouldPass {