Refactor sync tokens

This commit is contained in:
Neil Alexander 2020-12-10 16:39:30 +00:00
parent bad81c028f
commit d28fab31e1
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
15 changed files with 204 additions and 211 deletions

View file

@ -92,7 +92,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0, nil))
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0, 0, 0, nil))
return nil
}

View file

@ -88,7 +88,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
return err
}
// update stream position
s.notifier.OnNewReceipt(types.NewStreamToken(0, streamPos, nil))
s.notifier.OnNewReceipt(types.NewStreamToken(0, 0, streamPos, 0, nil))
return nil
}

View file

@ -107,7 +107,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},
types.NewStreamToken(0, streamPos, nil),
types.NewStreamToken(0, 0, 0, streamPos, nil),
)
return nil

View file

@ -66,7 +66,7 @@ func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewEvent(
nil, roomID, nil,
types.NewStreamToken(0, types.StreamPosition(latestSyncPosition), nil),
types.NewStreamToken(0, types.StreamPosition(latestSyncPosition), 0, 0, nil),
)
})
@ -95,6 +95,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}
s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos, nil))
s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos, 0, 0, nil))
return nil
}

View file

@ -114,7 +114,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
posUpdate := types.NewStreamToken(0, 0, 0, 0, map[string]*types.LogPosition{
syncinternal.DeviceListLogName: {
Offset: msg.Offset,
Partition: msg.Partition,

View file

@ -181,7 +181,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err
}
s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, 0, 0, nil))
return nil
}
@ -220,7 +220,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err
}
s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, 0, 0, nil))
return nil
}
@ -269,7 +269,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
s.notifier.OnNewEvent(msg.Event, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifier.OnNewEvent(msg.Event, "", nil, types.NewStreamToken(pduPos, 0, 0, 0, nil))
return nil
}
@ -287,7 +287,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}
// Notify any active sync requests that the invite has been retired.
// Invites share the same stream counter as PDUs
s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0, nil))
s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0, 0, 0, nil))
return nil
}
@ -307,7 +307,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, nil))
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, 0, 0, nil))
return nil
}
@ -327,7 +327,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, nil))
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, 0, 0, nil))
return nil
}

View file

