Summarise key change logs

This commit is contained in:
Kegan Dougal 2020-08-18 10:23:03 +01:00
parent a5a85c6a11
commit 1721ceb841

View file

@ -42,6 +42,7 @@ func (p *KeyChange) DefaultPartition() int32 {
// ProduceKeyChanges creates new change events for each key
func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
userToDeviceCount := make(map[string]int)
for _, key := range keys {
var m sarama.ProducerMessage
@ -62,12 +63,12 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
if err != nil {
return err
}
userToDeviceCount[key.UserID]++
}
for userID, count := range userToDeviceCount {
logrus.WithFields(logrus.Fields{
"user_id": key.UserID,
"device_id": key.DeviceID,
"partition": partition,
"offset": offset,
"len_key_bytes": len(key.KeyJSON),
"user_id": userID,
"num_key_changes": count,
}).Infof("Produced to key change topic '%s'", p.Topic)
}
return nil