Skip to content

Commit

Permalink
Distributed tracing support #2192 (#2282)
Browse files Browse the repository at this point in the history
Enables Tracing support via Zipkin and OpenTracer. 

It can be enabled via config

   tracing {
        zipkin {
             url = "http://localhost:9411" //url to connect to zipkin server
             //sample-rate to decide a request is sampled or not.
             sample-rate = "0.01" // sample 1% of requests by default
        }
    }

Tracing enables tracking of request from Controller to Invoker
  • Loading branch information
sandeep-paliwal authored and chetanmeh committed Jun 29, 2018
1 parent e38ea35 commit 63496d5
Show file tree
Hide file tree
Showing 12 changed files with 403 additions and 12 deletions.
8 changes: 8 additions & 0 deletions common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ dependencies {
compile 'io.kamon:kamon-statsd_2.11:0.6.7'
//for mesos
compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.7'

//tracing support
compile 'io.opentracing:opentracing-api:0.31.0'
compile 'io.opentracing:opentracing-util:0.31.0'
compile 'io.opentracing.brave:brave-opentracing:0.31.0'
compile 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.6.1'
compile 'io.zipkin.reporter2:zipkin-reporter:2.6.1'

scoverage gradle.scoverage.deps
}

Expand Down
14 changes: 14 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,18 @@ whisk {
constraint-delimiter = " "//used to parse constraint strings
teardown-on-exit = true //set to true to disable the mesos framework on system exit; set for false for HA deployments
}

# tracing configuration
tracing {
cache-expiry = 30 seconds #how long to keep spans in cache. Set to appropriate value to trace long running requests
#Zipkin configuration. Uncomment following to enable zipkin based tracing
#zipkin {
# url = "http://localhost:9411" //url to connect to zipkin server
//sample-rate to decide a request is sampled or not.
//sample-rate 0.5 equals sampling 50% of the requests
//sample-rate of 1 means 100% sampling.
//sample-rate of 0 means no sampling
# sample-rate = "0.01" // sample 1% of requests by default
#}
}
}
11 changes: 11 additions & 0 deletions common/scala/src/main/scala/whisk/common/TransactionId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import akka.http.scaladsl.model.headers.RawHeader
import pureconfig.loadConfigOrThrow
import spray.json._
import whisk.core.ConfigKeys
import pureconfig._
import whisk.common.tracing.WhiskTracerProvider

import scala.util.Try

Expand Down Expand Up @@ -82,6 +84,9 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
}

MetricEmitter.emitCounterMetric(marker)

//tracing support
WhiskTracerProvider.tracer.startSpan(marker, this)
StartMarker(Instant.now, marker)
}

Expand Down Expand Up @@ -116,6 +121,9 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
}

MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)

//tracing support
WhiskTracerProvider.tracer.finishSpan(this)
}

/**
Expand Down Expand Up @@ -144,6 +152,9 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {

MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
MetricEmitter.emitCounterMetric(endMarker)

//tracing support
WhiskTracerProvider.tracer.error(this)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package whisk.common.tracing

import java.util.concurrent.TimeUnit

import brave.Tracing
import brave.opentracing.BraveTracer
import brave.sampler.Sampler
import com.github.benmanes.caffeine.cache.{Caffeine, Ticker}
import io.opentracing.propagation.{Format, TextMapExtractAdapter, TextMapInjectAdapter}
import io.opentracing.util.GlobalTracer
import io.opentracing.{Span, SpanContext, Tracer}
import pureconfig._
import whisk.common.{LogMarkerToken, TransactionId}
import whisk.core.ConfigKeys
import zipkin2.reporter.okhttp3.OkHttpSender
import zipkin2.reporter.{AsyncReporter, Sender}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.duration.Duration

/**
 * OpenTracing based implementation for tracing.
 *
 * Open spans are tracked per transaction id in `spanMap` as a stack (most recently started span
 * first). Span contexts received from an upstream component are kept in `contextMap` so the first
 * span started for a transaction can continue the upstream trace. Both maps are Caffeine caches
 * whose entries expire `tracingConfig.cacheExpiry` after their last access.
 *
 * NOTE(review): the read-then-write sequences on `spanMap` in `startSpan` and `clear` are not
 * atomic — confirm that a single transaction id is never traced from several threads at once.
 *
 * @param tracer the OpenTracing tracer used to build spans and to inject/extract span contexts
 * @param tracingConfig tracing settings; this class reads `cacheExpiry` from it
 * @param ticker time source handed to the caches
 */
