Skip to content

Commit

Permalink
StartProgram Event Handling
Browse files Browse the repository at this point in the history
  • Loading branch information
codeNinjaDev committed Jul 11, 2023
1 parent fb5c882 commit 5277104
Show file tree
Hide file tree
Showing 6 changed files with 407 additions and 3 deletions.
1 change: 0 additions & 1 deletion cdap-app-fabric/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-event-reader-spi</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,17 @@
import io.cdap.cdap.internal.capability.CapabilityModule;
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandler;
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
import io.cdap.cdap.internal.events.EventReaderHandlerManager;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventPublisher;
import io.cdap.cdap.internal.events.EventReaderHandler;
import io.cdap.cdap.internal.events.EventReaderHandlerManager;
import io.cdap.cdap.internal.events.EventReaderProvider;
import io.cdap.cdap.internal.events.EventWriterExtensionProvider;
import io.cdap.cdap.internal.events.EventWriterProvider;
import io.cdap.cdap.internal.events.MetricsProvider;
import io.cdap.cdap.internal.events.ProgramStatusEventPublisher;
import io.cdap.cdap.internal.events.SparkProgramStatusMetricsProvider;
import io.cdap.cdap.internal.events.StartProgramEventHandler;
import io.cdap.cdap.internal.pipeline.SynchronousPipelineFactory;
import io.cdap.cdap.internal.profile.ProfileService;
import io.cdap.cdap.internal.provision.ProvisionerModule;
Expand All @@ -148,6 +151,7 @@
import io.cdap.cdap.security.impersonation.UnsupportedUGIProvider;
import io.cdap.cdap.security.store.SecureStoreHandler;
import io.cdap.cdap.sourcecontrol.guice.SourceControlModule;
import io.cdap.cdap.spi.events.StartProgramEvent;
import io.cdap.http.HttpHandler;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -399,6 +403,11 @@ protected void configure() {
Multibinder.newSetBinder(binder(), EventPublisher.class);
eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class);
bind(EventPublishManager.class).in(Scopes.SINGLETON);
bind(new TypeLiteral<EventReaderProvider<StartProgramEvent>>() {})
.to(StartProgramEventHandler.StartProgramEventReaderExtensionProvider.class);
Multibinder<EventReaderHandler> eventHandlersBinder =
Multibinder.newSetBinder(binder(), EventReaderHandler.class);
eventHandlersBinder.addBinding().to(StartProgramEventHandler.class);
bind(EventReaderHandlerManager.class).in(Scopes.SINGLETON);
bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class);
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.lang.ClassPathResources;
import io.cdap.cdap.common.lang.FilterClassLoader;
import io.cdap.cdap.extension.AbstractExtensionLoader;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramReference;
import io.cdap.cdap.spi.events.EventReader;
import io.cdap.cdap.spi.events.EventResult;
import io.cdap.cdap.spi.events.StartProgramEvent;
import io.cdap.cdap.spi.events.StartProgramEventDetails;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Periodically checks event reader for StartProgramEvents. If any events available, publish them to
* TMS.
*/
public class StartProgramEventHandler extends EventReaderHandler {

private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventHandler.class);
private Collection<EventReader<StartProgramEvent>> readers;
private final CConfiguration cConf;
private final EventReaderProvider<StartProgramEvent> extensionProvider;
private final ProgramLifecycleService lifecycleService;
private ScheduledExecutorService executor;

private ThreadPoolExecutor threadPoolExecutor;

/**
* Create instance that handles StartProgramEvents.
*
* @param cConf CDAP configuration
* @param extensionProvider eventReaderProvider for StartProgramEvent Readers
* @param lifecycleService to publish start programs to TMS
*/
@Inject
StartProgramEventHandler(CConfiguration cConf, EventReaderProvider<StartProgramEvent> extensionProvider,
ProgramLifecycleService lifecycleService) {
this.cConf = cConf;
this.extensionProvider = extensionProvider;
this.lifecycleService = lifecycleService;
threadPoolExecutor = null;
}

