Skip to content

Commit

Permalink
StartProgramEvent Subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
codeNinjaDev committed Jul 19, 2023
1 parent eab9de6 commit 031752f
Show file tree
Hide file tree
Showing 7 changed files with 482 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,16 @@
import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
import io.cdap.cdap.internal.events.EventPublishManager;
import io.cdap.cdap.internal.events.EventPublisher;
import io.cdap.cdap.internal.events.EventReaderProvider;
import io.cdap.cdap.internal.events.EventSubscriber;
import io.cdap.cdap.internal.events.EventSubscriberManager;
import io.cdap.cdap.internal.events.EventWriterExtensionProvider;
import io.cdap.cdap.internal.events.EventWriterProvider;
import io.cdap.cdap.internal.events.MetricsProvider;
import io.cdap.cdap.internal.events.ProgramStatusEventPublisher;
import io.cdap.cdap.internal.events.SparkProgramStatusMetricsProvider;
import io.cdap.cdap.internal.events.StartProgramEventReaderExtensionProvider;
import io.cdap.cdap.internal.events.StartProgramEventSubscriber;
import io.cdap.cdap.internal.pipeline.SynchronousPipelineFactory;
import io.cdap.cdap.internal.profile.ProfileService;
import io.cdap.cdap.internal.provision.ProvisionerModule;
Expand All @@ -149,6 +152,7 @@
import io.cdap.cdap.security.impersonation.UnsupportedUGIProvider;
import io.cdap.cdap.security.store.SecureStoreHandler;
import io.cdap.cdap.sourcecontrol.guice.SourceControlModule;
import io.cdap.cdap.spi.events.StartProgramEvent;
import io.cdap.http.HttpHandler;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -400,8 +404,11 @@ protected void configure() {
Multibinder.newSetBinder(binder(), EventPublisher.class);
eventPublishersBinder.addBinding().to(ProgramStatusEventPublisher.class);
bind(EventPublishManager.class).in(Scopes.SINGLETON);
bind(new TypeLiteral<EventReaderProvider<StartProgramEvent>>() {})
.to(StartProgramEventReaderExtensionProvider.class);
Multibinder<EventSubscriber> eventSubscribersBinder =
Multibinder.newSetBinder(binder(), EventSubscriber.class);
eventSubscribersBinder.addBinding().to(StartProgramEventSubscriber.class);
bind(EventSubscriberManager.class).in(Scopes.SINGLETON);
bind(EventWriterProvider.class).to(EventWriterExtensionProvider.class);
bind(MetricsProvider.class).to(SparkProgramStatusMetricsProvider.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.lang.ClassPathResources;
import io.cdap.cdap.common.lang.FilterClassLoader;
import io.cdap.cdap.extension.AbstractExtensionLoader;
import io.cdap.cdap.spi.events.EventReader;
import io.cdap.cdap.spi.events.StartProgramEvent;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link StartProgramEventReaderExtensionProvider}
* which provides Event reader extension classes.
* extending from {@link AbstractExtensionLoader}
*/
public class StartProgramEventReaderExtensionProvider
extends AbstractExtensionLoader<String, EventReader<StartProgramEvent>>
implements EventReaderProvider<StartProgramEvent> {

private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventReaderExtensionProvider.class);
private static final Set<String> ALLOWED_RESOURCES = createAllowedResources();
private static final Set<String> ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES);
private final Collection<String> enabledEventReaders;

/**
* Retrieve enabled event readers from cConf.
*
* @param cConf configuration
*/
@Inject
public StartProgramEventReaderExtensionProvider(CConfiguration cConf) {
super(cConf.get(Constants.Event.START_EVENTS_READER_EXTENSIONS_DIR, ""));
this.enabledEventReaders = cConf.getStringCollection(
Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST);
if (this.enabledEventReaders == null || this.enabledEventReaders.isEmpty()) {
return;
}
LOG.debug("Loaded event readers are {} .", enabledEventReaders);
}

private static Set<String> createAllowedResources() {
try {
return ClassPathResources.getResourcesWithDependencies(EventReader.class.getClassLoader(),
EventReader.class);
} catch (IOException e) {
throw new RuntimeException("Failed to trace dependencies for reader extension. "
+ "Usage of events reader might fail.", e);
}
}

public Map<String, EventReader<StartProgramEvent>> loadEventReaders() {
return getAll();
}

@Override
protected Set<String> getSupportedTypesForProvider(EventReader eventReader) {
if (enabledEventReaders == null ||
!enabledEventReaders.contains(eventReader.getClass().getName())) {
LOG.debug("{} is not present in the allowed list of event readers.",
eventReader.getClass().getName());
return Collections.emptySet();
}

return Collections.singleton(eventReader.getClass().getName());
}

