Skip to content

Commit

Permalink
All producers are now closed on JVM exit.
Browse files Browse the repository at this point in the history
  • Loading branch information
manub committed Nov 21, 2015
1 parent b66e845 commit 5a0ce5e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# scalatest-embedded-kafka
A library that provides an in-memory Kafka broker to run your ScalaTest specs against. It uses Kafka 0.8.2.1 and ZooKeeper 3.4.6.
A library that provides an in-memory Kafka broker to run your ScalaTest specs against. It uses Kafka 0.8.2.2 and ZooKeeper 3.4.6.

Inspired by https://github.com/chbatey/kafka-unit

## How to use

scalatest-embedded-kafka is available on Bintray and Maven Central, compiled for both Scala 2.10 and 2.11

* In your `build.sbt` file add the following dependency: `"net.manub" %% "scalatest-embedded-kafka" % "0.3.0" % "test"`
* In your `build.sbt` file add the following dependency: `"net.manub" %% "scalatest-embedded-kafka" % "0.4.1" % "test"`
* Have your `Spec` extend the `EmbeddedKafka` trait.
* Enclose the code that needs a running instance of Kafka within the `withRunningKafka` closure.

Expand Down
20 changes: 16 additions & 4 deletions src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,26 @@ sealed trait EmbeddedKafkaSupport {
}

object aKafkaProducer {
private[this] var producers = Vector.empty[KafkaProducer[_, _]]

sys.addShutdownHook {
producers.foreach(_.close())
}

def thatSerializesValuesWith[V](serializer: Class[_ <: Serializer[V]])(implicit config: EmbeddedKafkaConfig) = {
new KafkaProducer[String, V](basicKafkaConfig(config) +(
val producer = new KafkaProducer[String, V](basicKafkaConfig(config) + (
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName))
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName)
)
producers :+= producer
producer
}

def apply[V](implicit valueSerializer: Serializer[V], config: EmbeddedKafkaConfig) =
new KafkaProducer[String, V](basicKafkaConfig(config), new StringSerializer, valueSerializer)
def apply[V](implicit valueSerializer: Serializer[V], config: EmbeddedKafkaConfig) = {
val producer = new KafkaProducer[String, V](basicKafkaConfig(config), new StringSerializer, valueSerializer)
producers :+= producer
producer
}

def basicKafkaConfig[V](config: EmbeddedKafkaConfig): Map[String, String] = Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.5.0-SNAPSHOT"
version in ThisBuild := "0.4.1-SNAPSHOT"

0 comments on commit 5a0ce5e

Please sign in to comment.