Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix downstream kamon instrumentation #1489

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.typesafe.config.Config

final case class Envelope private (message: Any, sender: ActorRef) {

@noinline // not inlined to permit downstream bytecode instrumentation to attach context information to the Envelope
def copy(message: Any = message, sender: ActorRef = sender) = {
Envelope(message, sender)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ final case class ThreadPoolConfig(
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy)
extends ExecutorServiceFactoryProvider {
// Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra
// context information on the config
@noinline
def copy(
allowCorePoolTimeout: Boolean = allowCorePoolTimeout,
corePoolSize: Int = corePoolSize,
maxPoolSize: Int = maxPoolSize,
threadTimeout: Duration = threadTimeout,
queueFactory: ThreadPoolConfig.QueueFactory = queueFactory,
rejectionPolicy: RejectedExecutionHandler = rejectionPolicy
): ThreadPoolConfig =
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent a little bit of time on this yesterday and I got it wrong - I thought that this would only work if it was declared with override because in theory this function overrides the default copy function that gets added to case classes.

I still think it is quite odd to instrument the copy function as a way to instrument pinned dispatchers. I raised kamon-io/Kamon#1366 about seeing if there is a better place to instrument this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this one left a slightly bad taste in my mouth and there probably is a better way. Ultimately I'd rather focus improvements on exposing deliberate instrumentation hooks from within pekko and changing the downstream to use them, because otherwise there's an inherent fragility... Any weaving approach can only work if there's a stable non-inlined API that's deliberately left for that purpose -- there's almost certainly places where we're currently only passing because of the whims of the scala compiler. However, such an approach will require considerably more time and thought. Since the Kamon library is probably one of the more widely used ways of instrumenting pekko, I think this is helpful as a stopgap that doesn't block people from adopting 1.1.x; but I completely understand your point of view here


class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = {
val service: ThreadPoolExecutor = new ThreadPoolExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ final class MessageBuffer private (private var _head: MessageBuffer.Node, privat

object MessageBuffer {
private final class Node(var next: Node, val message: Any, val ref: ActorRef) {
@noinline // not inlined to permit downstream bytecode instrumentation to apply context information on the Node to the message
def apply(f: (Any, ActorRef) => Unit): Unit = {
f(message, ref)
}
Expand Down
Loading