diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index 70e6e3d5d24b..14cf96e27f68 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -120,6 +120,7 @@ import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal; import io.cdap.cdap.internal.events.EventPublishManager; import io.cdap.cdap.internal.events.EventPublisher; +import io.cdap.cdap.internal.events.EventReaderProvider; import io.cdap.cdap.internal.events.EventSubscriber; import io.cdap.cdap.internal.events.EventSubscriberManager; import io.cdap.cdap.internal.events.EventWriterExtensionProvider; @@ -127,6 +128,8 @@ import io.cdap.cdap.internal.events.MetricsProvider; import io.cdap.cdap.internal.events.ProgramStatusEventPublisher; import io.cdap.cdap.internal.events.SparkProgramStatusMetricsProvider; +import io.cdap.cdap.internal.events.StartProgramEventReaderExtensionProvider; +import io.cdap.cdap.internal.events.StartProgramEventSubscriber; import io.cdap.cdap.internal.pipeline.SynchronousPipelineFactory; import io.cdap.cdap.internal.profile.ProfileService; import io.cdap.cdap.internal.provision.ProvisionerModule; @@ -149,6 +152,7 @@ import io.cdap.cdap.security.impersonation.UnsupportedUGIProvider; import io.cdap.cdap.security.store.SecureStoreHandler; import io.cdap.cdap.sourcecontrol.guice.SourceControlModule; +import io.cdap.cdap.spi.events.StartProgramEvent; import io.cdap.http.HttpHandler; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -400,8 +404,11 @@ protected void configure() { Multibinder.newSetBinder(binder(), EventPublisher.class); eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class); bind(EventPublishManager.class).in(Scopes.SINGLETON); + bind(new TypeLiteral>() {}) + .to(StartProgramEventReaderExtensionProvider.class); Multibinder eventSubscribersBinder = Multibinder.newSetBinder(binder(), EventSubscriber.class); + eventSubscribersBinder.addBinding().to(StartProgramEventSubscriber.class); bind(EventSubscriberManager.class).in(Scopes.SINGLETON); bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class); bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventReaderExtensionProvider.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventReaderExtensionProvider.java new file mode 100644 index 000000000000..778522ea2f1c --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventReaderExtensionProvider.java @@ -0,0 +1,106 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.events; + +import com.google.inject.Inject; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.lang.ClassPathResources; +import io.cdap.cdap.common.lang.FilterClassLoader; +import io.cdap.cdap.extension.AbstractExtensionLoader; +import io.cdap.cdap.spi.events.EventReader; +import io.cdap.cdap.spi.events.StartProgramEvent; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link StartProgramEventReaderExtensionProvider} + * which provides Event reader extension classes. + * extending from {@link AbstractExtensionLoader} + */ +public class StartProgramEventReaderExtensionProvider + extends AbstractExtensionLoader> + implements EventReaderProvider { + + private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventReaderExtensionProvider.class); + private static final Set ALLOWED_RESOURCES = createAllowedResources(); + private static final Set ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES); + private final Collection enabledEventReaders; + + /** + * Retrieve enabled event readers from cConf. + * + * @param cConf configuration + */ + @Inject + public StartProgramEventReaderExtensionProvider(CConfiguration cConf) { + super(cConf.get(Constants.Event.START_EVENTS_READER_EXTENSIONS_DIR, "")); + this.enabledEventReaders = cConf.getStringCollection( + Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST); + if (this.enabledEventReaders == null || this.enabledEventReaders.isEmpty()) { + return; + } + LOG.debug("Enabled event readers in config are {} .", enabledEventReaders); + } + + private static Set createAllowedResources() { + try { + return ClassPathResources.getResourcesWithDependencies(EventReader.class.getClassLoader(), + EventReader.class); + } catch (IOException e) { + throw new RuntimeException("Failed to trace dependencies for reader extension. " + + "Usage of events reader might fail.", e); + } + } + + public Map> loadEventReaders() { + return getAll(); + } + + @Override + protected Set getSupportedTypesForProvider(EventReader eventReader) { + if (enabledEventReaders == null || + !enabledEventReaders.contains(eventReader.getId())) { + LOG.debug("{} is not present in the allowed list of event readers.", + eventReader.getId()); + return Collections.emptySet(); + } + + return Collections.singleton(eventReader.getId()); + } + + @Override + protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() { + // Only allow spi classes. + return new FilterClassLoader.Filter() { + @Override + public boolean acceptResource(String resource) { + return ALLOWED_RESOURCES.contains(resource); + } + + @Override + public boolean acceptPackage(String packageName) { + return ALLOWED_PACKAGES.contains(packageName); + } + }; + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java new file mode 100644 index 000000000000..9e066e3533d4 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java @@ -0,0 +1,171 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.events; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.internal.app.services.ProgramLifecycleService; +import io.cdap.cdap.proto.ProgramType; +import io.cdap.cdap.proto.id.ProgramReference; +import io.cdap.cdap.proto.id.ProgramRunId; +import io.cdap.cdap.spi.events.EventReader; +import io.cdap.cdap.spi.events.EventResult; +import io.cdap.cdap.spi.events.StartProgramEvent; +import io.cdap.cdap.spi.events.StartProgramEventDetails; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.twill.api.RunId; +import org.apache.twill.common.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Periodically checks event reader for StartProgramEvents. If any events available, trigger + * startProgram method on each event. + */ +public class StartProgramEventSubscriber extends EventSubscriber { + + private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventSubscriber.class); + + private final CConfiguration cConf; + private final EventReaderProvider extensionProvider; + private final ProgramLifecycleService lifecycleService; + private ScheduledExecutorService executor; + private Collection> readers; + private ExecutorService threadPoolExecutor; + + /** + * Create instance that handles StartProgramEvents. + * + * @param cConf CDAP configuration + * @param extensionProvider eventReaderProvider for StartProgramEvent Readers + * @param lifecycleService to publish start programs to TMS + */ + @Inject + StartProgramEventSubscriber(CConfiguration cConf, EventReaderProvider extensionProvider, + ProgramLifecycleService lifecycleService) { + this.cConf = cConf; + this.extensionProvider = extensionProvider; + this.lifecycleService = lifecycleService; + } + + @Override + public void initialize() { + readers = new HashSet<>(extensionProvider.loadEventReaders().values()); + Iterator> eventReaderIterator + = readers.iterator(); + while (eventReaderIterator.hasNext()) { + EventReader reader = eventReaderIterator.next(); + try { + reader.initialize( + new DefaultEventReaderContext(String.format("%s.%s.", + Constants.Event.START_EVENT_PREFIX, reader.getId()), cConf)); + LOG.info("Successfully initialized Event Reader: {}", reader.getId()); + } catch (Exception e) { + LOG.error("Failed to initialize reader: {}", reader.getId(), e); + eventReaderIterator.remove(); + } + } + if (!readers.isEmpty()) { + threadPoolExecutor = new ThreadPoolExecutor(readers.size(), readers.size(), 60, + TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + } + } + + @Override + protected Scheduler scheduler() { + return Scheduler.newFixedDelaySchedule(0, + cConf.getInt(Constants.Event.START_PROGRAM_EVENT_READER_POLL_DELAY), TimeUnit.SECONDS); + } + + @Override + protected void runOneIteration() throws Exception { + if (threadPoolExecutor != null) { + for (EventReader reader : readers) { + threadPoolExecutor.execute(() -> { + processEvents(reader); + }); + } + } + } + + @VisibleForTesting + void processEvents(EventReader reader) { + EventResult result = reader.pull(cConf.getInt( + Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE)); + result.consumeEvents(this::startProgram); + } + + /** + * Attempt to publish program to TMS. + * + * @param event Event containing program info + * @throws RuntimeException if starting program fails + */ + private void startProgram(StartProgramEvent event) { + StartProgramEventDetails eventDetails = event.getEventDetails(); + try { + ProgramType programType = ProgramType.valueOfCategoryName(eventDetails.getProgramType()); + ProgramReference programReference = new ProgramReference(eventDetails.getNamespaceId(), + eventDetails.getAppId(), programType, + eventDetails.getProgramId()); + LOG.debug("Starting pipeline {}, with args: {}, programReference: {}", + eventDetails.getAppId(), eventDetails.getArgs(), programReference); + RunId runId = lifecycleService.run(programReference, eventDetails.getArgs(), true); + ProgramRunId programRunId = new ProgramRunId( + programReference.getNamespace(), programReference.getApplication(), programType, + programReference.getProgram(), runId.getId()); + LOG.info("Started pipeline, ProgramRunId: {}", programRunId.toMetadataEntity()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + protected final ScheduledExecutorService executor() { + executor = + Executors.newSingleThreadScheduledExecutor( + Threads.createDaemonThreadFactory("start-program-event-subscriber-scheduler")); + return executor; + } + + @Override + protected void startUp() throws Exception { + LOG.info("StartProgramEventSubscriber started."); + } + + @Override + protected void shutDown() throws Exception { + if (executor != null) { + executor.shutdownNow(); + } + if (threadPoolExecutor != null) { + threadPoolExecutor.shutdownNow(); + } + LOG.info("StartProgramEventSubscriber successfully shut down."); + } + +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/dummy/DummyEventReader.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/dummy/DummyEventReader.java new file mode 100644 index 000000000000..f6ff6c24b1fd --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/dummy/DummyEventReader.java @@ -0,0 +1,81 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.events.dummy; + +import io.cdap.cdap.spi.events.Event; +import io.cdap.cdap.spi.events.EventReader; +import io.cdap.cdap.spi.events.EventReaderContext; +import io.cdap.cdap.spi.events.EventResult; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Dummy implementation of {@link EventReader} mainly for test proposals. + */ +public class DummyEventReader implements EventReader { + + private static final Logger logger = LoggerFactory.getLogger(DummyEventReader.class); + private final Collection messages; + + public DummyEventReader(Collection messages) { + this.messages = messages; + } + + @Override + public String getId() { + return "dummy-event-reader"; + } + + @Override + public void initialize(EventReaderContext eventReaderContext) { + logger.info("Initializing DummyEventReader..."); + } + + @Override + public EventResult pull(int maxMessages) { + ArrayList sentMessages = new ArrayList<>(messages); + return new DummyEventResult(sentMessages.subList(0, maxMessages)); + } + + @Override + public void close() throws Exception { + logger.info("Closing dummy reader"); + } + + class DummyEventResult implements EventResult { + final Collection events; + + DummyEventResult(Collection events) { + this.events = events; + } + + @Override + public void consumeEvents(Consumer consumer) { + for (T evt : events) { + try { + consumer.accept(evt); + } catch (Exception e) { + logger.error("Error on consuming event {}", evt.getVersion(), e); + } + } + } + + @Override + public void close() throws Exception { + } + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/dummy/DummyEventReaderExtensionProvider.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/dummy/DummyEventReaderExtensionProvider.java new file mode 100644 index 000000000000..686290cfbaa9 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/dummy/DummyEventReaderExtensionProvider.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.events.dummy; + +import com.google.inject.Inject; +import io.cdap.cdap.internal.events.EventReaderProvider; +import io.cdap.cdap.spi.events.Event; +import io.cdap.cdap.spi.events.EventReader; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Dummy implementation for {@link EventReaderProvider} for test proposals. + */ +public class DummyEventReaderExtensionProvider implements EventReaderProvider { + + private final DummyEventReader eventReader; + + @Inject + public DummyEventReaderExtensionProvider(DummyEventReader eventReader) { + this.eventReader = eventReader; + } + + @Override + public Map loadEventReaders() { + Map map = new HashMap<>(); + map.put(this.eventReader.getClass().getName(), this.eventReader); + return Collections.unmodifiableMap(map); + } +} \ No newline at end of file diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventReaderExtensionProviderTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventReaderExtensionProviderTest.java new file mode 100644 index 000000000000..a260c32be20b --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventReaderExtensionProviderTest.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.events; + +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.internal.events.dummy.DummyEventReader; +import io.cdap.cdap.spi.events.EventReader; +import java.util.ArrayList; +import java.util.Set; +import org.junit.Assert; +import org.junit.Test; + + +/** + * Tests for {@link StartProgramEventReaderExtensionProvider} + */ +public class StartProgramEventReaderExtensionProviderTest { + + @Test + public void testEnabledEventReaderFilter() { + EventReader mockReader = new DummyEventReader(new ArrayList()); + CConfiguration cConf = CConfiguration.create(); + + StartProgramEventReaderExtensionProvider readerExtensionProvider1 + = new StartProgramEventReaderExtensionProvider(cConf); + Set test1 = readerExtensionProvider1.getSupportedTypesForProvider(mockReader); + Assert.assertTrue(test1.isEmpty()); + + //Test with reader class name enabled + cConf.setStrings(Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST, mockReader.getId()); + StartProgramEventReaderExtensionProvider readerExtensionProvider2 + = new StartProgramEventReaderExtensionProvider(cConf); + Set test2 = readerExtensionProvider2.getSupportedTypesForProvider(mockReader); + Assert.assertTrue(test2.contains(mockReader.getId())); + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java new file mode 100644 index 000000000000..b3070946eb37 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java @@ -0,0 +1,99 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.events; + +import static org.mockito.Matchers.any; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; +import io.cdap.cdap.common.app.RunIds; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.internal.app.services.ProgramLifecycleService; +import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; +import io.cdap.cdap.internal.events.dummy.DummyEventReader; +import io.cdap.cdap.internal.events.dummy.DummyEventReaderExtensionProvider; +import io.cdap.cdap.proto.id.ProgramReference; +import io.cdap.cdap.spi.events.StartProgramEvent; +import io.cdap.cdap.spi.events.StartProgramEventDetails; +import java.util.ArrayList; +import java.util.Collection; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for the {@link StartProgramEventSubscriber}. + */ +public class StartProgramEventSubscriberTest extends AppFabricTestBase { + private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventSubscriberTest.class); + private ProgramLifecycleService lifecycleService; + private CConfiguration cConf; + private DummyEventReader eventReader; + private Injector injector; + private StartProgramEventSubscriber eventHandler; + + @Before + public void setup() { + lifecycleService = Mockito.mock(ProgramLifecycleService.class); + cConf = Mockito.mock(CConfiguration.class); + eventReader = new DummyEventReader<>(mockedEvents()); + injector = Guice.createInjector(new AbstractModule() { + @Override + protected void configure() { + bind(ProgramLifecycleService.class).toInstance(lifecycleService); + bind(CConfiguration.class).toInstance(cConf); + bind(new TypeLiteral>() { + }) + .toInstance(new DummyEventReaderExtensionProvider(eventReader)); + bind(StartProgramEventSubscriber.class).in(Scopes.SINGLETON); + } + }); + eventHandler = injector.getInstance(StartProgramEventSubscriber.class); + Mockito.doReturn(1).when(cConf).getInt(Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE); + } + + @Test + public void testInitialize() { + eventHandler.initialize(); + } + + @Test + public void testMessageWorkflow() throws Exception { + assert (lifecycleService != null); + Mockito.doReturn(RunIds.generate()).when(lifecycleService).run((ProgramReference) any(), any(), + Mockito.anyBoolean()); + + eventHandler.initialize(); + eventHandler.processEvents(eventReader); + Mockito.verify(lifecycleService).run((ProgramReference) any(), any(), Mockito.anyBoolean()); + } + + private Collection mockedEvents() { + ArrayList eventList = new ArrayList<>(); + eventList.add(new StartProgramEvent(1, "1", + new StartProgramEventDetails("app1", + "namespace1", "id1", "workflows", null))); + return eventList; + } +} diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 7df591afb0c9..4f20a52f3fa6 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -2221,10 +2221,15 @@ public static final class Event { public static final String PROJECT_NAME = "event.project.name"; public static final String EVENTS_WRITER_PREFIX = "event.writer"; - public static final String EVENTS_WRITER_EXTENSIONS_DIR = "events.writer.extensions.dir"; - public static final String EVENTS_WRITER_EXTENSIONS_ENABLED_LIST = "events.writer.extensions.enabled.list"; + + public static final String START_PROGRAM_EVENT_READER_POLL_DELAY = "event.reader.start.poll.delay.seconds"; + public static final String START_PROGRAM_EVENT_FETCH_SIZE = "event.reader.start.fetch.size"; + public static final String START_EVENTS_READER_EXTENSIONS_DIR = "events.reader.extensions.start.dir"; + public static final String START_EVENTS_READER_EXTENSIONS_ENABLED_LIST = + "events.reader.extensions.start.enabled.list"; + public static final String START_EVENT_PREFIX = "event.reader.start"; } /** diff --git a/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java b/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java index 2f3db1284570..77b929554ffa 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java +++ b/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java @@ -211,6 +211,7 @@ protected EXTENSION prepareSystemExtension(EXTENSION extension) { private void putEntriesIfAbsent(Map result, Map entries) { for (Map.Entry entry : entries.entrySet()) { + // TODO: Handle duplicate keys (currently silently ignores duplicates) if (!result.containsKey(entry.getKey())) { result.put(entry.getKey(), entry.getValue()); } diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 59d8cd1cb8f9..58bc91bedc2d 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -5087,6 +5087,23 @@ + + event.reader.start.poll.delay.seconds + 10 + + Delay between each poll of start program event subscriber + + + + + event.reader.start.fetch.size + 1 + + Number of start program events to pull each iteration + + + + artifact.fetcher.bind.port