@Override
protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() {
// Only allow spi classes.
return new FilterClassLoader.Filter() {
@Override
public boolean acceptResource(String resource) {
return ALLOWED_RESOURCES.contains(resource);
}

@Override
public boolean acceptPackage(String packageName) {
return ALLOWED_PACKAGES.contains(packageName);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramReference;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.events.EventReader;
import io.cdap.cdap.spi.events.EventResult;
import io.cdap.cdap.spi.events.StartProgramEvent;
import io.cdap.cdap.spi.events.StartProgramEventDetails;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Periodically checks event reader for StartProgramEvents. If any events available, trigger
* startProgram method on each event.
*/
public class StartProgramEventSubscriber extends EventSubscriber {

private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventSubscriber.class);

private final CConfiguration cConf;
private final EventReaderProvider<StartProgramEvent> extensionProvider;
private final ProgramLifecycleService lifecycleService;
private ScheduledExecutorService executor;
private Collection<EventReader<StartProgramEvent>> readers;
private ExecutorService threadPoolExecutor;

/**
* Create instance that handles StartProgramEvents.
*
* @param cConf CDAP configuration
* @param extensionProvider eventReaderProvider for StartProgramEvent Readers
* @param lifecycleService to publish start programs to TMS
*/
@Inject
StartProgramEventSubscriber(CConfiguration cConf, EventReaderProvider<StartProgramEvent> extensionProvider,
ProgramLifecycleService lifecycleService) {
this.cConf = cConf;
this.extensionProvider = extensionProvider;
this.lifecycleService = lifecycleService;
}

@Override
public boolean initialize() {
readers = new HashSet<>(extensionProvider.loadEventReaders().values());
Iterator<EventReader<StartProgramEvent>> eventReaderIterator
= readers.iterator();
while (eventReaderIterator.hasNext()) {
EventReader<StartProgramEvent> reader = eventReaderIterator.next();
try {
reader.initialize(
new DefaultEventReaderContext(String.format("%s.%s.",
Constants.Event.START_EVENT_PREFIX, reader.getClass().getName()), cConf));
} catch (Exception e) {
LOG.error("Failed to initialize reader: {}", reader.getClass().getSimpleName(), e);
eventReaderIterator.remove();
}
}
if (readers.isEmpty()) {
return false;
}
threadPoolExecutor = new ThreadPoolExecutor(readers.size(), readers.size(), 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
return true;
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0,
cConf.getInt(Constants.Event.START_PROGRAM_EVENT_READER_POLL_DELAY), TimeUnit.SECONDS);
}

@Override
protected void runOneIteration() throws Exception {
if (threadPoolExecutor != null) {
for (EventReader<StartProgramEvent> reader : readers) {
threadPoolExecutor.execute(() -> {
processEvents(reader);
});
}
}
}

@VisibleForTesting
void processEvents(EventReader<StartProgramEvent> reader) {
EventResult<StartProgramEvent> result = reader.pull(cConf.getInt(
Constants.Event.START_PROGRAM_EVENT_FETCH_SIZE));
result.consumeEvents(this::startProgram);
}

/**
* Attempt to publish program to TMS.
*
* @param event Event containing program info
* @throws RuntimeException if starting program fails
*/
private void startProgram(StartProgramEvent event) {
StartProgramEventDetails eventDetails = event.getEventDetails();
try {
ProgramType programType = ProgramType.valueOfCategoryName(eventDetails.getProgramType());
ProgramReference programReference = new ProgramReference(eventDetails.getNamespaceId(),
eventDetails.getAppId(), programType,
eventDetails.getProgramId());
LOG.debug("Starting pipeline {}, with args: {}, programReference: {}",
eventDetails.getAppId(), eventDetails.getArgs(), programReference);
RunId runId = lifecycleService.run(programReference, eventDetails.getArgs(), true);
ProgramRunId programRunId = new ProgramRunId(
programReference.getNamespace(), programReference.getApplication(), programType,
programReference.getProgram(), runId.getId());
LOG.info("Started pipeline, ProgramRunId: {}", programRunId.toMetadataEntity());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
protected final ScheduledExecutorService executor() {
executor =
Executors.newSingleThreadScheduledExecutor(
Threads.createDaemonThreadFactory("start-program-event-handler-scheduler"));
return executor;
}

@Override
protected void startUp() throws Exception {
LOG.info("StartProgramEventSubscriber started.");
}

@Override
protected void shutDown() throws Exception {
if (executor != null) {
executor.shutdownNow();
}
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdownNow();
}
LOG.info("StartProgramEventSubscriber successfully shut down.");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.events;

import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.internal.events.dummy.DummyEventReader;
import io.cdap.cdap.spi.events.EventReader;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;


/**
* Tests for {@link StartProgramEventReaderExtensionProvider}
*/
public class StartProgramEventReaderExtensionProviderTest {

@Test
public void testEnabledEventReaderFilter() {
EventReader mockReader = new DummyEventReader();
String mockReaderName = mockReader.getClass().getName();
CConfiguration cConf = CConfiguration.create();

StartProgramEventReaderExtensionProvider readerExtensionProvider1
= new StartProgramEventReaderExtensionProvider(cConf);
Set<String> test1 = readerExtensionProvider1.getSupportedTypesForProvider(mockReader);
Assert.assertTrue(test1.isEmpty());

//Test with reader class name enabled
cConf.setStrings(Constants.Event.START_EVENTS_READER_EXTENSIONS_ENABLED_LIST, mockReaderName);
StartProgramEventReaderExtensionProvider readerExtensionProvider2
= new StartProgramEventReaderExtensionProvider(cConf);
Set<String> test2 = readerExtensionProvider2.getSupportedTypesForProvider(mockReader);
Assert.assertTrue(test2.contains(mockReaderName));
}
}
Loading

0 comments on commit 031752f

Please sign in to comment.