@ -16,8 +16,8 @@ import (
var (
syncingUser = "@alice:localhost"
emptyToken = types.NewStreamToken(0, 0, nil)
newestToken = types.NewStreamToken(0, 0, map[string]*types.LogPosition{
emptyToken = types.NewStreamToken(0, 0, 0, 0, nil)
newestToken = types.NewStreamToken(0, 0, 0, 0, map[string]*types.LogPosition{
DeviceListLogName: {
Offset: sarama.OffsetNewest,
Partition: 0,

View file

@ -447,11 +447,11 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// The condition in the SQL query is a strict "greater than" so
// we need to check against to-1.
streamPos := types.StreamPosition(streamEvents[len(streamEvents)-1].StreamPosition)
isSetLargeEnough = (r.to.PDUPosition()-1 == streamPos)
isSetLargeEnough = (r.to.PDUPosition-1 == streamPos)
}
} else {
streamPos := types.StreamPosition(streamEvents[0].StreamPosition)
isSetLargeEnough = (r.from.PDUPosition()-1 == streamPos)
isSetLargeEnough = (r.from.PDUPosition-1 == streamPos)
}
}

View file

@ -78,8 +78,8 @@ func (d *Database) GetEventsInStreamingRange(
backwardOrdering bool,
) (events []types.StreamEvent, err error) {
r := types.Range{
From: from.PDUPosition(),
To: to.PDUPosition(),
From: from.PDUPosition,
To: to.PDUPosition,
Backwards: backwardOrdering,
}
if backwardOrdering {
@ -391,16 +391,16 @@ func (d *Database) GetEventsInTopologicalRange(
var minDepth, maxDepth, maxStreamPosForMaxDepth types.StreamPosition
if backwardOrdering {
// Backward ordering means the 'from' token has a higher depth than the 'to' token
minDepth = to.Depth()
maxDepth = from.Depth()
minDepth = to.Depth
maxDepth = from.Depth
// for cases where we have say 5 events with the same depth, the TopologyToken needs to
// know which of the 5 the client has seen. This is done by using the PDU position.
// Events with the same maxDepth but less than this PDU position will be returned.
maxStreamPosForMaxDepth = from.PDUPosition()
maxStreamPosForMaxDepth = from.PDUPosition
} else {
// Forward ordering means the 'from' token has a lower depth than the 'to' token.
minDepth = from.Depth()
maxDepth = to.Depth()
minDepth = from.Depth
maxDepth = to.Depth
}
// Select the event IDs from the defined range.
@ -483,7 +483,14 @@ func (d *Database) syncPositionTx(
if maxPeekID > maxEventID {
maxEventID = maxPeekID
}
sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), nil)
// TODO: complete these positions
sp = types.NewStreamToken(
types.StreamPosition(maxEventID),
types.StreamPosition(d.EDUCache.GetLatestSyncPosition()),
0,
0,
nil,
)
return
}
@ -555,7 +562,7 @@ func (d *Database) addTypingDeltaToResponse(
for _, roomID := range joinedRoomIDs {
var jr types.JoinResponse
if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUPosition()),
roomID, int64(since.TypingPosition),
); updated {
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MTyping,
@ -584,7 +591,7 @@ func (d *Database) addReceiptDeltaToResponse(
joinedRoomIDs []string,
res *types.Response,
) error {
receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.EDUPosition())
receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition)
if err != nil {
return fmt.Errorf("unable to select receipts for rooms: %w", err)
}
@ -639,7 +646,7 @@ func (d *Database) addEDUDeltaToResponse(
joinedRoomIDs []string,
res *types.Response,
) error {
if fromPos.EDUPosition() != toPos.EDUPosition() {
if fromPos.TypingPosition != toPos.TypingPosition {
// add typing deltas
if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
return fmt.Errorf("unable to apply typing delta to response: %w", err)
@ -647,8 +654,8 @@ func (d *Database) addEDUDeltaToResponse(
}
// Check on initial sync and if EDUPositions differ
if (fromPos.EDUPosition() == 0 && toPos.EDUPosition() == 0) ||
fromPos.EDUPosition() != toPos.EDUPosition() {
if (fromPos.ReceiptPosition == 0 && toPos.ReceiptPosition == 0) ||
fromPos.ReceiptPosition != toPos.ReceiptPosition {
if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
return fmt.Errorf("unable to apply receipts to response: %w", err)
}
@ -687,10 +694,10 @@ func (d *Database) IncrementalSync(
var joinedRoomIDs []string
var err error
if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState {
if fromPos.PDUPosition != toPos.PDUPosition || wantFullState {
r := types.Range{
From: fromPos.PDUPosition(),
To: toPos.PDUPosition(),
From: fromPos.PDUPosition,
To: toPos.PDUPosition,
}
joinedRoomIDs, err = d.addPDUDeltaToResponse(
ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
@ -772,7 +779,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
}
r := types.Range{
From: 0,
To: toPos.PDUPosition(),
To: toPos.PDUPosition,
}
res.NextBatch = toPos.String()
@ -915,7 +922,7 @@ func (d *Database) CompleteSync(
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res,
types.NewStreamToken(0, 0, 0, 0, nil), toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)

View file

@ -166,7 +166,7 @@ func TestSyncResponse(t *testing.T) {
Name: "IncrementalSync penultimate",
DoSync: func() (*types.Response, error) {
from := types.NewStreamToken( // pretend we are at the penultimate event
positions[len(positions)-2], types.StreamPosition(0), nil,
positions[len(positions)-2], 0, 0, 0, nil,
)
res := types.NewResponse()
return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
@ -179,7 +179,7 @@ func TestSyncResponse(t *testing.T) {
Name: "IncrementalSync limited",
DoSync: func() (*types.Response, error) {
from := types.NewStreamToken( // pretend we are 10 events behind
positions[len(positions)-11], types.StreamPosition(0), nil,
positions[len(positions)-11], 0, 0, 0, nil,
)
res := types.NewResponse()
// limit is set to 5
@ -222,7 +222,7 @@ func TestSyncResponse(t *testing.T) {
if err != nil {
st.Fatalf("failed to do sync: %s", err)
}
next := types.NewStreamToken(latest.PDUPosition(), latest.EDUPosition(), nil)
next := types.NewStreamToken(latest.PDUPosition, latest.TypingPosition, latest.ReceiptPosition, latest.SendToDevicePosition, nil)
if res.NextBatch != next.String() {
st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String())
}
@ -246,7 +246,7 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) {
t.Fatalf("failed to get SyncPosition: %s", err)
}
from := types.NewStreamToken(
positions[len(positions)-2], types.StreamPosition(0), nil,
positions[len(positions)-2], 0, 0, 0, nil,
)
res := types.NewResponse()
@ -291,7 +291,7 @@ func TestGetEventsInRangeWithStreamToken(t *testing.T) {
t.Fatalf("failed to get SyncPosition: %s", err)
}
// head towards the beginning of time
to := types.NewStreamToken(0, 0, nil)
to := types.NewStreamToken(0, 0, 0, 0, nil)
// backpaginate 5 messages starting at the latest position.
paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true)
@ -534,14 +534,14 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point there should be no messages. We haven't sent anything
// yet.
events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, nil))
events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, 0, nil))
if err != nil {
t.Fatal(err)
}
if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 {
t.Fatal("first call should have no updates")
}
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, nil))
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, 0, nil))
if err != nil {
return
}
@ -559,14 +559,14 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point we should get exactly one message. We're sending the sync position
// that we were given from the update and the send-to-device update will be updated
// in the database to reflect that this was the sync position we sent the message at.
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil))
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos, nil))
if err != nil {
t.Fatal(err)
}
if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 {
t.Fatal("second call should have one update")
}
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil))
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, streamPos, nil))
if err != nil {
return
}
@ -574,35 +574,35 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point we should still have one message because we haven't progressed the
// sync position yet. This is equivalent to the client failing to /sync and retrying
// with the same position.
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil))
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos, nil))
if err != nil {
t.Fatal(err)
}
if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 {
t.Fatal("third call should have one update still")
}
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil))
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, streamPos, nil))
if err != nil {
return
}
// At this point we should now have no updates, because we've progressed the sync
// position. Therefore the update from before will not be sent again.
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+1, nil))
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos+1, nil))
if err != nil {
t.Fatal(err)
}
if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 {
t.Fatal("fourth call should have no updates")
}
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos+1, nil))
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, 0, streamPos+1, nil))
if err != nil {
return
}
// At this point we should still have no updates, because no new updates have been
// sent.
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+2, nil))
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, 0, streamPos+2, nil))
if err != nil {
t.Fatal(err)
}
@ -639,7 +639,7 @@ func TestInviteBehaviour(t *testing.T) {
}
// both invite events should appear in a new sync
beforeRetireRes := types.NewResponse()
beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false)
beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0, 0, 0, nil), latest, 0, false)
if err != nil {
t.Fatalf("IncrementalSync failed: %s", err)
}
@ -654,7 +654,7 @@ func TestInviteBehaviour(t *testing.T) {
t.Fatalf("failed to get SyncPosition: %s", err)
}
res := types.NewResponse()
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false)
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0, 0, 0, nil), latest, 0, false)
if err != nil {
t.Fatalf("IncrementalSync failed: %s", err)
}

