Skip to content

Commit

Permalink
Merge pull request #58 from drpacman/master
Browse files Browse the repository at this point in the history
Added Spray client tracing support
  • Loading branch information
levkhomich committed May 14, 2015
2 parents 9713d0a + bb0e091 commit 2c463a1
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ class TracingExtensionImpl(system: ActorSystem) extends Extension {
def finish(ts: BaseTracingSupport): Unit =
addAnnotation(ts, thrift.zipkinConstants.SERVER_SEND, send = true)

def finishChildRequest(ts: BaseTracingSupport): Unit =
addAnnotation(ts, thrift.zipkinConstants.CLIENT_RECV, send = true)

def submitSpans(spans: TraversableOnce[thrift.Span]): Unit =
if (isEnabled)
holder ! SubmitSpans(spans)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ private[tracing] class SpanHolder(spans: Agent[mutable.Map[Long, thrift.Span]])
val a = new thrift.Annotation(adjustedMicroTime(timestamp), msg)
a.set_host(endpointFor(tracingId))
spanInt.add_to_annotations(a)
if (a.value == thrift.zipkinConstants.SERVER_SEND) {
if (a.value == thrift.zipkinConstants.SERVER_SEND ||
a.value == thrift.zipkinConstants.CLIENT_RECV) {
enqueue(tracingId, cancelJob = true)
}
}
Expand Down
9 changes: 8 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ object Dependencies {
"io.spray" %% "spray-routing" % "1.3.2"
}

def sprayClient(scalaVersion: String): ModuleID = {
if (scalaVersion.startsWith("2.10"))
"io.spray" % "spray-client" % "1.3.1"
else
"io.spray" %% "spray-client" % "1.3.2"
}

val akkaActor = "com.typesafe.akka" %% "akka-actor" % AkkaVersion
val akkaAgent = "com.typesafe.akka" %% "akka-agent" % AkkaVersion
val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M4"
Expand Down Expand Up @@ -205,7 +212,7 @@ object Dependencies {
val thrift = Seq(Compile.libThrift, Compile.slf4jLog4j12)

def spray(scalaVersion: String): Seq[ModuleID] =
Seq(Compile.sprayRouting(scalaVersion))
Seq(Compile.sprayRouting(scalaVersion), Compile.sprayClient(scalaVersion))

def test(scalaVersion: String): Seq[ModuleID] =
Seq(Test.specs, Test.finagle, Test.playTest, Test.akkaTest,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.github.levkhomich.akka.tracing.http

import spray.http.{ HttpResponse, HttpRequest }
import spray.client.pipelining._
import akka.actor.ActorSystem
import com.github.levkhomich.akka.tracing._

case class TracedClientRequest() extends TracingSupport

trait TracedSprayPipeline {

val system: ActorSystem
def sendAndReceive: SendReceive

implicit lazy val trace: TracingExtensionImpl = TracingExtension(system)
import system.dispatcher

def tracedPipeline[T](trigger: BaseTracingSupport) = {
val clientReq = TracedClientRequest().asChildOf(trigger)
addHeader("X-B3-TraceId", Span.asString(clientReq.tracingId))
addHeader("X-B3-Sampled", trace.getId(clientReq.tracingId).isDefined.toString) ~>
startTrace(clientReq) ~>
sendAndReceive ~>
completeTrace(clientReq)
}

def startTrace(clientReq: BaseTracingSupport)(req: HttpRequest): HttpRequest = {
trace.record(clientReq, thrift.zipkinConstants.CLIENT_SEND)
trace.record(clientReq, s"requesting to ${req.uri}")
req
}

def completeTrace(clientReq: BaseTracingSupport)(response: HttpResponse): HttpResponse = {
trace.record(clientReq, s"response code ${response.status}")
trace.finishChildRequest(clientReq)
response
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.github.levkhomich.akka.tracing.http

import java.util.concurrent.TimeoutException
import scala.concurrent.duration._

import spray.http._
import spray.client.pipelining._
import scala.concurrent.Future
import com.github.levkhomich.akka.tracing._
import org.specs2.mutable.Specification
import org.specs2.matcher.FutureMatchers

class TracedPipelineSpec extends Specification with FutureMatchers with TracingTestCommons with TracingTestActorSystem with MockCollector { self =>

sequential

def mockedPipeline(mockResponse: HttpResponse) = new TracedSprayPipeline {
val system = self.system
override def sendAndReceive = {
case x: HttpRequest =>
Future.successful(mockResponse)
}
}

val bodyEntity = spray.http.HttpEntity("test")

"tracedPipeline" should {
"Generate a sampled span when pipeline is used" in {
val mockResponse = HttpResponse(StatusCodes.OK, bodyEntity)
val pipeline = mockedPipeline(mockResponse)
val mockTrigger = nextRandomMessage
trace.forcedSample(mockTrigger, "test trace")
pipeline.tracedPipeline[String](mockTrigger)(Get("http://test.com"))
trace.finish(mockTrigger)
expectSpans(2)
}
}

step {
shutdown()
}
}

0 comments on commit 2c463a1

Please sign in to comment.