diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 488c94f49de..02e94531b96 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -64,6 +64,14 @@ dependencies { compile 'io.kamon:kamon-statsd_2.11:0.6.7' //for mesos compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.7' + + //tracing support + compile 'io.opentracing:opentracing-api:0.31.0' + compile 'io.opentracing:opentracing-util:0.31.0' + compile 'io.opentracing.brave:brave-opentracing:0.31.0' + compile 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.6.1' + compile 'io.zipkin.reporter2:zipkin-reporter:2.6.1' + scoverage gradle.scoverage.deps } diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 3ceaab8b77f..bbac5f4b646 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -188,4 +188,18 @@ whisk { constraint-delimiter = " "//used to parse constraint strings teardown-on-exit = true //set to true to disable the mesos framework on system exit; set for false for HA deployments } + + # tracing configuration + tracing { + cache-expiry = 30 seconds #how long to keep spans in cache. Set to appropriate value to trace long running requests + #Zipkin configuration. Uncomment following to enable zipkin based tracing + #zipkin { + # url = "http://localhost:9411" //url to connecto to zipkin server + //sample-rate to decide a request is sampled or not. + //sample-rate 0.5 eqauls to sampling 50% of the requests + //sample-rate of 1 means 100% sampling. + //sample-rate of 0 means no sampling + # sample-rate = "0.01" // sample 1% of requests by default + #} + } } diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala b/common/scala/src/main/scala/whisk/common/TransactionId.scala index cb16a42fd80..6441629f44e 100644 --- a/common/scala/src/main/scala/whisk/common/TransactionId.scala +++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala @@ -24,6 +24,8 @@ import akka.http.scaladsl.model.headers.RawHeader import pureconfig.loadConfigOrThrow import spray.json._ import whisk.core.ConfigKeys +import pureconfig._ +import whisk.common.tracing.WhiskTracerProvider import scala.util.Try @@ -82,6 +84,9 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal { } MetricEmitter.emitCounterMetric(marker) + + //tracing support + WhiskTracerProvider.tracer.startSpan(marker, this) StartMarker(Instant.now, marker) } @@ -116,6 +121,9 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal { } MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd) + + //tracing support + WhiskTracerProvider.tracer.finishSpan(this) } /** @@ -144,6 +152,9 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal { MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd) MetricEmitter.emitCounterMetric(endMarker) + + //tracing support + WhiskTracerProvider.tracer.error(this) } /** diff --git a/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala b/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala new file mode 100644 index 00000000000..09c105a5679 --- /dev/null +++ b/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.common.tracing + +import java.util.concurrent.TimeUnit + +import brave.Tracing +import brave.opentracing.BraveTracer +import brave.sampler.Sampler +import com.github.benmanes.caffeine.cache.{Caffeine, Ticker} +import io.opentracing.propagation.{Format, TextMapExtractAdapter, TextMapInjectAdapter} +import io.opentracing.util.GlobalTracer +import io.opentracing.{Span, SpanContext, Tracer} +import pureconfig._ +import whisk.common.{LogMarkerToken, TransactionId} +import whisk.core.ConfigKeys +import zipkin2.reporter.okhttp3.OkHttpSender +import zipkin2.reporter.{AsyncReporter, Sender} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.duration.Duration + +/** + * OpenTracing based implementation for tracing + */ +class OpenTracer(val tracer: Tracer, tracingConfig: TracingConfig, ticker: Ticker = SystemTicker) extends WhiskTracer { + val spanMap = configureCache[String, List[Span]]() + val contextMap = configureCache[String, SpanContext]() + + /** + * Start a Trace for given service. + * + * @param transactionId transactionId to which this Trace belongs. + * @return TracedRequest which provides details about current service being traced. + */ + override def startSpan(logMarker: LogMarkerToken, transactionId: TransactionId): Unit = { + //initialize list for this transactionId + val spanList = spanMap.getOrElse(transactionId.meta.id, Nil) + + val spanBuilder = tracer + .buildSpan(logMarker.action) + .withTag("transactionId", transactionId.meta.id) + + val active = spanList match { + case Nil => + //Check if any active context then resume from that else create a fresh span + contextMap + .get(transactionId.meta.id) + .map(spanBuilder.asChildOf) + .getOrElse(spanBuilder.ignoreActiveSpan()) + .startActive(true) + .span() + case head :: _ => + //Create a child span of current head + spanBuilder.asChildOf(head).startActive(true).span() + } + //add active span to list + spanMap.put(transactionId.meta.id, active :: spanList) + } + + /** + * Finish a Trace associated with given transactionId. + * + * @param transactionId + */ + override def finishSpan(transactionId: TransactionId): Unit = { + clear(transactionId) + } + + /** + * Register error + * + * @param transactionId + */ + override def error(transactionId: TransactionId): Unit = { + clear(transactionId) + } + + /** + * Get the current TraceContext which can be used for downstream services + * + * @param transactionId + * @return + */ + override def getTraceContext(transactionId: TransactionId): Option[Map[String, String]] = { + spanMap + .get(transactionId.meta.id) + .flatMap(_.headOption) + .map { span => + val map = mutable.Map.empty[String, String] + tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapInjectAdapter(map.asJava)) + map.toMap + } + } + + /** + * Get the current TraceContext which can be used for downstream services + * + * @param transactionId + * @return + */ + override def setTraceContext(transactionId: TransactionId, context: Option[Map[String, String]]) = { + context.foreach { scalaMap => + val ctx: SpanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(scalaMap.asJava)) + contextMap.put(transactionId.meta.id, ctx) + } + } + + private def clear(transactionId: TransactionId): Unit = { + spanMap.get(transactionId.meta.id).foreach { + case head :: Nil => + head.finish() + spanMap.remove(transactionId.meta.id) + contextMap.remove(transactionId.meta.id) + case head :: tail => + head.finish() + spanMap.put(transactionId.meta.id, tail) + case Nil => + } + } + + private def configureCache[T, R](): collection.concurrent.Map[T, R] = + Caffeine + .newBuilder() + .ticker(ticker) + .expireAfterAccess(tracingConfig.cacheExpiry.toSeconds, TimeUnit.SECONDS) + .build() + .asMap() + .asScala + .asInstanceOf[collection.concurrent.Map[T, R]] +} + +trait WhiskTracer { + def startSpan(logMarker: LogMarkerToken, transactionId: TransactionId): Unit = {} + def finishSpan(transactionId: TransactionId): Unit = {} + def error(transactionId: TransactionId): Unit = {} + def getTraceContext(transactionId: TransactionId): Option[Map[String, String]] = None + def setTraceContext(transactionId: TransactionId, context: Option[Map[String, String]]): Unit = {} +} + +object WhiskTracerProvider { + val tracingConfig = loadConfigOrThrow[TracingConfig](ConfigKeys.tracing) + + val tracer: WhiskTracer = createTracer(tracingConfig) + + private def createTracer(tracingConfig: TracingConfig): WhiskTracer = { + + tracingConfig.zipkin match { + case Some(zipkinConfig) => { + if (!GlobalTracer.isRegistered) { + val sender: Sender = OkHttpSender.create(zipkinConfig.generateUrl) + val spanReporter = AsyncReporter.create(sender) + val braveTracing = Tracing + .newBuilder() + .localServiceName(tracingConfig.component) + .spanReporter(spanReporter) + .sampler(Sampler.create(zipkinConfig.sampleRate.toFloat)) + .build() + + //register with OpenTracing + GlobalTracer.register(BraveTracer.create(braveTracing)) + + sys.addShutdownHook({ spanReporter.close() }) + } + } + case None => + } + + if (GlobalTracer.isRegistered) + new OpenTracer(GlobalTracer.get(), tracingConfig) + else + NoopTracer + } +} + +private object NoopTracer extends WhiskTracer +case class TracingConfig(component: String, cacheExpiry: Duration, zipkin: Option[ZipkinConfig] = None) +case class ZipkinConfig(url: String, sampleRate: String) { + def generateUrl = s"$url/api/v2/spans" +} +object SystemTicker extends Ticker { + override def read() = { + System.nanoTime() + } +} diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index fb20fda65be..be9f7b0b0a9 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -222,6 +222,9 @@ object ConfigKeys { val dockerContainerFactory = s"${docker}.container-factory" val runc = "whisk.runc" val runcTimeouts = s"$runc.timeouts" + + val tracing = "whisk.tracing" + val containerFactory = "whisk.container-factory" val containerArgs = s"$containerFactory.container-args" val containerPool = "whisk.container-pool" diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala index eb960b3ca56..d540a54abde 100644 --- a/common/scala/src/main/scala/whisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala @@ -49,7 +49,8 @@ case class ActivationMessage(override val transid: TransactionId, rootControllerIndex: ControllerInstanceId, blocking: Boolean, content: Option[JsObject], - cause: Option[ActivationId] = None) + cause: Option[ActivationId] = None, + traceContext: Option[Map[String, String]] = None) extends Message { override def serialize = ActivationMessage.serdes.write(this).compactPrint @@ -67,7 +68,7 @@ object ActivationMessage extends DefaultJsonProtocol { def parse(msg: String) = Try(serdes.read(msg.parseJson)) private implicit val fqnSerdes = FullyQualifiedEntityName.serdes - implicit val serdes = jsonFormat9(ActivationMessage.apply) + implicit val serdes = jsonFormat10(ActivationMessage.apply) } /** diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf index a2886368e3f..77ce527c6d1 100644 --- a/core/controller/src/main/resources/application.conf +++ b/core/controller/src/main/resources/application.conf @@ -78,3 +78,13 @@ ssl-config.enabledCipherSuites = [ "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", ] + +whisk{ + # tracing configuration + tracing { + component = "Controller" + } +} + + + diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala index 79da2d52369..49d84c27821 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala @@ -23,6 +23,7 @@ import akka.actor.ActorSystem import akka.event.Logging.InfoLevel import spray.json._ import whisk.common.{Logging, LoggingMarkers, TransactionId} +import whisk.common.tracing.WhiskTracerProvider import whisk.core.connector.ActivationMessage import whisk.core.controller.WhiskServices import whisk.core.database.NoDocumentException @@ -146,25 +147,29 @@ protected[actions] trait PrimitiveActions { // merge package parameters with action (action parameters supersede), then merge in payload val args = action.parameters merge payload + val activationId = activationIdFactory.make() + + val startActivation = transid.started( + this, + waitForResponse + .map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING) + .getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION), + logLevel = InfoLevel) + val startLoadbalancer = + transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${activationId}") + val message = ActivationMessage( transid, FullyQualifiedEntityName(action.namespace, action.name, Some(action.version)), action.rev, user, - activationIdFactory.make(), // activation id created here + activationId, // activation id created here activeAckTopicIndex, waitForResponse.isDefined, args, - cause = cause) + cause = cause, + WhiskTracerProvider.tracer.getTraceContext(transid)) - val startActivation = transid.started( - this, - waitForResponse - .map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING) - .getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION), - logLevel = InfoLevel) - val startLoadbalancer = - transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${message.activationId}") val postedFuture = loadBalancer.publish(action, message) postedFuture.flatMap { activeAckResponse => diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index ebd45e58387..00c33399aba 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -66,4 +66,9 @@ whisk { pause-grace = 50 milliseconds } } + + # tracing configuration + tracing { + component = "Invoker" + } } diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index ae4a1ae0816..87ca1e8018e 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -26,6 +26,7 @@ import akka.stream.ActorMaterializer import org.apache.kafka.common.errors.RecordTooLargeException import pureconfig._ import spray.json._ +import whisk.common.tracing.WhiskTracerProvider import whisk.common._ import whisk.core.{ConfigKeys, WhiskConfig} import whisk.core.connector._ @@ -192,6 +193,9 @@ class InvokerReactive( implicit val transid: TransactionId = msg.transid + //set trace context to continue tracing + WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext) + if (!namespaceBlacklist.isBlacklisted(msg.user)) { val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) val namespace = msg.action.path diff --git a/tests/build.gradle b/tests/build.gradle index cf737edbf9f..919d27e4b8c 100644 --- a/tests/build.gradle +++ b/tests/build.gradle @@ -145,12 +145,15 @@ dependencies { compile 'com.typesafe.akka:akka-http-testkit_2.11:10.1.1' compile 'com.github.java-json-tools:json-schema-validator:2.2.8' compile "org.mockito:mockito-core:2.15.0" + compile 'io.opentracing:opentracing-mock:0.31.0' compile project(':common:scala') compile project(':core:controller') compile project(':core:invoker') compile project(':tools:admin') + + scoverage gradle.scoverage.deps } diff --git a/tests/src/test/scala/common/WskTracingTests.scala b/tests/src/test/scala/common/WskTracingTests.scala new file mode 100644 index 00000000000..516f8901881 --- /dev/null +++ b/tests/src/test/scala/common/WskTracingTests.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import io.opentracing.Span +import io.opentracing.mock.{MockSpan, MockTracer} +import com.github.benmanes.caffeine.cache.Ticker +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import pureconfig.loadConfigOrThrow +import whisk.common.{LoggingMarkers, TransactionId} +import whisk.common.tracing.{OpenTracer, TracingConfig} +import whisk.core.ConfigKeys + +import scala.ref.WeakReference +import org.scalatest.FlatSpec +import org.scalatest.Matchers + +@RunWith(classOf[JUnitRunner]) +class WskTracingTests extends FlatSpec with TestHelpers with Matchers { + + val tracer: MockTracer = new MockTracer() + val tracingConfig = loadConfigOrThrow[TracingConfig](ConfigKeys.tracing) + val ticker = new FakeTicker(System.nanoTime()) + val openTracer = new OpenTracer(tracer, tracingConfig, ticker) + + it should "create span and context and invalidate cache after expiry" in { + tracer.reset + + val transactionId: TransactionId = TransactionId.testing + var list: List[WeakReference[Span]] = List() + + openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId) + var ctx = openTracer.getTraceContext(transactionId) + openTracer.setTraceContext(transactionId, ctx) + ctx should be(defined) + + //advance ticker + ticker.time = System.nanoTime() + (tracingConfig.cacheExpiry.toNanos + 100) + ctx = openTracer.getTraceContext(transactionId) + ctx should not be (defined) + openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId) + openTracer.finishSpan(transactionId) + val finishedSpans = tracer.finishedSpans() + finishedSpans should have size 1 + //no parent for new span as cache expiry cleared spanMap and contextMap + finishedSpans.get(0).parentId() should be(0) + } + + it should "create a finished span" in { + tracer.reset + val transactionId: TransactionId = TransactionId.testing + openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId) + openTracer.finishSpan(transactionId) + val finishedSpans = tracer.finishedSpans() + finishedSpans should have size 1 + + } + + it should "create a child span" in { + tracer.reset + val transactionId: TransactionId = TransactionId.testing + openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId) + openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId) + openTracer.finishSpan(transactionId) + openTracer.finishSpan(transactionId) + val finishedSpans = tracer.finishedSpans() + finishedSpans should have size 2 + val parent: MockSpan = finishedSpans.get(1) + val child: MockSpan = finishedSpans.get(0) + child.parentId should be(parent.context().spanId) + + } + + it should "create a span with tag" in { + tracer.reset + val transactionId: TransactionId = TransactionId.testing + openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId) + openTracer.finishSpan(transactionId) + val finishedSpans = tracer.finishedSpans() + finishedSpans should have size 1 + val mockSpan: MockSpan = finishedSpans.get(0) + mockSpan.tags should not be null + mockSpan.tags should have size 1 + + } + + it should "create a valid trace context and use it" in { + tracer.reset + val transactionId: TransactionId = TransactionId.testing + openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId) + val context = openTracer.getTraceContext(transactionId) + openTracer.finishSpan(transactionId) + tracer.reset + //use context for new span + openTracer.setTraceContext(transactionId, context) + openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId) + openTracer.finishSpan(transactionId) + val finishedSpans = tracer.finishedSpans() + finishedSpans should have size 1 + val child: MockSpan = finishedSpans.get(0) + //This child span should have a parent as we have set trace context + child.parentId should be > 0L + } +} + +class FakeTicker(var time: Long) extends Ticker { + override def read() = { + time + } +}