diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index c9b9e5d16..0d82f7a58 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -114,12 +114,14 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er return err } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams - posUpdate := types.NewStreamToken(0, 0, 0, 0, map[string]*types.LogPosition{ - syncinternal.DeviceListLogName: { - Offset: msg.Offset, - Partition: msg.Partition, + posUpdate := types.StreamingToken{ + Logs: map[string]*types.LogPosition{ + syncinternal.DeviceListLogName: { + Offset: msg.Offset, + Partition: msg.Partition, + }, }, - }) + } for userID := range queryRes.UserIDsToCount { s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) } diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 6e87f7edc..f65db0a5b 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -16,13 +16,15 @@ import ( var ( syncingUser = "@alice:localhost" - emptyToken = types.NewStreamToken(0, 0, 0, 0, nil) - newestToken = types.NewStreamToken(0, 0, 0, 0, map[string]*types.LogPosition{ - DeviceListLogName: { - Offset: sarama.OffsetNewest, - Partition: 0, + emptyToken = types.StreamingToken{} + newestToken = types.StreamingToken{ + Logs: map[string]*types.LogPosition{ + DeviceListLogName: { + Offset: sarama.OffsetNewest, + Partition: 0, + }, }, - }) + } ) type mockKeyAPI struct{} diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 7ccf6384c..fe76b74e0 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -110,18 +110,18 @@ type StreamingToken struct { TypingPosition StreamPosition ReceiptPosition StreamPosition SendToDevicePosition StreamPosition - logs map[string]*LogPosition + Logs map[string]*LogPosition } func (t *StreamingToken) SetLog(name string, lp *LogPosition) { - if t.logs == nil { - t.logs = make(map[string]*LogPosition) + if t.Logs == nil { + t.Logs = make(map[string]*LogPosition) } - t.logs[name] = lp + t.Logs[name] = lp } func (t *StreamingToken) Log(name string) *LogPosition { - l, ok := t.logs[name] + l, ok := t.Logs[name] if !ok { return nil } @@ -135,7 +135,7 @@ func (t StreamingToken) String() string { t.ReceiptPosition, t.SendToDevicePosition, ) var logStrings []string - for name, lp := range t.logs { + for name, lp := range t.Logs { logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset) logStrings = append(logStrings, logStr) } @@ -156,12 +156,12 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { case t.SendToDevicePosition > other.SendToDevicePosition: return true } - for name := range t.logs { + for name := range t.Logs { otherLog := other.Log(name) if otherLog == nil { continue } - if t.logs[name].IsAfter(otherLog) { + if t.Logs[name].IsAfter(otherLog) { return true } } @@ -188,14 +188,14 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) case other.SendToDevicePosition > 0: ret.SendToDevicePosition = other.SendToDevicePosition } - ret.logs = make(map[string]*LogPosition) - for name := range t.logs { + ret.Logs = make(map[string]*LogPosition) + for name := range t.Logs { otherLog := other.Log(name) if otherLog == nil { continue } copy := *otherLog - ret.logs[name] = © + ret.Logs[name] = © } return ret } @@ -294,7 +294,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { TypingPosition: positions[1], ReceiptPosition: positions[2], SendToDevicePosition: positions[3], - logs: make(map[string]*LogPosition), + Logs: make(map[string]*LogPosition), } // dl-0-1234 // $log_name-$partition-$offset @@ -314,7 +314,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { if err != nil { return } - token.logs[segments[0]] = &LogPosition{ + token.Logs[segments[0]] = &LogPosition{ Partition: int32(partition), Offset: offset, }