@Override
public boolean initialize() {
readers = extensionProvider.loadEventReaders().values();
if (readers.isEmpty()) {
return false;
}
threadPoolExecutor = new ThreadPoolExecutor(readers.size(), readers.size(), 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (EventReader reader : readers) {
reader.initialize(
new DefaultEventReaderContext(Constants.Event.START_EVENT_PREFIX,
reader.getClass().getSimpleName(), cConf));
}
return true;
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0,
cConf.getInt(Constants.Event.START_PROGRAM_EVENT_READER_POLL_DELAY), TimeUnit.SECONDS);
}

@Override
protected void runOneIteration() throws Exception {
if (threadPoolExecutor != null) {
for (EventReader<StartProgramEvent> reader : readers) {
threadPoolExecutor.execute(() -> {
EventResult<StartProgramEvent> result = reader.pull(1);
result.consumeEvents(this::startProgram);
});
}
}
}

/**
* Attempt to publish program to TMS.
*
* @param event Event containing program info
* @throws RuntimeException if starting program fails
*/
private void startProgram(StartProgramEvent event) {
StartProgramEventDetails eventDetails = event.getEventDetails();
try {
ProgramType programType = ProgramType.valueOfCategoryName(eventDetails.getProgramType());
ProgramReference programReference = new ProgramReference(eventDetails.getNamespaceId(),
eventDetails.getAppId(), programType,
eventDetails.getProgramId());
LOG.debug("Starting pipeline {}, with args: {}", eventDetails.getAppId(), eventDetails.getArgs());
RunId runId = lifecycleService.run(programReference, eventDetails.getArgs(), true);
LOG.info("Started pipeline, RunId: {}", runId);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
protected final ScheduledExecutorService executor() {
executor =
Executors.newSingleThreadScheduledExecutor(
Threads.createDaemonThreadFactory("start-program-event-handler-scheduler"));
return executor;
}

@Override
protected void startUp() throws Exception {
LOG.info("StartProgramEventHandler started.");
}

@Override
protected void shutDown() throws Exception {
if (executor != null) {
executor.shutdownNow();
}
LOG.info("StartProgramEventHandler successfully shut down.");
}

/**
* Implementation of {@link StartProgramEventReaderExtensionProvider}
* which provides Event reader extension classes.
* extending from {@link AbstractExtensionLoader}
*/
public static class StartProgramEventReaderExtensionProvider
extends AbstractExtensionLoader<String, EventReader<StartProgramEvent>>
implements EventReaderProvider<StartProgramEvent> {

private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventReaderExtensionProvider.class);
private static final Set<String> ALLOWED_RESOURCES = createAllowedResources();
private static final Set<String> ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES);
private final Collection<String> enabledEventReaders;

/**
* Retrieve enabled event readers from cConf.
*
* @param cConf configuration
*/
@Inject
public StartProgramEventReaderExtensionProvider(CConfiguration cConf) {
super(cConf.get(Constants.Event.START_EVENTS_READER_EXTENSIONS_DIR) != null
? cConf.get(Constants.Event.START_EVENTS_READER_EXTENSIONS_DIR) : "");
this.enabledEventReaders = cConf.getStringCollection(
Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST);
if (this.enabledEventReaders == null || this.enabledEventReaders.isEmpty()) {
LOG.debug("No event readers enabled.");
return;
}
LOG.debug("Enabled event readers are {} .", enabledEventReaders);
}

private static Set<String> createAllowedResources() {
try {
return ClassPathResources.getResourcesWithDependencies(EventReader.class.getClassLoader(),
EventReader.class);
} catch (IOException e) {
throw new RuntimeException("Failed to trace dependencies for reader extension. "
+ "Usage of events reader might fail.", e);
}
}

public Map<String, EventReader<StartProgramEvent>> loadEventReaders() {
return getAll();
}

@Override
protected Set<String> getSupportedTypesForProvider(EventReader eventReader) {
return enabledEventReaders.stream().collect(Collectors.toSet());
}

@Override
protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() {
// Only allow spi classes.
return new FilterClassLoader.Filter() {
@Override
public boolean acceptResource(String resource) {
return ALLOWED_RESOURCES.contains(resource);
}

@Override
public boolean acceptPackage(String packageName) {
return ALLOWED_PACKAGES.contains(packageName);
}
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import static org.mockito.Matchers.any;

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.internal.app.services.http.AppFabricTestBase;
import io.cdap.cdap.internal.events.dummy.DummyEventReader;
import io.cdap.cdap.internal.events.dummy.DummyEventReaderExtensionProvider;
import io.cdap.cdap.proto.id.ProgramReference;
import io.cdap.cdap.spi.events.EventReader;
import io.cdap.cdap.spi.events.StartProgramEvent;
import io.cdap.cdap.spi.events.StartProgramEventDetails;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tests for the {@link StartProgramEventHandler}.
*/
public class StartProgramEventHandlerTest extends AppFabricTestBase {
private StartProgramEventHandler eventHandler;
private static final Logger LOGGER = LoggerFactory.getLogger(StartProgramEventHandlerTest.class);
private ProgramLifecycleService lifecycleService = Mockito.mock(ProgramLifecycleService.class);
private DummyEventReader<StartProgramEvent> eventReader = Mockito.mock(DummyEventReader.class);
@Mock
CConfiguration cConf = Mockito.mock(CConfiguration.class);
private final Injector injector = Guice.createInjector(new AbstractModule() {
@Override
protected void configure() {
bind(ProgramLifecycleService.class).toInstance(lifecycleService);
bind(CConfiguration.class).toInstance(cConf);
bind(EventReader.class).toInstance(eventReader);
bind(new TypeLiteral<EventReaderProvider<StartProgramEvent>>() {})
.toInstance(new DummyEventReaderExtensionProvider<StartProgramEvent>(eventReader));
bind(StartProgramEventHandler.class).in(Scopes.SINGLETON);
}
});

@Before
public void setup() {
eventHandler = injector.getInstance(StartProgramEventHandler.class);
cConf.setInt(Constants.AppFabric.PROGRAM_STATUS_RETRY_STRATEGY_PREFIX
+ Constants.Retry.MAX_TIME_SECS, 6000);
cConf.setInt(Constants.Event.EVENTS_READER_ACK_BUFFER, 0);
}


@Test
public void testInitialize() {
try {
eventHandler.initialize();
} catch (Exception ex) {
LOGGER.error("Error during Event Handler initialization.", ex);
Assert.fail("Error while initializing Event Handler");
}
}

@Test
public void testMessageWorkflow() throws Exception {
assert (lifecycleService != null);
Mockito.doReturn(RunIds.generate()).when(lifecycleService).run((ProgramReference) any(), any(),
Mockito.anyBoolean());
Mockito.when(eventReader.getMessages()).thenReturn(mockedEvents());

Mockito.doCallRealMethod().when(eventReader).pull(1);
Mockito.doCallRealMethod().when(eventReader).initialize(any());

try {
eventHandler.initialize();
} catch (Exception ex) {
LOGGER.error("Error during Event Handler initialization.", ex);
Assert.fail("Error while initializing Event Handler");
}
try {
eventHandler.runOneIteration();
} catch (Exception e) {
LOGGER.error("Error during message process.", e);
Assert.fail("Error during message process");
}
Mockito.verify(lifecycleService).run((ProgramReference) any(), any(), Mockito.anyBoolean());
}

private Collection<StartProgramEvent> mockedEvents() {
ArrayList<StartProgramEvent> eventList = new ArrayList<>();
eventList.add(new StartProgramEvent(1, "1",
new StartProgramEventDetails("app1",
"namespace1", "id1", "workflows", null)));
return eventList;
}
}
Loading

0 comments on commit 5277104

Please sign in to comment.