class OpenTracer(val tracer: Tracer, tracingConfig: TracingConfig, ticker: Ticker = SystemTicker) extends WhiskTracer {
  val spanMap = configureCache[String, List[Span]]()
  val contextMap = configureCache[String, SpanContext]()

  /**
   * Start a span named after the marker's action and push it on the span stack of the transaction.
   *
   * If the transaction already has an open span, the new span becomes its child. Otherwise the
   * span continues the context stored by `setTraceContext` (if any) or starts a fresh trace.
   *
   * @param logMarker marker whose `action` is used as the operation name of the span
   * @param transactionId transaction to which the span belongs
   */
  override def startSpan(logMarker: LogMarkerToken, transactionId: TransactionId): Unit = {
    //spans already open for this transactionId, newest first
    val spanList = spanMap.getOrElse(transactionId.meta.id, Nil)

    val spanBuilder = tracer
      .buildSpan(logMarker.action)
      .withTag("transactionId", transactionId.meta.id)

    // NOTE(review): startActive(true) also registers the span with the tracer's ScopeManager and
    // the returned Scope is never closed here — confirm that leaving it active is intended.
    val active = spanList match {
      case Nil =>
        //Check if any active context then resume from that else create a fresh span
        contextMap
          .get(transactionId.meta.id)
          .map(parent => spanBuilder.asChildOf(parent))
          .getOrElse(spanBuilder.ignoreActiveSpan())
          .startActive(true)
          .span()
      case head :: _ =>
        //Create a child span of current head
        spanBuilder.asChildOf(head).startActive(true).span()
    }
    //add active span to list
    spanMap.put(transactionId.meta.id, active :: spanList)
  }

  /**
   * Finish the most recently started span of the given transaction.
   *
   * @param transactionId transaction whose current span is finished
   */
  override def finishSpan(transactionId: TransactionId): Unit = {
    clear(transactionId, failed = false)
  }

  /**
   * Mark the most recently started span of the given transaction as failed and finish it.
   *
   * @param transactionId transaction whose current span is finished with an error
   */
  override def error(transactionId: TransactionId): Unit = {
    clear(transactionId, failed = true)
  }

  /**
   * Get the current trace context, which can be passed on to downstream services.
   *
   * @param transactionId transaction whose current span context is exported
   * @return the injected TEXT_MAP entries of the newest open span, or None if no span is open
   */
  override def getTraceContext(transactionId: TransactionId): Option[Map[String, String]] = {
    spanMap
      .get(transactionId.meta.id)
      .flatMap(_.headOption)
      .map { span =>
        val map = mutable.Map.empty[String, String]
        tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapInjectAdapter(map.asJava))
        map.toMap
      }
  }

  /**
   * Store the trace context received from an upstream service so that the next span started for
   * this transaction continues the upstream trace.
   *
   * @param transactionId transaction the context belongs to
   * @param context TEXT_MAP entries as produced by `getTraceContext`; None is ignored
   */
  override def setTraceContext(transactionId: TransactionId, context: Option[Map[String, String]]): Unit = {
    context.foreach { scalaMap =>
      val ctx: SpanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(scalaMap.asJava))
      // Tracer.extract may return null when the carrier holds no span context; the Caffeine
      // backed map rejects null values, so only store contexts that were actually extracted.
      Option(ctx).foreach(contextMap.put(transactionId.meta.id, _))
    }
  }

  /**
   * Finish the newest open span of the transaction and pop it from the span stack. When the last
   * open span is finished, the stack and any stored upstream context are removed as well.
   *
   * @param transactionId transaction whose newest span is finished
   * @param failed when true the span is tagged with `error=true` before it is finished
   */
  private def clear(transactionId: TransactionId, failed: Boolean): Unit = {
    val id = transactionId.meta.id
    spanMap.get(id).foreach {
      case Nil => //nothing open for this transaction
      case head :: rest =>
        if (failed) head.setTag("error", true)
        head.finish()
        if (rest.isEmpty) {
          spanMap.remove(id)
          contextMap.remove(id)
        } else {
          spanMap.put(id, rest)
        }
    }
  }

  /**
   * Build a Caffeine cache, exposed as a Scala concurrent map, whose entries expire
   * `tracingConfig.cacheExpiry` after they were last accessed.
   */
  private def configureCache[T, R](): collection.concurrent.Map[T, R] =
    Caffeine
      .newBuilder()
      .ticker(ticker)
      .expireAfterAccess(tracingConfig.cacheExpiry.toSeconds, TimeUnit.SECONDS)
      .build()
      .asMap()
      .asScala
      //the untyped builder yields an Object-keyed map; narrow it to the requested types
      .asInstanceOf[collection.concurrent.Map[T, R]]
}

