From 2a4cec6d32d9978bd62a6b8088cffa426807abbc Mon Sep 17 00:00:00 2001 From: Peter Chacko Date: Tue, 18 Jul 2023 17:44:04 +0000 Subject: [PATCH] Added very basic flow control to subscriber --- .../app/services/RunRecordMonitorService.java | 34 +++++++++--- .../events/StartProgramEventSubscriber.java | 53 ++++++++++++++++--- .../StartProgramEventSubscriberTest.java | 43 ++++++++++++++- .../io/cdap/cdap/common/conf/Constants.java | 2 + .../src/main/resources/cdap-default.xml | 9 ++++ 5 files changed, 126 insertions(+), 15 deletions(-) diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java index 37d67d3b1bb2..6fd3a23bfc70 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java @@ -61,6 +61,13 @@ public class RunRecordMonitorService extends AbstractScheduledService { private final int maxConcurrentRuns; private ScheduledExecutorService executor; + /** + * Tracks the program runs. + * + * @param cConf configuration + * @param runtimeService service to get info on programs + * @param metricsCollectionService collect metrics + */ @Inject public RunRecordMonitorService( CConfiguration cConf, @@ -120,9 +127,7 @@ public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception throw new Exception("None time-based UUIDs are not supported"); } - int launchingCount; - - launchingCount = addRequest(programRunId); + int launchingCount = addRequest(programRunId); int runningCount = getProgramsRunningCount(); LOG.info( @@ -132,6 +137,18 @@ public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception return new Counter(launchingCount, runningCount); } + /** + * Get imprecise (due to data races) total number of launching and running programs. 
+ * + * @return a {@link Counter} holding the launching count and the running count of program runs. + */ + public Counter getCount() { + int launchingCount = launchingQueue.size(); + int runningCount = getProgramsRunningCount(); + + return new Counter(launchingCount, runningCount); + } + /** * Add a new in-flight launch request. * @@ -240,19 +257,22 @@ private boolean isRunning(ProgramRunStatus status) { return false; } - class Counter { + /** + * Counts the concurrent program runs. + */ + public class Counter { /** * Total number of launch requests that have been accepted but still missing in metadata store + * * total number of run records with {@link ProgramRunStatus#PENDING} status + total number of - * run records with {@link ProgramRunStatus#STARTING} status + * run records with {@link ProgramRunStatus#STARTING} status. */ private final int launchingCount; /** - * Total number of run records with {@link ProgramRunStatus#RUNNING status + Total number of run + * Total number of run records with {@link ProgramRunStatus#RUNNING} status + Total number of run * records with {@link ProgramRunStatus#SUSPENDED} status + Total number of run records with - * {@link ProgramRunStatus#RESUMING} status + * {@link ProgramRunStatus#RESUMING} status.
*/ private final int runningCount; diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java index 57591d32276e..e88502095182 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java @@ -21,6 +21,7 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.internal.app.services.ProgramLifecycleService; +import io.cdap.cdap.internal.app.services.RunRecordMonitorService; import io.cdap.cdap.proto.ProgramType; import io.cdap.cdap.proto.id.ProgramReference; import io.cdap.cdap.proto.id.ProgramRunId; @@ -53,23 +54,30 @@ public class StartProgramEventSubscriber extends EventSubscriber { private final CConfiguration cConf; private final EventReaderProvider extensionProvider; private final ProgramLifecycleService lifecycleService; + private final RunRecordMonitorService runRecordMonitorService; private ScheduledExecutorService executor; private Collection> readers; private ExecutorService threadPoolExecutor; + private int maxConcurrentRuns; /** * Create instance that handles StartProgramEvents. 
* - * @param cConf CDAP configuration - * @param extensionProvider eventReaderProvider for StartProgramEvent Readers - * @param lifecycleService to publish start programs to TMS + * @param cConf CDAP configuration + * @param extensionProvider eventReaderProvider for StartProgramEvent Readers + * @param lifecycleService to publish start programs to TMS + * @param runRecordMonitorService basic flow-control */ @Inject - StartProgramEventSubscriber(CConfiguration cConf, EventReaderProvider extensionProvider, - ProgramLifecycleService lifecycleService) { + StartProgramEventSubscriber(CConfiguration cConf, + EventReaderProvider extensionProvider, + ProgramLifecycleService lifecycleService, + RunRecordMonitorService runRecordMonitorService) { this.cConf = cConf; this.extensionProvider = extensionProvider; this.lifecycleService = lifecycleService; + this.runRecordMonitorService = runRecordMonitorService; + maxConcurrentRuns = -1; } @Override @@ -93,6 +101,7 @@ public void initialize() { threadPoolExecutor = new ThreadPoolExecutor(readers.size(), readers.size(), 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); } + maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS); } @Override @@ -123,12 +132,44 @@ protected void runOneIteration() throws Exception { if (threadPoolExecutor != null) { for (EventReader reader : readers) { threadPoolExecutor.execute(() -> { - processEvents(reader); + if (runRecordMonitorService.isRunning()) { + // Only attempt to process event if there is no max or the current count is less than max + if (hasNominalCapacity()) { + processEvents(reader); + } + } else { + LOG.warn("RunRecordMonitorService not yet running, currently in state: {}." + + " Status will be checked again in next attempt.", runRecordMonitorService.state()); + } }); } } } + /** + * Check if service has capacity (imprecise due to data races). 
+ * + * @return true if there is capacity to pull new events + */ + @VisibleForTesting + boolean hasNominalCapacity() { + RunRecordMonitorService.Counter counter = runRecordMonitorService.getCount(); + // no limit + if (maxConcurrentRuns <= 0) { + return true; + } + // allowed configurable range + double minimumFreeCapacity = cConf.getDouble(Constants.Event.MINIMUM_FREE_CAPACITY_BEFORE_PULL); + double usedCapacity = (double) (counter.getLaunchingCount() + counter.getRunningCount() + + cConf.getInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE)) / maxConcurrentRuns; + boolean allowPull = (1 - usedCapacity) >= minimumFreeCapacity; + if (allowPull) { + return true; + } + LOG.trace("Not enough free capacity. {} < {} (minimum)", 1 - usedCapacity, minimumFreeCapacity); + return false; + } + @VisibleForTesting void processEvents(EventReader reader) { EventResult result = reader.pull(cConf.getInt( diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java index b3070946eb37..83f59b7e16f6 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java @@ -27,6 +27,7 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.internal.app.services.ProgramLifecycleService; +import io.cdap.cdap.internal.app.services.RunRecordMonitorService; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; import io.cdap.cdap.internal.events.dummy.DummyEventReader; import io.cdap.cdap.internal.events.dummy.DummyEventReaderExtensionProvider; @@ -48,6 +49,8 @@ public class StartProgramEventSubscriberTest extends AppFabricTestBase { private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventSubscriberTest.class); private 
ProgramLifecycleService lifecycleService; + private RunRecordMonitorService runRecordMonitorService; + private RunRecordMonitorService.Counter mockCounter; private CConfiguration cConf; private DummyEventReader eventReader; private Injector injector; @@ -56,12 +59,16 @@ public class StartProgramEventSubscriberTest extends AppFabricTestBase { @Before public void setup() { lifecycleService = Mockito.mock(ProgramLifecycleService.class); - cConf = Mockito.mock(CConfiguration.class); + runRecordMonitorService = Mockito.mock(RunRecordMonitorService.class); + mockCounter = Mockito.mock(RunRecordMonitorService.Counter.class); + Mockito.doReturn(mockCounter).when(runRecordMonitorService).getCount(); + cConf = CConfiguration.create(); eventReader = new DummyEventReader<>(mockedEvents()); injector = Guice.createInjector(new AbstractModule() { @Override protected void configure() { bind(ProgramLifecycleService.class).toInstance(lifecycleService); + bind(RunRecordMonitorService.class).toInstance(runRecordMonitorService); bind(CConfiguration.class).toInstance(cConf); bind(new TypeLiteral>() { }) @@ -70,7 +77,7 @@ protected void configure() { } }); eventHandler = injector.getInstance(StartProgramEventSubscriber.class); - Mockito.doReturn(1).when(cConf).getInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE); + cConf.setInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE, 1); } @Test @@ -89,6 +96,38 @@ public void testMessageWorkflow() throws Exception { Mockito.verify(lifecycleService).run((ProgramReference) any(), any(), Mockito.anyBoolean()); } + @Test + public void testHasNominalCapacity_lackOfCapacity() { + Mockito.doReturn(0).when(mockCounter).getRunningCount(); + Mockito.doReturn(0).when(mockCounter).getLaunchingCount(); + + // cConf.setDouble(Constants.Event.MINIMUM_FREE_CAPACITY_BEFORE_PULL, 0.1); + cConf.setInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE, 1); + cConf.setInt(Constants.AppFabric.MAX_CONCURRENT_RUNS, 1); + + eventHandler.initialize(); + // 0.0 free 
capacity < 0.1 default minimum. + assert !eventHandler.hasNominalCapacity(); + Mockito.verify(mockCounter).getLaunchingCount(); + Mockito.verify(mockCounter).getRunningCount(); + } + + @Test + public void testHasNominalCapacity_sufficientCapacity() { + Mockito.doReturn(0).when(mockCounter).getRunningCount(); + Mockito.doReturn(0).when(mockCounter).getLaunchingCount(); + + //cConf.setDouble(Constants.Event.MINIMUM_FREE_CAPACITY_BEFORE_PULL, 0.1); + cConf.setInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE, 1); + cConf.setInt(Constants.AppFabric.MAX_CONCURRENT_RUNS, 2); + + eventHandler.initialize(); + // 0.5 free capacity > 0.1 default minimum. + assert eventHandler.hasNominalCapacity(); + Mockito.verify(mockCounter).getLaunchingCount(); + Mockito.verify(mockCounter).getRunningCount(); + } + private Collection mockedEvents() { ArrayList eventList = new ArrayList<>(); eventList.add(new StartProgramEvent(1, "1", diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 4f20a52f3fa6..028d8dea492a 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -2230,6 +2230,8 @@ public static final class Event { public static final String START_EVENTS_READER_EXTENSIONS_ENABLED_LIST = "events.reader.extensions.start.enabled.list"; public static final String START_EVENT_PREFIX = "event.reader.start"; + public static final String MINIMUM_FREE_CAPACITY_BEFORE_PULL = + "event.readers.capacity.before.pull"; } /** diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 3dc9401f7061..83f1bcb9301f 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -5087,6 +5087,15 @@ + + event.readers.capacity.before.pull + 0.1 + + Fraction of capacity that must be free before event 
readers pull and launch more program runs. The fetch size of each pull is counted as used capacity when this fraction is evaluated. + + + event.reader.start.poll.delay.seconds 10