Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC =act Make use of externalSubmit to yield. #666

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ object ActorSystemSpec {
override protected[pekko] def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = {
val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint)
hasSystemMessageHint: Boolean,
needYield: Boolean): Boolean = {
val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint, needYield)
doneIt.switchOn {
TestKit.awaitCond(mbox.actor.actor != null, 1.second)
mbox.actor.actor match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec }
import pekko.util.JavaVersion

object ForkJoinPoolStarvationSpec {
val config = ConfigFactory.parseString("""
Expand Down Expand Up @@ -63,8 +62,8 @@ class ForkJoinPoolStarvationSpec extends PekkoSpec(ForkJoinPoolStarvationSpec.co

"not starve tasks arriving from external dispatchers under high internal traffic" in {
// TODO issue #31117: starvation with JDK 17 FJP
if (JavaVersion.majorVersion >= 17)
pending
// if (JavaVersion.majorVersion >= 17)
// pending
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested it locally.


// Two busy actors that will occupy the threads of the dispatcher
// Since they submit to the local task queue via fork, they can starve external submissions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ private[pekko] trait LoadMetrics { self: Executor =>
def atFullThrottle(): Boolean
}

/**
* INTERNAL API
*/
private[pekko] trait LazyExecuteSupport {
self: Executor =>
def lazyExecute(runnable: Runnable): Unit = self.execute(runnable)
}

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -160,7 +168,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
*/
final def attach(actor: ActorCell): Unit = {
register(actor)
registerForExecution(actor.mailbox, false, true)
registerForExecution(actor.mailbox, false, true, false)
}

/**
Expand Down Expand Up @@ -288,7 +296,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
protected[pekko] def resume(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume())
registerForExecution(mbox, false, false)
registerForExecution(mbox, false, false, false)
}

/**
Expand All @@ -313,7 +321,8 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
protected[pekko] def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean
hasSystemMessageHint: Boolean,
needYield: Boolean): Boolean

// TODO check whether this should not actually be a property of the mailbox
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[pekko] class BalancingDispatcher(

override protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue.enqueue(receiver.self, invocation)
if (!registerForExecution(receiver.mailbox, false, false)) teamWork()
if (!registerForExecution(receiver.mailbox, false, false, false)) teamWork()
}

protected def teamWork(): Unit =
Expand All @@ -118,7 +118,7 @@ private[pekko] class BalancingDispatcher(
case lm: LoadMetrics => !lm.atFullThrottle()
case _ => true
})
&& !registerForExecution(i.next.mailbox, false, false))
&& !registerForExecution(i.next.mailbox, false, false, true))
scheduleOne(i)

scheduleOne()
Expand Down
23 changes: 15 additions & 8 deletions actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ package org.apache.pekko.dispatch

import java.util.concurrent.{ ExecutorService, RejectedExecutionException }
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater

import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import scala.annotation.nowarn

import org.apache.pekko
import pekko.actor.ActorCell
import pekko.dispatch.sysmsg.SystemMessage
Expand Down Expand Up @@ -71,7 +68,7 @@ class Dispatcher(
protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
registerForExecution(mbox, true, false, false)
}

/**
Expand All @@ -80,7 +77,7 @@ class Dispatcher(
protected[pekko] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = {
val mbox = receiver.mailbox
mbox.systemEnqueue(receiver.self, invocation)
registerForExecution(mbox, false, true)
registerForExecution(mbox, false, true, false)
}

/**
Expand Down Expand Up @@ -130,16 +127,17 @@ class Dispatcher(
protected[pekko] override def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = {
hasSystemMessageHint: Boolean,
needYield: Boolean): Boolean = {
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { // This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService.execute(mbox)
submit(mbox, needYield)
true
} catch {
case _: RejectedExecutionException =>
try {
executorService.execute(mbox)
submit(mbox, needYield)
true
} catch { // Retry once
case e: RejectedExecutionException =>
Expand All @@ -152,6 +150,15 @@ class Dispatcher(
} else false
}

@inline
private def submit(mbox: Mailbox, needYield: Boolean): Unit = {
if (needYield && executorService.executor.isInstanceOf[LazyExecuteSupport]) {
executorService.executor.asInstanceOf[LazyExecuteSupport].lazyExecute(mbox)
} else {
executorService.execute(mbox)
}
}

override val toString: String = Logging.simpleName(this) + "[" + id + "]"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
package org.apache.pekko.dispatch

import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory }

import com.typesafe.config.Config

import java.lang.invoke.{ MethodHandle, MethodHandles, MethodType }

object ForkJoinExecutorConfigurator {

/**
Expand All @@ -28,7 +29,7 @@ object ForkJoinExecutorConfigurator {
unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
asyncMode: Boolean)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode)
with LoadMetrics {
with LoadMetrics with LazyExecuteSupport {
def this(
parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
Expand All @@ -42,6 +43,33 @@ object ForkJoinExecutorConfigurator {
else
throw new NullPointerException("Runnable was null")

private val lazyExecuteHandle: MethodHandle = {
import org.apache.pekko.util.JavaVersion._
if (11 <= majorVersion && majorVersion <= 18) {
val method = classOf[ForkJoinPool].getDeclaredMethod("externalPush", classOf[ForkJoinTask[_]])
method.setAccessible(true)
MethodHandles.lookup().unreflect(method)
} else if (majorVersion >= 20) {
val mt = MethodType.methodType(classOf[ForkJoinTask[_]], classOf[ForkJoinTask[_]])
MethodHandles.publicLookup()
.findVirtual(classOf[ForkJoinPool], "externalSubmit", mt)
} else null
}

override def lazyExecute(r: Runnable): Unit = {
import org.apache.pekko.util.JavaVersion._
if (majorVersion < 11 || majorVersion == 19 || lazyExecuteHandle == null) {
super.execute(r)
} else {
val task: ForkJoinTask[_] = if (r ne null)
if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[_]] else new PekkoForkJoinTask(r)
else
throw new NullPointerException("Runnable was null")
lazyExecuteHandle.invoke(this, task)
}

}

def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue)
}
} finally {
setAsIdle() // Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
dispatcher.registerForExecution(this, false, false, true)
}
}

Expand Down
2 changes: 2 additions & 0 deletions project/JdkOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ object JdkOptions extends AutoPlugin {
if (isJdk17orHigher) {
// for aeron
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" ::
// for fork join pool
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" ::
// for LevelDB
"--add-opens=java.base/java.nio=ALL-UNNAMED" :: Nil
} else Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) exte
protected[pekko] override def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = false
hasSystemMessageHint: Boolean,
needYield: Boolean): Boolean = false

protected[pekko] override def shutdownTimeout = 1 second

Expand Down
Loading