diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 385db186c..71c6ec887 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -1,6 +1,7 @@ package sync import ( + log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/common" sarama "gopkg.in/Shopify/sarama.v1" @@ -18,17 +19,25 @@ func NewServer(cfg *config.Sync, store common.PartitionStorer) (*Server, error) return nil, err } - return &Server{ - roomServerConsumer: &common.ContinualConsumer{ - Topic: cfg.RoomserverOutputTopic, - Consumer: kafkaConsumer, - PartitionStore: store, - }, - }, nil + consumer := common.ContinualConsumer{ + Topic: cfg.RoomserverOutputTopic, + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &Server{ + roomServerConsumer: &consumer, + } + consumer.ProcessMessage = s.onMessage + return s, nil } // Start consuming from room servers func (s *Server) Start() error { return s.roomServerConsumer.Start() } + +func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { + log.WithField("key", string(msg.Key)).WithField("val", string(msg.Value)).Info("Recv") + return nil +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index 01415eaaf..83ea51c3d 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -42,7 +42,7 @@ func main() { cfg := config.Sync{ KafkaConsumerURIs: []string{"localhost:9092"}, RoomserverOutputTopic: "roomserverOutput", - DataSource: "postgres://user:pass@localhost/dendrite-sync-server?sslmode=disable", + DataSource: "postgres://dendrite:itsasecret@localhost/syncserver?sslmode=disable", } log.Info("Starting sync server") diff --git a/src/github.com/matrix-org/dendrite/common/partition_offset_table.go b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go index 904e0be81..5b7a3fd17 100644 --- a/src/github.com/matrix-org/dendrite/common/partition_offset_table.go +++ b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go @@ -57,6 +57,7 @@ func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]Part if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { return nil, err } + results = append(results, offset) } return results, nil }