View file

@ -32,11 +32,11 @@ var (
randomMessageEvent gomatrixserverlib.HeaderedEvent
aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
bobLeaveEvent gomatrixserverlib.HeaderedEvent
syncPositionVeryOld = types.NewStreamToken(5, 0, nil)
syncPositionBefore = types.NewStreamToken(11, 0, nil)
syncPositionAfter = types.NewStreamToken(12, 0, nil)
syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition(), 1, nil)
syncPositionAfter2 = types.NewStreamToken(13, 0, nil)
syncPositionVeryOld = types.NewStreamToken(5, 0, 0, 0, nil)
syncPositionBefore = types.NewStreamToken(11, 0, 0, 0, nil)
syncPositionAfter = types.NewStreamToken(12, 0, 0, 0, nil)
syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition, 1, 0, 0, nil)
syncPositionAfter2 = types.NewStreamToken(13, 0, 0, 0, nil)
)
var (

View file

@ -65,7 +65,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
since = &tok
}
if since == nil {
tok := types.NewStreamToken(0, 0, nil)
tok := types.NewStreamToken(0, 0, 0, 0, nil)
since = &tok
}
timelineLimit := DefaultTimelineLimit

View file

@ -254,7 +254,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
}
// TODO: handle ignored users
if req.since.PDUPosition() == 0 && req.since.EDUPosition() == 0 {
if req.since.IsEmpty() {
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
if err != nil {
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
@ -267,7 +267,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
}
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter)
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition, &accountDataFilter)
if err != nil {
return res, fmt.Errorf("rp.appendAccountData: %w", err)
}
@ -299,7 +299,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
// Get the next_batch from the sync response and increase the
// EDU counter.
if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil {
pos.Positions[1]++
pos.SendToDevicePosition++
res.NextBatch = pos.String()
}
}
@ -328,7 +328,7 @@ func (rp *RequestPool) appendAccountData(
// data keys were set between two message. This isn't a huge issue since the
// duplicate data doesn't represent a huge quantity of data, but an optimisation
// here would be making sure each data is sent only once to the client.
if req.since == nil || (req.since.PDUPosition() == 0 && req.since.EDUPosition() == 0) {
if req.since.IsEmpty() {
// If this is the initial sync, we don't need to check if a data has
// already been sent. Instead, we send the whole batch.
dataReq := &userapi.QueryAccountDataRequest{
@ -363,7 +363,7 @@ func (rp *RequestPool) appendAccountData(
}
r := types.Range{
From: req.since.PDUPosition(),
From: req.since.PDUPosition,
To: currentPos,
}
// If both positions are the same, it means that the data was saved after the

View file

@ -16,7 +16,6 @@ package types
import (
"encoding/json"
"errors"
"fmt"
"sort"
"strconv"
@ -107,8 +106,11 @@ const (
)
type StreamingToken struct {
syncToken
logs map[string]*LogPosition
PDUPosition StreamPosition
TypingPosition StreamPosition
ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition
logs map[string]*LogPosition
}
func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
@ -126,29 +128,33 @@ func (t *StreamingToken) Log(name string) *LogPosition {
return l
}
func (t *StreamingToken) PDUPosition() StreamPosition {
return t.Positions[0]
}
func (t *StreamingToken) EDUPosition() StreamPosition {
return t.Positions[1]
}
func (t *StreamingToken) String() string {
posStr := fmt.Sprintf(
"s%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
)
var logStrings []string
for name, lp := range t.logs {
logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
logStrings = append(logStrings, logStr)
}
sort.Strings(logStrings)
// E.g s11_22_33.dl0-134.ab1-441
return strings.Join(append([]string{t.syncToken.String()}, logStrings...), ".")
// E.g s11_22_33_44.dl0-134.ab1-441
return strings.Join(append([]string{posStr}, logStrings...), ".")
}
// IsAfter returns true if ANY position in this token is greater than `other`.
func (t *StreamingToken) IsAfter(other StreamingToken) bool {
for i := range other.Positions {
if t.Positions[i] > other.Positions[i] {
return true
}
switch {
case t.PDUPosition > other.PDUPosition:
return true
case t.TypingPosition > other.TypingPosition:
return true
case t.ReceiptPosition > other.ReceiptPosition:
return true
case t.SendToDevicePosition > other.SendToDevicePosition:
return true
}
for name := range t.logs {
otherLog := other.Log(name)
@ -162,19 +168,25 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return false
}
func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition == 0
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
// If the latter StreamingToken contains a field that is not 0, it is considered an update,
// and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called.
// If the other token has a log, they will replace any existing log on this token.
func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) {
ret.Type = t.Type
ret.Positions = make([]StreamPosition, len(t.Positions))
for i := range t.Positions {
ret.Positions[i] = t.Positions[i]
if other.Positions[i] == 0 {
continue
}
ret.Positions[i] = other.Positions[i]
ret = *t
switch {
case other.PDUPosition > 0:
ret.PDUPosition = other.PDUPosition
case other.TypingPosition > 0:
ret.TypingPosition = other.TypingPosition
case other.ReceiptPosition > 0:
ret.ReceiptPosition = other.ReceiptPosition
case other.SendToDevicePosition > 0:
ret.SendToDevicePosition = other.SendToDevicePosition
}
ret.logs = make(map[string]*LogPosition)
for name := range t.logs {
@ -189,26 +201,23 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken)
}
type TopologyToken struct {
syncToken
Depth StreamPosition
PDUPosition StreamPosition
ReceiptPosition StreamPosition
}
func (t *TopologyToken) Depth() StreamPosition {
return t.Positions[0]
}
func (t *TopologyToken) PDUPosition() StreamPosition {
return t.Positions[1]
}
func (t *TopologyToken) StreamToken() StreamingToken {
return NewStreamToken(t.PDUPosition(), 0, nil)
return NewStreamToken(t.PDUPosition, 0, t.ReceiptPosition, 0, nil)
}
func (t *TopologyToken) String() string {
return t.syncToken.String()
return fmt.Sprintf("t%d_%d_%d", t.Depth, t.PDUPosition, t.ReceiptPosition)
}
// Decrement the topology token to one event earlier.
func (t *TopologyToken) Decrement() {
depth := t.Positions[0]
pduPos := t.Positions[1]
depth := t.Depth
pduPos := t.PDUPosition
if depth-1 <= 0 {
// nothing can be lower than this
depth = 1
@ -223,151 +232,126 @@ func (t *TopologyToken) Decrement() {
if depth < 1 {
depth = 1
}
t.Positions = []StreamPosition{
depth, pduPos,
}
}
// NewSyncTokenFromString takes a string of the form "xyyyy..." where "x"
// represents the type of a pagination token and "yyyy..." the token itself, and
// parses it in order to create a new instance of SyncToken. Returns an
// error if the token couldn't be parsed into an int64, or if the token type
// isn't a known type (returns ErrInvalidSyncTokenType in the latter
// case).
func newSyncTokenFromString(s string) (token *syncToken, categories []string, err error) {
if len(s) == 0 {
return nil, nil, ErrInvalidSyncTokenLen
}
token = new(syncToken)
var positions []string
switch t := SyncTokenType(s[:1]); t {
case SyncTokenTypeStream, SyncTokenTypeTopology:
token.Type = t
categories = strings.Split(s[1:], ".")
positions = strings.Split(categories[0], "_")
default:
return nil, nil, ErrInvalidSyncTokenType
}
for _, pos := range positions {
if posInt, err := strconv.ParseInt(pos, 10, 64); err != nil {
return nil, nil, err
} else if posInt < 0 {
return nil, nil, errors.New("negative position not allowed")
} else {
token.Positions = append(token.Positions, StreamPosition(posInt))
}
}
return
t.Depth = depth
t.PDUPosition = pduPos
}
// NewTopologyToken creates a new sync token for /messages
func NewTopologyToken(depth, streamPos StreamPosition) TopologyToken {
func NewTopologyToken(depth, pduPos StreamPosition) TopologyToken {
if depth < 0 {
depth = 1
}
return TopologyToken{
syncToken: syncToken{
Type: SyncTokenTypeTopology,
Positions: []StreamPosition{depth, streamPos},
},
Depth: depth,
PDUPosition: pduPos,
}
}
func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
t, _, err := newSyncTokenFromString(tok)
if tok[0] != SyncTokenTypeTopology[0] {
err = fmt.Errorf("topology token must start with 't'")
return
}
parts := strings.Split(tok[1:], "_")
if len(parts) < 3 {
err = fmt.Errorf("topology token must have 3 positions")
return
}
depth, err := strconv.Atoi(parts[0])
if err != nil {
return
}
if t.Type != SyncTokenTypeTopology {
err = fmt.Errorf("token %s is not a topology token", tok)
pduPos, err := strconv.Atoi(parts[1])
if err != nil {
return
}
if len(t.Positions) < 2 {
err = fmt.Errorf("token %s wrong number of values, got %d want at least 2", tok, len(t.Positions))
receiptPos, err := strconv.Atoi(parts[2])
if err != nil {
return
}
return TopologyToken{
syncToken: *t,
}, nil
token = TopologyToken{
Depth: StreamPosition(depth),
PDUPosition: StreamPosition(pduPos),
ReceiptPosition: StreamPosition(receiptPos),
}
return
}
// NewStreamToken creates a new sync token for /sync
func NewStreamToken(pduPos, eduPos StreamPosition, logs map[string]*LogPosition) StreamingToken {
func NewStreamToken(
pduPos, typingPos, receiptPos, sendToDevicePos StreamPosition,
logs map[string]*LogPosition,
) StreamingToken {
if logs == nil {
logs = make(map[string]*LogPosition)
}
return StreamingToken{
syncToken: syncToken{
Type: SyncTokenTypeStream,
Positions: []StreamPosition{pduPos, eduPos},
},
logs: logs,
PDUPosition: pduPos,
TypingPosition: typingPos,
ReceiptPosition: receiptPos,
SendToDevicePosition: sendToDevicePos,
logs: logs,
}
}
func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
t, categories, err := newSyncTokenFromString(tok)
if tok[0] != SyncTokenTypeStream[0] {
err = fmt.Errorf("stream token must start with 's'")
return
}
categories := strings.Split(tok[1:], ".")
parts := strings.Split(categories[0], "_")
if len(parts) < 4 {
err = fmt.Errorf("stream token must have 4 positions")
return
}
pduPos, err := strconv.Atoi(parts[0])
if err != nil {
return
}
if t.Type != SyncTokenTypeStream {
err = fmt.Errorf("token %s is not a streaming token", tok)
typingPos, err := strconv.Atoi(parts[1])
if err != nil {
return
}
if len(t.Positions) < 2 {
err = fmt.Errorf("token %s wrong number of values, got %d want at least 2", tok, len(t.Positions))
receiptPos, err := strconv.Atoi(parts[2])
if err != nil {
return
}
logs := make(map[string]*LogPosition)
if len(categories) > 1 {
// dl-0-1234
// $log_name-$partition-$offset
for _, logStr := range categories[1:] {
segments := strings.Split(logStr, "-")
if len(segments) != 3 {
err = fmt.Errorf("token %s - invalid log: %s", tok, logStr)
return
}
var partition int64
partition, err = strconv.ParseInt(segments[1], 10, 32)
if err != nil {
return
}
var offset int64
offset, err = strconv.ParseInt(segments[2], 10, 64)
if err != nil {
return
}
logs[segments[0]] = &LogPosition{
Partition: int32(partition),
Offset: offset,
}
sendToDevicePos, err := strconv.Atoi(parts[3])
if err != nil {
return
}
token = StreamingToken{
PDUPosition: StreamPosition(pduPos),
TypingPosition: StreamPosition(typingPos),
ReceiptPosition: StreamPosition(receiptPos),
SendToDevicePosition: StreamPosition(sendToDevicePos),
logs: make(map[string]*LogPosition),
}
// dl-0-1234
// $log_name-$partition-$offset
for _, logStr := range categories[1:] {
segments := strings.Split(logStr, "-")
if len(segments) != 3 {
err = fmt.Errorf("token %s - invalid log: %s", tok, logStr)
return
}
var partition int64
partition, err = strconv.ParseInt(segments[1], 10, 32)
if err != nil {
return
}
var offset int64
offset, err = strconv.ParseInt(segments[2], 10, 64)
if err != nil {
return
}
token.logs[segments[0]] = &LogPosition{
Partition: int32(partition),
Offset: offset,
}
}
return StreamingToken{
syncToken: *t,
logs: logs,
}, nil
}
// syncToken represents a syncapi token, used for interactions with
// /sync or /messages, for example.
type syncToken struct {
Type SyncTokenType
// A list of stream positions, their meanings vary depending on the token type.
Positions []StreamPosition
}
// String translates a SyncToken to a string of the "xyyyy..." (see
// NewSyncToken to know what it represents).
func (p *syncToken) String() string {
posStr := make([]string, len(p.Positions))
for i := range p.Positions {
posStr[i] = strconv.FormatInt(int64(p.Positions[i]), 10)
}
return fmt.Sprintf("%s%s", p.Type, strings.Join(posStr, "_"))
return token, nil
}
// PrevEventRef represents a reference to a previous event in a state event upgrade

View file

@ -10,12 +10,12 @@ import (
func TestNewSyncTokenWithLogs(t *testing.T) {
tests := map[string]*StreamingToken{
"s4_0": {
syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}},
logs: make(map[string]*LogPosition),
"s4_0_0_0": {
PDUPosition: 4,
logs: make(map[string]*LogPosition),
},
"s4_0.dl-0-123": {
syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}},
"s4_0_0_0.dl-0-123": {
PDUPosition: 4,
logs: map[string]*LogPosition{
"dl": {
Partition: 0,
@ -23,8 +23,8 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
},
},
},
"s4_0.ab-1-14419482332.dl-0-123": {
syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}},
"s4_0_0_0.ab-1-14419482332.dl-0-123": {
PDUPosition: 4,
logs: map[string]*LogPosition{
"ab": {
Partition: 1,
@ -56,6 +56,7 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
}
}
/*
func TestNewSyncTokenFromString(t *testing.T) {
shouldPass := map[string]syncToken{
"s4_0": NewStreamToken(4, 0, nil).syncToken,
@ -90,6 +91,7 @@ func TestNewSyncTokenFromString(t *testing.T) {
}
}
}
*/
func TestNewInviteResponse(t *testing.T) {
event := `{"auth_events":["$SbSsh09j26UAXnjd3RZqf2lyA3Kw2sY_VZJVZQAV9yA","$EwL53onrLwQ5gL8Dv3VrOOCvHiueXu2ovLdzqkNi3lo","$l2wGmz9iAwevBDGpHT_xXLUA5O8BhORxWIGU1cGi1ZM","$GsWFJLXgdlF5HpZeyWkP72tzXYWW3uQ9X28HBuTztHE"],"content":{"avatar_url":"","displayname":"neilalexander","membership":"invite"},"depth":9,"hashes":{"sha256":"8p+Ur4f8vLFX6mkIXhxI0kegPG7X3tWy56QmvBkExAg"},"origin":"matrix.org","origin_server_ts":1602087113066,"prev_events":["$1v-O6tNwhOZcA8bvCYY-Dnj1V2ZDE58lLPxtlV97S28"],"prev_state":[],"room_id":"!XbeXirGWSPXbEaGokF:matrix.org","sender":"@neilalexander:matrix.org","signatures":{"dendrite.neilalexander.dev":{"ed25519:BMJi":"05KQ5lPw0cSFsE4A0x1z7vi/3cc8bG4WHUsFWYkhxvk/XkXMGIYAYkpNThIvSeLfdcHlbm/k10AsBSKH8Uq4DA"},"matrix.org":{"ed25519:a_RXGa":"jeovuHr9E/x0sHbFkdfxDDYV/EyoeLi98douZYqZ02iYddtKhfB7R3WLay/a+D3V3V7IW0FUmPh/A404x5sYCw"}},"state_key":"@neilalexander:dendrite.neilalexander.dev","type":"m.room.member","unsigned":{"age":2512,"invite_room_state":[{"content":{"join_rule":"invite"},"sender":"@neilalexander:matrix.org","state_key":"","type":"m.room.join_rules"},{"content":{"avatar_url":"mxc://matrix.org/BpDaozLwgLnlNStxDxvLzhPr","displayname":"neilalexander","membership":"join"},"sender":"@neilalexander:matrix.org","state_key":"@neilalexander:matrix.org","type":"m.room.member"},{"content":{"name":"Test room"},"sender":"@neilalexander:matrix.org","state_key":"","type":"m.room.name"}]},"_room_version":"5"}`