Skip to content

Commit

Permalink
Merge pull request #508 from ivantopo/issue#502/allow-jvm-to-shutdown…
Browse files Browse the repository at this point in the history
…-when-no-reporters

turn all Kamon threads into daemon threads, except for reporters
  • Loading branch information
dpsoft authored Jan 30, 2018
2 parents 8231d18 + 083c31c commit 50e87dc
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 5 deletions.
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ lazy val testkit = (project in file("kamon-testkit"))


lazy val coreTests = (project in file("kamon-core-tests"))
.settings(moduleName := "kamon-core-tests", resolvers += Resolver.mavenLocal)
.settings(
moduleName := "kamon-core-tests",
resolvers += Resolver.mavenLocal,
fork in Test := true)
.settings(noPublishing: _*)
.settings(commonSettings: _*)
.settings(
Expand Down
67 changes: 67 additions & 0 deletions kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package kamon

import java.io.File
import java.util.concurrent.TimeUnit

import com.typesafe.config.Config
import kamon.metric.PeriodSnapshot
import kamon.trace.Span
import org.scalatest.{Matchers, WordSpec}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{

"the Kamon lifecycle" should {
"keep the JVM running if reporters are running" in {
val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter"))
Thread.sleep(5000)
process.isAlive shouldBe true
process.destroyForcibly().waitFor(5, TimeUnit.SECONDS)
}

"let the JVM stop after all reporters are stopped" in {
val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithTemporaryReporter"))
Thread.sleep(2000)
process.isAlive shouldBe true

eventually(timeout(7 seconds)) {
process.isAlive shouldBe false
process.exitValue() shouldBe 0
}
}
}


def createProcessCommand(mainClass: String): String = {
System.getProperty("java.home") + File.separator + "bin" + File.separator + "java" +
" -cp " + System.getProperty("java.class.path") + " " + mainClass
}
}

class DummyMetricReporter extends MetricReporter {
override def start(): Unit = {}
override def stop(): Unit = {}
override def reconfigure(config: Config): Unit = {}
override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {}
}

class DummySpanReporter extends SpanReporter {
override def start(): Unit = {}
override def stop(): Unit = {}
override def reconfigure(config: Config): Unit = {}
override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = {}
}

object KamonWithRunningReporter extends App {
Kamon.addReporter(new DummyMetricReporter())
Kamon.addReporter(new DummySpanReporter())
}

object KamonWithTemporaryReporter extends App {
Kamon.addReporter(new DummyMetricReporter())
Kamon.addReporter(new DummySpanReporter())

Thread.sleep(5000)
Kamon.stopAllReporters()
}
2 changes: 1 addition & 1 deletion kamon-core/src/main/scala/kamon/Kamon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
@volatile private var _filters = Filters.fromConfig(_config)

private val _clock = new Clock.Default()
private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler"))
private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler", daemon = true))
private val _metrics = new MetricRegistry(_config, _scheduler)
private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock)
private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock)
Expand Down
2 changes: 1 addition & 1 deletion kamon-core/src/main/scala/kamon/ReporterRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ object ReporterRegistry {

private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config, clock: Clock) extends ReporterRegistry with SpanSink {
private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry"))
private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry", daemon = true))
private val reporterCounter = new AtomicLong(0L)
private var registryConfiguration = readRegistryConfiguration(initialConfig)

Expand Down
6 changes: 4 additions & 2 deletions kamon-core/src/main/scala/kamon/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,27 @@ package object kamon {
/**
* Creates a thread factory that assigns the specified name to all created Threads.
*/
def threadFactory(name: String): ThreadFactory =
def threadFactory(name: String, daemon: Boolean = false): ThreadFactory =
new ThreadFactory {
val defaultFactory = Executors.defaultThreadFactory()

override def newThread(r: Runnable): Thread = {
val thread = defaultFactory.newThread(r)
thread.setName(name)
thread.setDaemon(daemon)
thread
}
}

def numberedThreadFactory(name: String): ThreadFactory =
def numberedThreadFactory(name: String, daemon: Boolean = false): ThreadFactory =
new ThreadFactory {
val count = new AtomicLong()
val defaultFactory = Executors.defaultThreadFactory()

override def newThread(r: Runnable): Thread = {
val thread = defaultFactory.newThread(r)
thread.setName(name + "-" + count.incrementAndGet().toString)
thread.setDaemon(daemon)
thread
}
}
Expand Down

0 comments on commit 50e87dc

Please sign in to comment.