Log messages as they are consumed. Fix bug with remembering offsets

This commit is contained in:
Kegan Dougal 2017-03-27 11:58:16 +01:00
parent 02f1d353b9
commit 32ae63ee40
3 changed files with 18 additions and 8 deletions

View file

@ -1,6 +1,7 @@
package sync
import (
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/common"
sarama "gopkg.in/Shopify/sarama.v1"
@ -18,17 +19,25 @@ func NewServer(cfg *config.Sync, store common.PartitionStorer) (*Server, error)
return nil, err
}
return &Server{
roomServerConsumer: &common.ContinualConsumer{
Topic: cfg.RoomserverOutputTopic,
Consumer: kafkaConsumer,
PartitionStore: store,
},
}, nil
consumer := common.ContinualConsumer{
Topic: cfg.RoomserverOutputTopic,
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &Server{
roomServerConsumer: &consumer,
}
consumer.ProcessMessage = s.onMessage
return s, nil
}
// Start consuming from room servers
func (s *Server) Start() error {
return s.roomServerConsumer.Start()
}
func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
log.WithField("key", string(msg.Key)).WithField("val", string(msg.Value)).Info("Recv")
return nil
}

View file

@ -42,7 +42,7 @@ func main() {
cfg := config.Sync{
KafkaConsumerURIs: []string{"localhost:9092"},
RoomserverOutputTopic: "roomserverOutput",
DataSource: "postgres://user:pass@localhost/dendrite-sync-server?sslmode=disable",
DataSource: "postgres://dendrite:itsasecret@localhost/syncserver?sslmode=disable",
}
log.Info("Starting sync server")

View file

@ -57,6 +57,7 @@ func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]Part
if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil {
return nil, err
}
results = append(results, offset)
}
return results, nil
}