diff --git a/cdap-app-fabric/pom.xml b/cdap-app-fabric/pom.xml
index fe84f6d274ce..b43fee7793c3 100644
--- a/cdap-app-fabric/pom.xml
+++ b/cdap-app-fabric/pom.xml
@@ -273,7 +273,6 @@
io.cdap.cdap
cdap-event-reader-spi
${project.version}
- compile
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java
index 5addea741b20..596641633c0a 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java
@@ -118,14 +118,17 @@
import io.cdap.cdap.internal.capability.CapabilityModule;
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandler;
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
-import io.cdap.cdap.internal.events.EventReaderHandlerManager;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventPublisher;
+import io.cdap.cdap.internal.events.EventReaderHandler;
+import io.cdap.cdap.internal.events.EventReaderHandlerManager;
+import io.cdap.cdap.internal.events.EventReaderProvider;
import io.cdap.cdap.internal.events.EventWriterExtensionProvider;
import io.cdap.cdap.internal.events.EventWriterProvider;
import io.cdap.cdap.internal.events.MetricsProvider;
import io.cdap.cdap.internal.events.ProgramStatusEventPublisher;
import io.cdap.cdap.internal.events.SparkProgramStatusMetricsProvider;
+import io.cdap.cdap.internal.events.StartProgramEventHandler;
import io.cdap.cdap.internal.pipeline.SynchronousPipelineFactory;
import io.cdap.cdap.internal.profile.ProfileService;
import io.cdap.cdap.internal.provision.ProvisionerModule;
@@ -148,6 +151,7 @@
import io.cdap.cdap.security.impersonation.UnsupportedUGIProvider;
import io.cdap.cdap.security.store.SecureStoreHandler;
import io.cdap.cdap.sourcecontrol.guice.SourceControlModule;
+import io.cdap.cdap.spi.events.StartProgramEvent;
import io.cdap.http.HttpHandler;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -399,6 +403,11 @@ protected void configure() {
Multibinder.newSetBinder(binder(), EventPublisher.class);
eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class);
bind(EventPublishManager.class).in(Scopes.SINGLETON);
+ bind(new TypeLiteral>() {})
+ .to(StartProgramEventHandler.StartProgramEventReaderExtensionProvider.class);
+ Multibinder eventHandlersBinder =
+ Multibinder.newSetBinder(binder(), EventReaderHandler.class);
+ eventHandlersBinder.addBinding().to(StartProgramEventHandler.class);
bind(EventReaderHandlerManager.class).in(Scopes.SINGLETON);
bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class);
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventHandler.java
new file mode 100644
index 000000000000..3bd9d4c2cb8b
--- /dev/null
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventHandler.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.internal.events;
+
+import com.google.inject.Inject;
+import io.cdap.cdap.common.conf.CConfiguration;
+import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.common.lang.ClassPathResources;
+import io.cdap.cdap.common.lang.FilterClassLoader;
+import io.cdap.cdap.extension.AbstractExtensionLoader;
+import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
+import io.cdap.cdap.proto.ProgramType;
+import io.cdap.cdap.proto.id.ProgramReference;
+import io.cdap.cdap.spi.events.EventReader;
+import io.cdap.cdap.spi.events.EventResult;
+import io.cdap.cdap.spi.events.StartProgramEvent;
+import io.cdap.cdap.spi.events.StartProgramEventDetails;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.twill.api.RunId;
+import org.apache.twill.common.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically checks event reader for StartProgramEvents. If any events available, publish them to
+ * TMS.
+ */
+public class StartProgramEventHandler extends EventReaderHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventHandler.class);
+ private Collection> readers;
+ private final CConfiguration cConf;
+ private final EventReaderProvider extensionProvider;
+ private final ProgramLifecycleService lifecycleService;
+ private ScheduledExecutorService executor;
+
+ private ThreadPoolExecutor threadPoolExecutor;
+
+ /**
+ * Create instance that handles StartProgramEvents.
+ *
+ * @param cConf CDAP configuration
+ * @param extensionProvider eventReaderProvider for StartProgramEvent Readers
+ * @param lifecycleService to publish start programs to TMS
+ */
+ @Inject
+ StartProgramEventHandler(CConfiguration cConf, EventReaderProvider extensionProvider,
+ ProgramLifecycleService lifecycleService) {
+ this.cConf = cConf;
+ this.extensionProvider = extensionProvider;
+ this.lifecycleService = lifecycleService;
+ threadPoolExecutor = null;
+ }
+
+ @Override
+ public boolean initialize() {
+ readers = extensionProvider.loadEventReaders().values();
+ if (readers.isEmpty()) {
+ return false;
+ }
+ threadPoolExecutor = new ThreadPoolExecutor(readers.size(), readers.size(), 60,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+ for (EventReader reader : readers) {
+ reader.initialize(
+ new DefaultEventReaderContext(Constants.Event.START_EVENT_PREFIX,
+ reader.getClass().getSimpleName(), cConf));
+ }
+ return true;
+ }
+
+ @Override
+ protected Scheduler scheduler() {
+ return Scheduler.newFixedDelaySchedule(0,
+ cConf.getInt(Constants.Event.START_PROGRAM_EVENT_READER_POLL_DELAY), TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected void runOneIteration() throws Exception {
+ if (threadPoolExecutor != null) {
+ for (EventReader reader : readers) {
+ threadPoolExecutor.execute(() -> {
+ EventResult result = reader.pull(1);
+ result.consumeEvents(this::startProgram);
+ });
+ }
+ }
+ }
+
+ /**
+ * Attempt to publish program to TMS.
+ *
+ * @param event Event containing program info
+ * @throws RuntimeException if starting program fails
+ */
+ private void startProgram(StartProgramEvent event) {
+ StartProgramEventDetails eventDetails = event.getEventDetails();
+ try {
+ ProgramType programType = ProgramType.valueOfCategoryName(eventDetails.getProgramType());
+ ProgramReference programReference = new ProgramReference(eventDetails.getNamespaceId(),
+ eventDetails.getAppId(), programType,
+ eventDetails.getProgramId());
+ LOG.debug("Starting pipeline {}, with args: {}", eventDetails.getAppId(), eventDetails.getArgs());
+ RunId runId = lifecycleService.run(programReference, eventDetails.getArgs(), true);
+ LOG.info("Started pipeline, RunId: {}", runId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected final ScheduledExecutorService executor() {
+ executor =
+ Executors.newSingleThreadScheduledExecutor(
+ Threads.createDaemonThreadFactory("start-program-event-handler-scheduler"));
+ return executor;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOG.info("StartProgramEventHandler started.");
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ LOG.info("StartProgramEventHandler successfully shut down.");
+ }
+
+ /**
+ * Implementation of {@link StartProgramEventReaderExtensionProvider}
+ * which provides Event reader extension classes.
+ * extending from {@link AbstractExtensionLoader}
+ */
+ public static class StartProgramEventReaderExtensionProvider
+ extends AbstractExtensionLoader>
+ implements EventReaderProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventReaderExtensionProvider.class);
+ private static final Set ALLOWED_RESOURCES = createAllowedResources();
+ private static final Set ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES);
+ private final Collection enabledEventReaders;
+
+ /**
+ * Retrieve enabled event readers from cConf.
+ *
+ * @param cConf configuration
+ */
+ @Inject
+ public StartProgramEventReaderExtensionProvider(CConfiguration cConf) {
+ super(cConf.get(Constants.Event.START_EVENTS_READER_EXTENSIONS_DIR) != null
+ ? cConf.get(Constants.Event.START_EVENTS_READER_EXTENSIONS_DIR) : "");
+ this.enabledEventReaders = cConf.getStringCollection(
+ Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST);
+ if (this.enabledEventReaders == null || this.enabledEventReaders.isEmpty()) {
+ LOG.debug("No event readers enabled.");
+ return;
+ }
+ LOG.debug("Enabled event readers are {} .", enabledEventReaders);
+ }
+
+ private static Set createAllowedResources() {
+ try {
+ return ClassPathResources.getResourcesWithDependencies(EventReader.class.getClassLoader(),
+ EventReader.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to trace dependencies for reader extension. "
+ + "Usage of events reader might fail.", e);
+ }
+ }
+
+ public Map> loadEventReaders() {
+ return getAll();
+ }
+
+ @Override
+ protected Set getSupportedTypesForProvider(EventReader eventReader) {
+ return enabledEventReaders.stream().collect(Collectors.toSet());
+ }
+
+ @Override
+ protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() {
+ // Only allow spi classes.
+ return new FilterClassLoader.Filter() {
+ @Override
+ public boolean acceptResource(String resource) {
+ return ALLOWED_RESOURCES.contains(resource);
+ }
+
+ @Override
+ public boolean acceptPackage(String packageName) {
+ return ALLOWED_PACKAGES.contains(packageName);
+ }
+ };
+ }
+ }
+}
diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventHandlerTest.java
new file mode 100644
index 000000000000..f6a054c574f7
--- /dev/null
+++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventHandlerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.internal.events;
+
+import static org.mockito.Matchers.any;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import io.cdap.cdap.common.app.RunIds;
+import io.cdap.cdap.common.conf.CConfiguration;
+import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
+import io.cdap.cdap.internal.app.services.http.AppFabricTestBase;
+import io.cdap.cdap.internal.events.dummy.DummyEventReader;
+import io.cdap.cdap.internal.events.dummy.DummyEventReaderExtensionProvider;
+import io.cdap.cdap.proto.id.ProgramReference;
+import io.cdap.cdap.spi.events.EventReader;
+import io.cdap.cdap.spi.events.StartProgramEvent;
+import io.cdap.cdap.spi.events.StartProgramEventDetails;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for the {@link StartProgramEventHandler}.
+ */
+public class StartProgramEventHandlerTest extends AppFabricTestBase {
+ private StartProgramEventHandler eventHandler;
+ private static final Logger LOGGER = LoggerFactory.getLogger(StartProgramEventHandlerTest.class);
+ private ProgramLifecycleService lifecycleService = Mockito.mock(ProgramLifecycleService.class);
+ private DummyEventReader eventReader = Mockito.mock(DummyEventReader.class);
+ @Mock
+ CConfiguration cConf = Mockito.mock(CConfiguration.class);
+ private final Injector injector = Guice.createInjector(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(ProgramLifecycleService.class).toInstance(lifecycleService);
+ bind(CConfiguration.class).toInstance(cConf);
+ bind(EventReader.class).toInstance(eventReader);
+ bind(new TypeLiteral>() {})
+ .toInstance(new DummyEventReaderExtensionProvider(eventReader));
+ bind(StartProgramEventHandler.class).in(Scopes.SINGLETON);
+ }
+ });
+
+ @Before
+ public void setup() {
+ eventHandler = injector.getInstance(StartProgramEventHandler.class);
+ cConf.setInt(Constants.AppFabric.PROGRAM_STATUS_RETRY_STRATEGY_PREFIX
+ + Constants.Retry.MAX_TIME_SECS, 6000);
+ cConf.setInt(Constants.Event.EVENTS_READER_ACK_BUFFER, 0);
+ }
+
+
+ @Test
+ public void testInitialize() {
+ try {
+ eventHandler.initialize();
+ } catch (Exception ex) {
+ LOGGER.error("Error during Event Handler initialization.", ex);
+ Assert.fail("Error while initializing Event Handler");
+ }
+ }
+
+ @Test
+ public void testMessageWorkflow() throws Exception {
+ assert (lifecycleService != null);
+ Mockito.doReturn(RunIds.generate()).when(lifecycleService).run((ProgramReference) any(), any(),
+ Mockito.anyBoolean());
+ Mockito.when(eventReader.getMessages()).thenReturn(mockedEvents());
+
+ Mockito.doCallRealMethod().when(eventReader).pull(1);
+ Mockito.doCallRealMethod().when(eventReader).initialize(any());
+
+ try {
+ eventHandler.initialize();
+ } catch (Exception ex) {
+ LOGGER.error("Error during Event Handler initialization.", ex);
+ Assert.fail("Error while initializing Event Handler");
+ }
+ try {
+ eventHandler.runOneIteration();
+ } catch (Exception e) {
+ LOGGER.error("Error during message process.", e);
+ Assert.fail("Error during message process");
+ }
+ Mockito.verify(lifecycleService).run((ProgramReference) any(), any(), Mockito.anyBoolean());
+ }
+
+ private Collection mockedEvents() {
+ ArrayList eventList = new ArrayList<>();
+ eventList.add(new StartProgramEvent(1, "1",
+ new StartProgramEventDetails("app1",
+ "namespace1", "id1", "workflows", null)));
+ return eventList;
+ }
+}
diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventReaderExtensionProviderTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventReaderExtensionProviderTest.java
new file mode 100644
index 000000000000..0bb0386ab8d0
--- /dev/null
+++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventReaderExtensionProviderTest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.internal.events;
+
+import io.cdap.cdap.common.conf.CConfiguration;
+import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.internal.events.dummy.DummyEventReader;
+import io.cdap.cdap.spi.events.EventReader;
+import java.util.Set;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link StartProgramEventHandler.StartProgramEventReaderExtensionProvider}
+ */
+public class StartProgramEventReaderExtensionProviderTest {
+
+ @Test
+ public void testEnabledEventReaderFilter() {
+ EventReader mockReader = new DummyEventReader();
+ String mockReaderName = mockReader.getClass().getName();
+ CConfiguration cConf = CConfiguration.create();
+
+ StartProgramEventHandler.StartProgramEventReaderExtensionProvider readerExtensionProvider1
+ = new StartProgramEventHandler.StartProgramEventReaderExtensionProvider(cConf);
+ Set test1 = readerExtensionProvider1.getSupportedTypesForProvider(mockReader);
+ Assert.assertTrue(test1.isEmpty());
+
+ //Test with reader ID enabled
+ cConf.setStrings(Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST, mockReaderName);
+ StartProgramEventHandler.StartProgramEventReaderExtensionProvider readerExtensionProvider2
+ = new StartProgramEventHandler.StartProgramEventReaderExtensionProvider(cConf);
+ Set test2 = readerExtensionProvider2.getSupportedTypesForProvider(mockReader);
+ Assert.assertTrue(test2.contains(mockReaderName));
+ }
+}
diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java
index e71dbb791231..019ce077bf61 100644
--- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java
+++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java
@@ -2218,9 +2218,13 @@ public static final class Event {
public static final String EVENTS_WRITER_PREFIX = "event.writer";
public static final String EVENTS_WRITER_EXTENSIONS_DIR = "events.writer.extensions.dir";
public static final String EVENTS_WRITER_EXTENSIONS_ENABLED_LIST = "events.writer.extensions.enabled.list";
-
public static final String EVENTS_READER_PREFIX = "event.reader";
public static final String EVENTS_READER_ACK_BUFFER = "events.reader.buffer";
+ public static final String START_PROGRAM_EVENT_READER_POLL_DELAY = "event.startprogram.poll.delay.seconds";
+ public static final String START_EVENTS_READER_EXTENSIONS_DIR = "events.reader.extensions.start.dir";
+ public static final String START_EVENTS_READER_EXTENSIONS_ENABLED_LIST =
+ "events.reader.extensions.start.enabled.list";
+ public static final String START_EVENT_PREFIX = "start";
}
/**