Export Logs in StreamingToken

This commit is contained in:
Neil Alexander 2020-12-10 18:39:28 +00:00
parent e524b10fb9
commit a390479909
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 28 additions and 24 deletions

View file

@ -114,12 +114,14 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
posUpdate := types.NewStreamToken(0, 0, 0, 0, map[string]*types.LogPosition{
posUpdate := types.StreamingToken{
Logs: map[string]*types.LogPosition{
syncinternal.DeviceListLogName: {
Offset: msg.Offset,
Partition: msg.Partition,
},
})
},
}
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
}

View file

@ -16,13 +16,15 @@ import (
var (
syncingUser = "@alice:localhost"
emptyToken = types.NewStreamToken(0, 0, 0, 0, nil)
newestToken = types.NewStreamToken(0, 0, 0, 0, map[string]*types.LogPosition{
emptyToken = types.StreamingToken{}
newestToken = types.StreamingToken{
Logs: map[string]*types.LogPosition{
DeviceListLogName: {
Offset: sarama.OffsetNewest,
Partition: 0,
},
})
},
}
)
type mockKeyAPI struct{}

View file

@ -110,18 +110,18 @@ type StreamingToken struct {
TypingPosition StreamPosition
ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition
logs map[string]*LogPosition
Logs map[string]*LogPosition
}
func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
if t.logs == nil {
t.logs = make(map[string]*LogPosition)
if t.Logs == nil {
t.Logs = make(map[string]*LogPosition)
}
t.logs[name] = lp
t.Logs[name] = lp
}
func (t *StreamingToken) Log(name string) *LogPosition {
l, ok := t.logs[name]
l, ok := t.Logs[name]
if !ok {
return nil
}
@ -135,7 +135,7 @@ func (t StreamingToken) String() string {
t.ReceiptPosition, t.SendToDevicePosition,
)
var logStrings []string
for name, lp := range t.logs {
for name, lp := range t.Logs {
logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
logStrings = append(logStrings, logStr)
}
@ -156,12 +156,12 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
case t.SendToDevicePosition > other.SendToDevicePosition:
return true
}
for name := range t.logs {
for name := range t.Logs {
otherLog := other.Log(name)
if otherLog == nil {
continue
}
if t.logs[name].IsAfter(otherLog) {
if t.Logs[name].IsAfter(otherLog) {
return true
}
}
@ -188,14 +188,14 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken)
case other.SendToDevicePosition > 0:
ret.SendToDevicePosition = other.SendToDevicePosition
}
ret.logs = make(map[string]*LogPosition)
for name := range t.logs {
ret.Logs = make(map[string]*LogPosition)
for name := range t.Logs {
otherLog := other.Log(name)
if otherLog == nil {
continue
}
copy := *otherLog
ret.logs[name] = &copy
ret.Logs[name] = &copy
}
return ret
}
@ -294,7 +294,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
TypingPosition: positions[1],
ReceiptPosition: positions[2],
SendToDevicePosition: positions[3],
logs: make(map[string]*LogPosition),
Logs: make(map[string]*LogPosition),
}
// dl-0-1234
// $log_name-$partition-$offset
@ -314,7 +314,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
if err != nil {
return
}
token.logs[segments[0]] = &LogPosition{
token.Logs[segments[0]] = &LogPosition{
Partition: int32(partition),
Offset: offset,
}