diff --git a/README.md b/README.md index 303b004..3ee4095 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # scalatest-embedded-kafka -A library that provides an in-memory Kafka broker to run your ScalaTest specs against. It uses Kafka 0.8.2.1 and ZooKeeper 3.4.6. +A library that provides an in-memory Kafka broker to run your ScalaTest specs against. It uses Kafka 0.8.2.2 and ZooKeeper 3.4.6. Inspired by https://github.com/chbatey/kafka-unit @@ -7,7 +7,7 @@ Inspired by https://github.com/chbatey/kafka-unit scalatest-embedded-kafka is available on Bintray and Maven Central, compiled for both Scala 2.10 and 2.11 -* In your `build.sbt` file add the following dependency: `"net.manub" %% "scalatest-embedded-kafka" % "0.3.0" % "test"` +* In your `build.sbt` file add the following dependency: `"net.manub" %% "scalatest-embedded-kafka" % "0.4.1" % "test"` * Have your `Spec` extend the `EmbeddedKafka` trait. * Enclose the code that needs a running instance of Kafka within the `withRunningKafka` closure. diff --git a/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala b/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala index 3871e5f..422b3dd 100644 --- a/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala +++ b/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala @@ -161,14 +161,26 @@ sealed trait EmbeddedKafkaSupport { } object aKafkaProducer { + private[this] var producers = Vector.empty[KafkaProducer[_, _]] + + sys.addShutdownHook { + producers.foreach(_.close()) + } + def thatSerializesValuesWith[V](serializer: Class[_ <: Serializer[V]])(implicit config: EmbeddedKafkaConfig) = { - new KafkaProducer[String, V](basicKafkaConfig(config) +( + val producer = new KafkaProducer[String, V](basicKafkaConfig(config) + ( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName)) + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName) + ) + producers :+= producer + producer } - def apply[V](implicit valueSerializer: Serializer[V], config: EmbeddedKafkaConfig) = - new KafkaProducer[String, V](basicKafkaConfig(config), new StringSerializer, valueSerializer) + def apply[V](implicit valueSerializer: Serializer[V], config: EmbeddedKafkaConfig) = { + val producer = new KafkaProducer[String, V](basicKafkaConfig(config), new StringSerializer, valueSerializer) + producers :+= producer + producer + } def basicKafkaConfig[V](config: EmbeddedKafkaConfig): Map[String, String] = Map( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}", diff --git a/version.sbt b/version.sbt index c9f9c29..a6c955b 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.5.0-SNAPSHOT" \ No newline at end of file +version in ThisBuild := "0.4.1-SNAPSHOT" \ No newline at end of file