/**
 * Tracing hooks invoked while a transaction is processed.
 *
 * Every method has a do-nothing default, so an implementation only overrides what it needs and an
 * empty implementation acts as a disabled tracer.
 */
trait WhiskTracer {

  /** Starts a span for `logMarker` within the given transaction. Does nothing by default. */
  def startSpan(logMarker: LogMarkerToken, transactionId: TransactionId): Unit = ()

  /** Finishes the current span of the given transaction. Does nothing by default. */
  def finishSpan(transactionId: TransactionId): Unit = ()

  /** Reports an error for the current span of the given transaction. Does nothing by default. */
  def error(transactionId: TransactionId): Unit = ()

  /** Returns the trace context to hand to downstream services. None by default. */
  def getTraceContext(transactionId: TransactionId): Option[Map[String, String]] = None

  /** Accepts a trace context received from an upstream service. Does nothing by default. */
  def setTraceContext(transactionId: TransactionId, context: Option[Map[String, String]]): Unit = ()
}

/**
 * Provides the [[WhiskTracer]] used by this process.
 *
 * The configuration is loaded from `ConfigKeys.tracing`. When a zipkin section is configured and
 * no tracer is registered with [[GlobalTracer]] yet, a Brave based tracer reporting to Zipkin is
 * registered. The resulting `tracer` is an [[OpenTracer]] if a global tracer is registered and
 * the no-op tracer otherwise.
 */
object WhiskTracerProvider {
  val tracingConfig: TracingConfig = loadConfigOrThrow[TracingConfig](ConfigKeys.tracing)

  val tracer: WhiskTracer = createTracer(tracingConfig)

  /** Registers the Zipkin tracer if configured and not yet registered, then picks the tracer. */
  private def createTracer(tracingConfig: TracingConfig): WhiskTracer = {
    tracingConfig.zipkin.foreach { zipkinConfig =>
      if (!GlobalTracer.isRegistered) {
        registerZipkinTracer(tracingConfig.component, zipkinConfig)
      }
    }

    if (GlobalTracer.isRegistered) new OpenTracer(GlobalTracer.get(), tracingConfig)
    else NoopTracer
  }

  /**
   * Builds a Brave tracer that reports spans to the configured Zipkin endpoint, registers it with
   * OpenTracing's [[GlobalTracer]] and closes the span reporter on JVM shutdown.
   */
  private def registerZipkinTracer(serviceName: String, zipkinConfig: ZipkinConfig): Unit = {
    val zipkinSender: Sender = OkHttpSender.create(zipkinConfig.generateUrl)
    val reporter = AsyncReporter.create(zipkinSender)
    val tracing = Tracing
      .newBuilder()
      .localServiceName(serviceName)
      .spanReporter(reporter)
      .sampler(Sampler.create(zipkinConfig.sampleRate.toFloat))
      .build()

    //register with OpenTracing
    GlobalTracer.register(BraveTracer.create(tracing))

    sys.addShutdownHook(reporter.close())
  }
}

// Tracer used by WhiskTracerProvider when no OpenTracing tracer is registered; it inherits the
// do-nothing default implementations of every WhiskTracer method.
private object NoopTracer extends WhiskTracer
/**
 * Tracing settings loaded from the `whisk.tracing` configuration section.
 *
 * @param component name of the component doing the tracing; used as the local service name of the tracer
 * @param cacheExpiry how long spans and span contexts are kept in the tracer caches after their last access
 * @param zipkin Zipkin connection settings; tracing via Zipkin is only set up when this is defined
 */
case class TracingConfig(component: String,
                         cacheExpiry: Duration,
                         zipkin: Option[ZipkinConfig] = None)
