Skip to content

Commit

Permalink
Merge pull request #15241 from cdapio/feature/event-reader-flow-control
Browse files Browse the repository at this point in the history
Added very basic flow control to subscriber
  • Loading branch information
codeNinjaDev authored Aug 2, 2023
2 parents 3a07df7 + 2a4cec6 commit 9d3ff77
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public class RunRecordMonitorService extends AbstractScheduledService {
private final int maxConcurrentRuns;
private ScheduledExecutorService executor;

/**
* Tracks the program runs.
*
* @param cConf configuration
* @param runtimeService service to get info on programs
* @param metricsCollectionService collect metrics
*/
@Inject
public RunRecordMonitorService(
CConfiguration cConf,
Expand Down Expand Up @@ -120,9 +127,7 @@ public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception
throw new Exception("None time-based UUIDs are not supported");
}

int launchingCount;

launchingCount = addRequest(programRunId);
int launchingCount = addRequest(programRunId);
int runningCount = getProgramsRunningCount();

LOG.info(
Expand All @@ -132,6 +137,18 @@ public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception
return new Counter(launchingCount, runningCount);
}

/**
 * Returns a snapshot of the launching and running program run counts.
 *
 * <p>The launching count is the current size of the launching queue and the running count comes
 * from {@code getProgramsRunningCount()}. The two values are read one after the other, so the
 * result is imprecise (due to data races).
 *
 * @return a {@link Counter} holding the launching and running program run counts.
 */
public Counter getCount() {
  // Arguments are evaluated left to right: queue size first, then the running count.
  return new Counter(launchingQueue.size(), getProgramsRunningCount());
}

/**
* Add a new in-flight launch request.
*
Expand Down Expand Up @@ -240,19 +257,22 @@ private boolean isRunning(ProgramRunStatus status) {
return false;
}

class Counter {
/**
* Counts the concurrent program runs.
*/
public class Counter {

/**
* Total number of launch requests that have been accepted but still missing in metadata store +
* * total number of run records with {@link ProgramRunStatus#PENDING} status + total number of
* run records with {@link ProgramRunStatus#STARTING} status
* run records with {@link ProgramRunStatus#STARTING} status.
*/
private final int launchingCount;

/**
* Total number of run records with {@link ProgramRunStatus#RUNNING status + Total number of run
* Total number of run records with {@link ProgramRunStatus#RUNNING} status + Total number of run
* records with {@link ProgramRunStatus#SUSPENDED} status + Total number of run records with
* {@link ProgramRunStatus#RESUMING} status
* {@link ProgramRunStatus#RESUMING} status.
*/
private final int runningCount;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramReference;
import io.cdap.cdap.proto.id.ProgramRunId;
Expand Down Expand Up @@ -53,23 +54,30 @@ public class StartProgramEventSubscriber extends EventSubscriber {
private final CConfiguration cConf;
private final EventReaderProvider<StartProgramEvent> extensionProvider;
private final ProgramLifecycleService lifecycleService;
private final RunRecordMonitorService runRecordMonitorService;
private ScheduledExecutorService executor;
private Collection<EventReader<StartProgramEvent>> readers;
private ExecutorService threadPoolExecutor;
private int maxConcurrentRuns;

/**
* Create instance that handles StartProgramEvents.
*
* @param cConf CDAP configuration
* @param extensionProvider eventReaderProvider for StartProgramEvent Readers
* @param lifecycleService to publish start programs to TMS
* @param cConf CDAP configuration
* @param extensionProvider eventReaderProvider for StartProgramEvent Readers
* @param lifecycleService to publish start programs to TMS
* @param runRecordMonitorService basic flow-control
*/
@Inject
StartProgramEventSubscriber(CConfiguration cConf, EventReaderProvider<StartProgramEvent> extensionProvider,
ProgramLifecycleService lifecycleService) {
StartProgramEventSubscriber(CConfiguration cConf,
EventReaderProvider<StartProgramEvent> extensionProvider,
ProgramLifecycleService lifecycleService,
RunRecordMonitorService runRecordMonitorService) {
this.cConf = cConf;
this.extensionProvider = extensionProvider;
this.lifecycleService = lifecycleService;
this.runRecordMonitorService = runRecordMonitorService;
maxConcurrentRuns = -1;
}

@Override
Expand All @@ -93,6 +101,7 @@ public void initialize() {
threadPoolExecutor = new ThreadPoolExecutor(readers.size(), readers.size(), 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}
maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS);
}

@Override
Expand Down Expand Up @@ -123,12 +132,44 @@ protected void runOneIteration() throws Exception {
if (threadPoolExecutor != null) {
for (EventReader<StartProgramEvent> reader : readers) {
threadPoolExecutor.execute(() -> {
processEvents(reader);
if (runRecordMonitorService.isRunning()) {
// Only attempt to process event if there is no max or the current count is less than max
if (hasNominalCapacity()) {
processEvents(reader);
}
} else {
LOG.warn("RunRecordMonitorService not yet running, currently in state: {}."
+ " Status will be checked again in next attempt.", runRecordMonitorService.state());
}
});
}
}
}

/**
 * Check if service has capacity to pull new events (imprecise due to data races).
 *
 * <p>When {@code maxConcurrentRuns} is not positive, no limit applies and this method returns
 * {@code true} without looking up the current run counts. Otherwise the used capacity is
 * computed as {@code (launching + running + fetch size) / maxConcurrentRuns}, and pulling is
 * allowed only when the remaining free fraction is at least
 * {@link Constants.Event#MINIMUM_FREE_CAPACITY_BEFORE_PULL}.
 *
 * @return true if there is capacity to pull new events
 */
@VisibleForTesting
boolean hasNominalCapacity() {
  // No limit configured: skip the count lookup entirely, its result would be unused.
  if (maxConcurrentRuns <= 0) {
    return true;
  }

  RunRecordMonitorService.Counter counter = runRecordMonitorService.getCount();
  // Allowed configurable range.
  double minimumFreeCapacity = cConf.getDouble(Constants.Event.MINIMUM_FREE_CAPACITY_BEFORE_PULL);
  // The events about to be fetched are counted as already used, so a pull is only allowed
  // when the whole batch still leaves the required fraction free.
  double usedCapacity = (double) (counter.getLaunchingCount() + counter.getRunningCount()
      + cConf.getInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE)) / maxConcurrentRuns;
  double freeCapacity = 1 - usedCapacity;
  if (freeCapacity >= minimumFreeCapacity) {
    return true;
  }
  LOG.trace("Not enough free capacity. {} < {} (minimum)", freeCapacity, minimumFreeCapacity);
  return false;
}

@VisibleForTesting
void processEvents(EventReader<StartProgramEvent> reader) {
EventResult<StartProgramEvent> result = reader.pull(cConf.getInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.http.AppFabricTestBase;
import io.cdap.cdap.internal.events.dummy.DummyEventReader;
import io.cdap.cdap.internal.events.dummy.DummyEventReaderExtensionProvider;
Expand All @@ -48,6 +49,8 @@
public class StartProgramEventSubscriberTest extends AppFabricTestBase {
private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventSubscriberTest.class);
private ProgramLifecycleService lifecycleService;
private RunRecordMonitorService runRecordMonitorService;
private RunRecordMonitorService.Counter mockCounter;
private CConfiguration cConf;
private DummyEventReader<StartProgramEvent> eventReader;
private Injector injector;
Expand All @@ -56,12 +59,16 @@ public class StartProgramEventSubscriberTest extends AppFabricTestBase {
@Before
public void setup() {
lifecycleService = Mockito.mock(ProgramLifecycleService.class);
cConf = Mockito.mock(CConfiguration.class);
runRecordMonitorService = Mockito.mock(RunRecordMonitorService.class);
mockCounter = Mockito.mock(RunRecordMonitorService.Counter.class);
Mockito.doReturn(mockCounter).when(runRecordMonitorService).getCount();
cConf = CConfiguration.create();
eventReader = new DummyEventReader<>(mockedEvents());
injector = Guice.createInjector(new AbstractModule() {
@Override
protected void configure() {
bind(ProgramLifecycleService.class).toInstance(lifecycleService);
bind(RunRecordMonitorService.class).toInstance(runRecordMonitorService);
bind(CConfiguration.class).toInstance(cConf);
bind(new TypeLiteral<EventReaderProvider<StartProgramEvent>>() {
})
Expand All @@ -70,7 +77,7 @@ protected void configure() {
}
});
eventHandler = injector.getInstance(StartProgramEventSubscriber.class);
Mockito.doReturn(1).when(cConf).getInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE);
cConf.setInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE, 1);
}

@Test
Expand All @@ -89,6 +96,38 @@ public void testMessageWorkflow() throws Exception {
Mockito.verify(lifecycleService).run((ProgramReference) any(), any(), Mockito.anyBoolean());
}

@Test
public void testHasNominalCapacity_lackOfCapacity() {
  Mockito.doReturn(0).when(mockCounter).getRunningCount();
  Mockito.doReturn(0).when(mockCounter).getLaunchingCount();

  // MINIMUM_FREE_CAPACITY_BEFORE_PULL is intentionally left unset so the default minimum applies.
  cConf.setInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE, 1);
  cConf.setInt(Constants.AppFabric.MAX_CONCURRENT_RUNS, 1);

  eventHandler.initialize();
  // (0 launching + 0 running + 1 fetch) / 1 max = 1.0 used, so 0.0 free capacity < 0.1 default
  // minimum.
  // The call is made outside of an `assert` statement so that it still runs (and the check and
  // verifications below stay meaningful) when JVM assertions are disabled.
  boolean hasCapacity = eventHandler.hasNominalCapacity();
  if (hasCapacity) {
    throw new AssertionError(
        "Expected no nominal capacity when the fetch size alone fills the concurrent run limit");
  }
  Mockito.verify(mockCounter).getLaunchingCount();
  Mockito.verify(mockCounter).getRunningCount();
}

@Test
public void testHasNominalCapacity_sufficientCapacity() {
  Mockito.doReturn(0).when(mockCounter).getRunningCount();
  Mockito.doReturn(0).when(mockCounter).getLaunchingCount();

  // MINIMUM_FREE_CAPACITY_BEFORE_PULL is intentionally left unset so the default minimum applies.
  cConf.setInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE, 1);
  cConf.setInt(Constants.AppFabric.MAX_CONCURRENT_RUNS, 2);

  eventHandler.initialize();
  // (0 launching + 0 running + 1 fetch) / 2 max = 0.5 used, so 0.5 free capacity > 0.1 default
  // minimum.
  // The call is made outside of an `assert` statement so that it still runs (and the check and
  // verifications below stay meaningful) when JVM assertions are disabled.
  boolean hasCapacity = eventHandler.hasNominalCapacity();
  if (!hasCapacity) {
    throw new AssertionError(
        "Expected nominal capacity when half of the concurrent run limit is still free");
  }
  Mockito.verify(mockCounter).getLaunchingCount();
  Mockito.verify(mockCounter).getRunningCount();
}

private Collection<StartProgramEvent> mockedEvents() {
ArrayList<StartProgramEvent> eventList = new ArrayList<>();
eventList.add(new StartProgramEvent(1, "1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2214,6 +2214,8 @@ public static final class Event {
public static final String START_EVENTS_READER_EXTENSIONS_ENABLED_LIST =
"events.reader.extensions.start.enabled.list";
public static final String START_EVENT_PREFIX = "event.reader.start";
public static final String MINIMUM_FREE_CAPACITY_BEFORE_PULL =
"event.readers.capacity.before.pull";
}

/**
Expand Down
9 changes: 9 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5087,6 +5087,15 @@
</description>
</property>

<property>
<name>event.readers.capacity.before.pull</name>
<value>0.1</value>
<description>
Minimum fraction of the maximum concurrent program runs that must remain free before event
readers pull and launch more program runs. The fetch size of the pending pull is counted as
used capacity when this fraction is computed.
</description>
</property>

<property>
<name>event.reader.start.poll.delay.seconds</name>
<value>10</value>
Expand Down

0 comments on commit 9d3ff77

Please sign in to comment.