/**
 * Connection settings for a Zipkin server.
 *
 * @param url base URL of the Zipkin server, e.g. "http://localhost:9411"
 * @param sampleRate fraction of requests to sample, given as a decimal string such as "0.01"
 */
case class ZipkinConfig(url: String, sampleRate: String) {

  /**
   * URL of the Zipkin v2 span collection endpoint.
   *
   * Trailing slashes of the configured base URL are dropped first, so that
   * "http://host:9411/" does not turn into "http://host:9411//api/v2/spans".
   */
  def generateUrl: String = {
    val base = url.replaceAll("/+$", "")
    s"$base/api/v2/spans"
  }
}
/** Default [[Ticker]] for the tracer caches; reads the time from `System.nanoTime()`. */
object SystemTicker extends Ticker {
  override def read(): Long = System.nanoTime()
}
3 changes: 3 additions & 0 deletions common/scala/src/main/scala/whisk/core/WhiskConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ object ConfigKeys {
val dockerContainerFactory = s"${docker}.container-factory"
val runc = "whisk.runc"
val runcTimeouts = s"$runc.timeouts"

val tracing = "whisk.tracing"

val containerFactory = "whisk.container-factory"
val containerArgs = s"$containerFactory.container-args"
val containerPool = "whisk.container-pool"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ case class ActivationMessage(override val transid: TransactionId,
rootControllerIndex: ControllerInstanceId,
blocking: Boolean,
content: Option[JsObject],
cause: Option[ActivationId] = None)
cause: Option[ActivationId] = None,
traceContext: Option[Map[String, String]] = None)
extends Message {

override def serialize = ActivationMessage.serdes.write(this).compactPrint
Expand All @@ -67,7 +68,7 @@ object ActivationMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))

private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
implicit val serdes = jsonFormat9(ActivationMessage.apply)
implicit val serdes = jsonFormat10(ActivationMessage.apply)
}

/**
Expand Down
10 changes: 10 additions & 0 deletions core/controller/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,13 @@ ssl-config.enabledCipherSuites = [
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
]

whisk{
# tracing configuration
tracing {
component = "Controller"
}
}



Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
import spray.json._
import whisk.common.{Logging, LoggingMarkers, TransactionId}
import whisk.common.tracing.WhiskTracerProvider
import whisk.core.connector.ActivationMessage
import whisk.core.controller.WhiskServices
import whisk.core.database.NoDocumentException
Expand Down Expand Up @@ -146,25 +147,29 @@ protected[actions] trait PrimitiveActions {

// merge package parameters with action (action parameters supersede), then merge in payload
val args = action.parameters merge payload
val activationId = activationIdFactory.make()

val startActivation = transid.started(
this,
waitForResponse
.map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING)
.getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION),
logLevel = InfoLevel)
val startLoadbalancer =
transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${activationId}")

val message = ActivationMessage(
transid,
FullyQualifiedEntityName(action.namespace, action.name, Some(action.version)),
action.rev,
user,
activationIdFactory.make(), // activation id created here
activationId, // activation id created here
activeAckTopicIndex,
waitForResponse.isDefined,
args,
cause = cause)
cause = cause,
WhiskTracerProvider.tracer.getTraceContext(transid))

val startActivation = transid.started(
this,
waitForResponse
.map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING)
.getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION),
logLevel = InfoLevel)
val startLoadbalancer =
transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${message.activationId}")
val postedFuture = loadBalancer.publish(action, message)

postedFuture.flatMap { activeAckResponse =>
Expand Down
5 changes: 5 additions & 0 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ whisk {
pause-grace = 50 milliseconds
}
}

# tracing configuration
tracing {
component = "Invoker"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import akka.stream.ActorMaterializer
import org.apache.kafka.common.errors.RecordTooLargeException
import pureconfig._
import spray.json._
import whisk.common.tracing.WhiskTracerProvider
import whisk.common._
import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.core.connector._
Expand Down Expand Up @@ -192,6 +193,9 @@ class InvokerReactive(

implicit val transid: TransactionId = msg.transid

//set trace context to continue tracing
WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext)

if (!namespaceBlacklist.isBlacklisted(msg.user)) {
val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel)
val namespace = msg.action.path
Expand Down
Loading

0 comments on commit 63496d5

Please sign in to comment.