From ea4c1c499dd29ac166f458f31420de25a3dc48e1 Mon Sep 17 00:00:00 2001
From: Ali Anwar
Date: Mon, 2 Apr 2018 02:22:12 -0700
Subject: [PATCH] WIP: CDAP conflicts with the Netty version packaged with
 Spark.

https://github.com/caskdata/cdap/pull/9712/commits/3653b0094d97c8d4d8a30a1f07aca0188c86236a#diff-bdf31212fe8216c030b14ea9045edf78
https://github.com/druid-io/druid/pull/5059
https://www.snip2code.com/Snippet/1811802/flink---beam
https://groups.google.com/forum/#!topic/play-framework/TWa18IfZ5kA

(A quick classpath check for which Netty jar wins is sketched in the notes after the patch.)
---
 .../batch/dataproc/DataProcTwillPreparer.java | 771 ++++++++++++++++++
 .../batch/dataproc/DataProcTwillRunner.java   |  78 ++
 .../cask/cdap/app/runtime/spark/DSPRMain.java |  52 +-
 3 files changed, 884 insertions(+), 17 deletions(-)
 create mode 100644 cdap-app-fabric/src/main/java/co/cask/cdap/internal/app/runtime/batch/dataproc/DataProcTwillPreparer.java
 create mode 100644 cdap-app-fabric/src/main/java/co/cask/cdap/internal/app/runtime/batch/dataproc/DataProcTwillRunner.java

diff --git a/cdap-app-fabric/src/main/java/co/cask/cdap/internal/app/runtime/batch/dataproc/DataProcTwillPreparer.java b/cdap-app-fabric/src/main/java/co/cask/cdap/internal/app/runtime/batch/dataproc/DataProcTwillPreparer.java
new file mode 100644
index 000000000000..c24bd233fea1
--- /dev/null
+++ b/cdap-app-fabric/src/main/java/co/cask/cdap/internal/app/runtime/batch/dataproc/DataProcTwillPreparer.java
@@ -0,0 +1,771 @@
+/*
+ * Copyright © 2018 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */ + +package co.cask.cdap.internal.app.runtime.batch.dataproc; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.ByteStreams; +import com.google.common.io.OutputSupplier; +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import joptsimple.OptionSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.twill.api.ClassAcceptor; +import org.apache.twill.api.Configs; +import org.apache.twill.api.EventHandlerSpecification; +import org.apache.twill.api.LocalFile; +import org.apache.twill.api.RunId; +import org.apache.twill.api.RuntimeSpecification; +import org.apache.twill.api.SecureStore; +import org.apache.twill.api.TwillController; +import org.apache.twill.api.TwillPreparer; +import org.apache.twill.api.TwillSpecification; +import org.apache.twill.api.logging.LogEntry; +import org.apache.twill.api.logging.LogHandler; +import org.apache.twill.filesystem.Location; +import org.apache.twill.internal.ApplicationBundler; +import org.apache.twill.internal.Arguments; +import org.apache.twill.internal.Constants; +import org.apache.twill.internal.DefaultLocalFile; +import org.apache.twill.internal.DefaultRuntimeSpecification; +import org.apache.twill.internal.DefaultTwillSpecification; +import org.apache.twill.internal.EnvKeys; +import org.apache.twill.internal.JvmOptions; +import org.apache.twill.internal.LogOnlyEventHandler; +import org.apache.twill.internal.ProcessController; +import org.apache.twill.internal.ProcessLauncher; +import org.apache.twill.internal.TwillRuntimeSpecification; +import org.apache.twill.internal.appmaster.ApplicationMasterInfo; +import org.apache.twill.internal.appmaster.ApplicationMasterMain; +import org.apache.twill.internal.container.TwillContainerMain; +import org.apache.twill.internal.io.LocationCache; +import org.apache.twill.internal.json.ArgumentsCodec; +import org.apache.twill.internal.json.LocalFileCodec; +import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter; +import org.apache.twill.internal.utils.Dependencies; +import org.apache.twill.internal.utils.Paths; +import org.apache.twill.internal.utils.Resources; +import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory; +import org.apache.twill.internal.yarn.YarnAppClient; +import org.apache.twill.internal.yarn.YarnApplicationReport; +import org.apache.twill.internal.yarn.YarnUtils; +import org.apache.twill.launcher.FindFreePort; +import org.apache.twill.launcher.TwillLauncher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedOutputStream; +import 
java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import javax.annotation.Nullable; + +/** + * + */ +public class DataProcTwillPreparer implements TwillPreparer { + private static final Logger LOG = LoggerFactory.getLogger(DataProcTwillPreparer.class); + private static final Function, String> CLASS_TO_NAME = new Function, String>() { + public String apply(Class cls) { + return cls.getName(); + } + }; + private final Configuration config; + private final TwillSpecification twillSpec; + private final String zkConnectString; + private final Location appLocation; + //private final YarnTwillControllerFactory controllerFactory; + private final RunId runId; + private final List logHandlers = Lists.newArrayList(); + private final List arguments = Lists.newArrayList(); + private final Set> dependencies = Sets.newIdentityHashSet(); + private final List resources = Lists.newArrayList(); + private final List classPaths = Lists.newArrayList(); + private final ListMultimap runnableArgs = ArrayListMultimap.create(); + private final Map> environments = Maps.newHashMap(); + private final List applicationClassPaths = Lists.newArrayList(); + //private final Credentials credentials; + private final Map> logLevels = Maps.newHashMap(); + private final LocationCache locationCache; + private final Map maxRetries = Maps.newHashMap(); + private final Map> runnableConfigs = Maps.newHashMap(); + private final Map runnableExtraOptions = Maps.newHashMap(); + private String extraOptions; + private JvmOptions.DebugOptions debugOptions; + private String schedulerQueue; + private ClassAcceptor classAcceptor; + private String classLoaderClassName; + + DataProcTwillPreparer(Configuration config, TwillSpecification twillSpec, RunId runId, String zkConnectString, + Location appLocation, @Nullable String extraOptions, LocationCache locationCache) { + //YarnTwillControllerFactory controllerFactory) { + this.debugOptions = JvmOptions.DebugOptions.NO_DEBUG; + this.config = config; + this.twillSpec = twillSpec; + this.runId = runId; + this.zkConnectString = zkConnectString; + this.appLocation = appLocation; + //this.controllerFactory = controllerFactory; + //this.credentials = this.createCredentials(); + this.extraOptions = extraOptions == null ? 
"" : extraOptions; + this.classAcceptor = new ClassAcceptor(); + this.locationCache = locationCache; + } + + private void confirmRunnableName(String runnableName) { + Preconditions.checkNotNull(runnableName); + Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName), + "Runnable %s is not defined in the application.", runnableName); + } + + @Override + public TwillPreparer withConfiguration(Map config) { + for (Map.Entry entry : config.entrySet()) { + this.config.set(entry.getKey(), entry.getValue()); + } + return this; + } + + @Override + public TwillPreparer withConfiguration(String runnableName, Map config) { + confirmRunnableName(runnableName); + runnableConfigs.put(runnableName, Maps.newHashMap(config)); + return this; + } + + @Override + public TwillPreparer addLogHandler(LogHandler handler) { + logHandlers.add(handler); + return this; + } + + @Override + public TwillPreparer setUser(String user) { + return this; + } + + @Override + public TwillPreparer setSchedulerQueue(String name) { + this.schedulerQueue = name; + return this; + } + + @Override + public TwillPreparer setJVMOptions(String options) { + Preconditions.checkArgument(options != null, "JVM options cannot be null."); + this.extraOptions = options; + return this; + } + + @Override + public TwillPreparer setJVMOptions(String runnableName, String options) { + confirmRunnableName(runnableName); + Preconditions.checkArgument(options != null, "JVM options cannot be null."); + runnableExtraOptions.put(runnableName, options); + return this; + } + + @Override + public TwillPreparer addJVMOptions(String options) { + Preconditions.checkArgument(options != null, "JVM options cannot be null."); + this.extraOptions = extraOptions.isEmpty() ? options : extraOptions + " " + options; + return this; + } + + @Override + public TwillPreparer enableDebugging(String... runnables) { + return enableDebugging(false, runnables); + } + + @Override + public TwillPreparer enableDebugging(boolean doSuspend, String... runnables) { + for (String runnableName : runnables) { + confirmRunnableName(runnableName); + } + this.debugOptions = new JvmOptions.DebugOptions(true, doSuspend, ImmutableSet.copyOf(runnables)); + return this; + } + + @Override + public TwillPreparer withApplicationArguments(String... args) { + return withApplicationArguments(ImmutableList.copyOf(args)); + } + + @Override + public TwillPreparer withApplicationArguments(Iterable args) { + Iterables.addAll(arguments, args); + return this; + } + + @Override + public TwillPreparer withArguments(String runnableName, String... args) { + return withArguments(runnableName, ImmutableList.copyOf(args)); + } + + @Override + public TwillPreparer withArguments(String runnableName, Iterable args) { + confirmRunnableName(runnableName); + runnableArgs.putAll(runnableName, args); + return this; + } + + @Override + public TwillPreparer withDependencies(Class... classes) { + return withDependencies(ImmutableList.copyOf(classes)); + } + + @Override + public TwillPreparer withDependencies(Iterable> classes) { + Iterables.addAll(dependencies, classes); + return this; + } + + @Override + public TwillPreparer withResources(URI... resources) { + return withResources(ImmutableList.copyOf(resources)); + } + + @Override + public TwillPreparer withResources(Iterable resources) { + Iterables.addAll(this.resources, resources); + return this; + } + + @Override + public TwillPreparer withClassPaths(String... 
classPaths) { + return withClassPaths(ImmutableList.copyOf(classPaths)); + } + + @Override + public TwillPreparer withClassPaths(Iterable classPaths) { + Iterables.addAll(this.classPaths, classPaths); + return this; + } + + @Override + public TwillPreparer withEnv(Map env) { + // Add the given environments to all runnables + for (String runnableName : twillSpec.getRunnables().keySet()) { + setEnv(runnableName, env, false); + } + return this; + } + + @Override + public TwillPreparer withEnv(String runnableName, Map env) { + confirmRunnableName(runnableName); + setEnv(runnableName, env, true); + return this; + } + + @Override + public TwillPreparer withApplicationClassPaths(String... classPaths) { + return withApplicationClassPaths(ImmutableList.copyOf(classPaths)); + } + + @Override + public TwillPreparer withApplicationClassPaths(Iterable classPaths) { + Iterables.addAll(this.applicationClassPaths, classPaths); + return this; + } + + @Override + public TwillPreparer withBundlerClassAcceptor(ClassAcceptor classAcceptor) { + this.classAcceptor = classAcceptor; + return this; + } + + @Override + public TwillPreparer withMaxRetries(String runnableName, int maxRetries) { + confirmRunnableName(runnableName); + this.maxRetries.put(runnableName, maxRetries); + return this; + } + + @Override + public TwillPreparer addSecureStore(SecureStore secureStore) { + return this; + } + + @Override + public TwillPreparer setLogLevel(LogEntry.Level logLevel) { + return setLogLevels(ImmutableMap.of(Logger.ROOT_LOGGER_NAME, logLevel)); + } + + @Override + public TwillPreparer setLogLevels(Map logLevels) { + Preconditions.checkNotNull(logLevels); + for (String runnableName : twillSpec.getRunnables().keySet()) { + saveLogLevels(runnableName, logLevels); + } + return this; + } + + @Override + public TwillPreparer setLogLevels(String runnableName, Map runnableLogLevels) { + confirmRunnableName(runnableName); + Preconditions.checkNotNull(runnableLogLevels); + Preconditions.checkArgument(!(logLevels.containsKey(Logger.ROOT_LOGGER_NAME) + && logLevels.get(Logger.ROOT_LOGGER_NAME) == null)); + saveLogLevels(runnableName, runnableLogLevels); + return this; + } + + @Override + public TwillPreparer setClassLoader(String classLoaderClassName) { + this.classLoaderClassName = classLoaderClassName; + return this; + } + + + @Override + public TwillController start() { + return start(Constants.APPLICATION_MAX_START_SECONDS, TimeUnit.SECONDS); + } + + @Override + public TwillController start(long timeout, TimeUnit timeoutUnit) { + return null; + } + + + /** + * Returns the local staging directory based on the configuration. + */ + private File getLocalStagingDir() { + return new File(config.get(Configs.Keys.LOCAL_STAGING_DIRECTORY, Configs.Defaults.LOCAL_STAGING_DIRECTORY)); + } + + /** + * Returns the extra options for the container JVM. + */ + private String addClassLoaderClassName(String extraOptions) { + if (classLoaderClassName == null) { + return extraOptions; + } + String classLoaderProperty = "-D" + Constants.TWILL_CONTAINER_CLASSLOADER + "=" + classLoaderClassName; + return extraOptions.isEmpty() ? 
classLoaderProperty : extraOptions + " " + classLoaderProperty; + } + + private void setEnv(String runnableName, Map env, boolean overwrite) { + Map environment = environments.get(runnableName); + if (environment == null) { + environment = new LinkedHashMap<>(env); + environments.put(runnableName, environment); + return; + } + + for (Map.Entry entry : env.entrySet()) { + if (overwrite || !environment.containsKey(entry.getKey())) { + environment.put(entry.getKey(), entry.getValue()); + } + } + } + + private void saveLogLevels(String runnableName, Map logLevels) { + Map newLevels = new HashMap<>(); + for (Map.Entry entry : logLevels.entrySet()) { + Preconditions.checkArgument(entry.getValue() != null, + "Log level cannot be null for logger {}", entry.getKey()); + newLevels.put(entry.getKey(), entry.getValue().name()); + } + this.logLevels.put(runnableName, newLevels); + } + + private LocalFile createLocalFile(String name, Location location) throws IOException { + return createLocalFile(name, location, false); + } + + private LocalFile createLocalFile(String name, Location location, boolean archive) throws IOException { + return new DefaultLocalFile(name, location.toURI(), location.lastModified(), location.length(), archive, null); + } + + private void createTwillJar(final ApplicationBundler bundler, + final YarnAppClient yarnAppClient, + Map localFiles) throws IOException { + LOG.debug("Create and copy {}", Constants.Files.TWILL_JAR); + Location location = locationCache.get(Constants.Files.TWILL_JAR, new LocationCache.Loader() { + @Override + public void load(String name, Location targetLocation) throws IOException { + // Stuck in the yarnAppClient class to make bundler being able to pickup the right yarn-client version + bundler.createBundle(targetLocation, ApplicationMasterMain.class, + yarnAppClient.getClass(), TwillContainerMain.class, OptionSpec.class); + } + }); + + LOG.debug("Done {}", Constants.Files.TWILL_JAR); + localFiles.put(Constants.Files.TWILL_JAR, createLocalFile(Constants.Files.TWILL_JAR, location, true)); + } + + private void createApplicationJar(final ApplicationBundler bundler, + Map localFiles) throws IOException { + try { + final Set> classes = Sets.newIdentityHashSet(); + classes.addAll(dependencies); + + ClassLoader classLoader = getClassLoader(); + for (RuntimeSpecification spec : twillSpec.getRunnables().values()) { + classes.add(classLoader.loadClass(spec.getRunnableSpecification().getClassName())); + } + + // Add the TwillRunnableEventHandler class + if (twillSpec.getEventHandler() != null) { + classes.add(getClassLoader().loadClass(twillSpec.getEventHandler().getClassName())); + } + + // The location name is computed from the MD5 of all the classes names + // The localized name is always APPLICATION_JAR + List classList = Lists.newArrayList(Iterables.transform(classes, CLASS_TO_NAME)); + Collections.sort(classList); + Hasher hasher = Hashing.md5().newHasher(); + for (String name : classList) { + hasher.putString(name); + } + // Only depends on class list so that it can be reused across different launches + String name = hasher.hash().toString() + "-" + Constants.Files.APPLICATION_JAR; + + LOG.debug("Create and copy {}", Constants.Files.APPLICATION_JAR); + Location location = locationCache.get(name, new LocationCache.Loader() { + @Override + public void load(String name, Location targetLocation) throws IOException { + bundler.createBundle(targetLocation, classes); + } + }); + + LOG.debug("Done {}", Constants.Files.APPLICATION_JAR); + + 
localFiles.put(Constants.Files.APPLICATION_JAR, + createLocalFile(Constants.Files.APPLICATION_JAR, location, true)); + + } catch (ClassNotFoundException e) { + throw Throwables.propagate(e); + } + } + + private void createResourcesJar(ApplicationBundler bundler, Map localFiles) throws IOException { + // If there is no resources, no need to create the jar file. + if (resources.isEmpty()) { + return; + } + + LOG.debug("Create and copy {}", Constants.Files.RESOURCES_JAR); + Location location = createTempLocation(Constants.Files.RESOURCES_JAR); + bundler.createBundle(location, Collections.>emptyList(), resources); + LOG.debug("Done {}", Constants.Files.RESOURCES_JAR); + localFiles.put(Constants.Files.RESOURCES_JAR, createLocalFile(Constants.Files.RESOURCES_JAR, location, true)); + } + + private void createRuntimeConfigJar(Path dir, Map localFiles) throws IOException { + LOG.debug("Create and copy {}", Constants.Files.RUNTIME_CONFIG_JAR); + + // Jar everything under the given directory, which contains different files needed by AM/runnable containers + Location location = createTempLocation(Constants.Files.RUNTIME_CONFIG_JAR); + try ( + JarOutputStream jarOutput = new JarOutputStream(location.getOutputStream()); + DirectoryStream stream = Files.newDirectoryStream(dir) + ) { + for (Path path : stream) { + jarOutput.putNextEntry(new JarEntry(path.getFileName().toString())); + Files.copy(path, jarOutput); + jarOutput.closeEntry(); + } + } + + LOG.debug("Done {}", Constants.Files.RUNTIME_CONFIG_JAR); + localFiles.put(Constants.Files.RUNTIME_CONFIG_JAR, + createLocalFile(Constants.Files.RUNTIME_CONFIG_JAR, location, true)); + } + + /** + * Based on the given {@link TwillSpecification}, upload LocalFiles to Yarn Cluster. + * @param twillSpec The {@link TwillSpecification} for populating resource. + */ + private Multimap populateRunnableLocalFiles(TwillSpecification twillSpec) throws IOException { + Multimap localFiles = HashMultimap.create(); + + LOG.debug("Populating Runnable LocalFiles"); + for (Map.Entry entry: twillSpec.getRunnables().entrySet()) { + String runnableName = entry.getKey(); + for (LocalFile localFile : entry.getValue().getLocalFiles()) { + Location location; + + URI uri = localFile.getURI(); + if (appLocation.toURI().getScheme().equals(uri.getScheme())) { + // If the source file location is having the same scheme as the target location, no need to copy + location = appLocation.getLocationFactory().create(uri); + } else { + URL url = uri.toURL(); + LOG.debug("Create and copy {} : {}", runnableName, url); + // Preserves original suffix for expansion. 
+ location = copyFromURL(url, + createTempLocation(Paths.addExtension(url.getFile(), localFile.getName()))); + LOG.debug("Done {} : {}", runnableName, url); + } + + localFiles.put(runnableName, + new DefaultLocalFile(localFile.getName(), location.toURI(), location.lastModified(), + location.length(), localFile.isArchive(), localFile.getPattern())); + } + } + LOG.debug("Done Runnable LocalFiles"); + return localFiles; + } + + private TwillRuntimeSpecification saveSpecification(TwillSpecification spec, Path targetFile) throws IOException { + final Multimap runnableLocalFiles = populateRunnableLocalFiles(spec); + + // Rewrite LocalFiles inside twillSpec + Map runtimeSpec = Maps.transformEntries( + spec.getRunnables(), new Maps.EntryTransformer() { + @Override + public RuntimeSpecification transformEntry(String key, RuntimeSpecification value) { + return new DefaultRuntimeSpecification(value.getName(), value.getRunnableSpecification(), + value.getResourceSpecification(), runnableLocalFiles.get(key)); + } + }); + + // Serialize into a local temp file. + LOG.debug("Creating {}", targetFile); + try (Writer writer = Files.newBufferedWriter(targetFile, StandardCharsets.UTF_8)) { + EventHandlerSpecification eventHandler = spec.getEventHandler(); + if (eventHandler == null) { + eventHandler = new LogOnlyEventHandler().configure(); + } + TwillSpecification newTwillSpec = + new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), + spec.getPlacementPolicies(), eventHandler); + Map configMap = Maps.newHashMap(); + for (Map.Entry entry : config) { + if (entry.getKey().startsWith("twill.")) { + configMap.put(entry.getKey(), entry.getValue()); + } + } + + TwillRuntimeSpecification twillRuntimeSpec = new TwillRuntimeSpecification( + newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(), + appLocation.toURI(), zkConnectString, runId, twillSpec.getName(), + config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS), + logLevels, maxRetries, configMap, runnableConfigs); + TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec, writer); + LOG.debug("Done {}", targetFile); + return twillRuntimeSpec; + } + } + + private void saveLogback(Path targetFile) throws IOException { + URL url = getClass().getClassLoader().getResource(Constants.Files.LOGBACK_TEMPLATE); + if (url == null) { + return; + } + + LOG.debug("Creating {}", targetFile); + try (InputStream is = url.openStream()) { + Files.copy(is, targetFile); + } + LOG.debug("Done {}", targetFile); + } + + /** + * Creates the launcher.jar for launch the main application. + */ + private void createLauncherJar(Map localFiles) throws URISyntaxException, IOException { + + LOG.debug("Create and copy {}", Constants.Files.LAUNCHER_JAR); + + Location location = locationCache.get(Constants.Files.LAUNCHER_JAR, new LocationCache.Loader() { + @Override + public void load(String name, Location targetLocation) throws IOException { + // Create a jar file with the TwillLauncher and FindFreePort and dependent classes inside. 
+ try (JarOutputStream jarOut = new JarOutputStream(targetLocation.getOutputStream())) { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = getClass().getClassLoader(); + } + Dependencies.findClassDependencies(classLoader, new ClassAcceptor() { + @Override + public boolean accept(String className, URL classUrl, URL classPathUrl) { + try { + jarOut.putNextEntry(new JarEntry(className.replace('.', '/') + ".class")); + try (InputStream is = classUrl.openStream()) { + ByteStreams.copy(is, jarOut); + } + } catch (IOException e) { + throw Throwables.propagate(e); + } + return true; + } + }, TwillLauncher.class.getName(), FindFreePort.class.getName()); + } + } + }); + + LOG.debug("Done {}", Constants.Files.LAUNCHER_JAR); + + localFiles.put(Constants.Files.LAUNCHER_JAR, createLocalFile(Constants.Files.LAUNCHER_JAR, location)); + } + + private void saveClassPaths(Path targetDir) throws IOException { + Files.write(targetDir.resolve(Constants.Files.APPLICATION_CLASSPATH), + Joiner.on(':').join(applicationClassPaths).getBytes(StandardCharsets.UTF_8)); + Files.write(targetDir.resolve(Constants.Files.CLASSPATH), + Joiner.on(':').join(classPaths).getBytes(StandardCharsets.UTF_8)); + } + + private JvmOptions saveJvmOptions(final Path targetPath) throws IOException { + // Append runnable specific extra options. + Map runnableExtraOptions = Maps.newHashMap( + Maps.transformValues(this.runnableExtraOptions, new Function() { + @Override + public String apply(String options) { + return addClassLoaderClassName(extraOptions.isEmpty() ? options : extraOptions + " " + options); + } + })); + + String globalOptions = addClassLoaderClassName(extraOptions); + JvmOptions jvmOptions = new JvmOptions(globalOptions, runnableExtraOptions, debugOptions); + if (globalOptions.isEmpty() && runnableExtraOptions.isEmpty() + && JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) { + // If no vm options, no need to localize the file. + return jvmOptions; + } + + LOG.debug("Creating {}", targetPath); + try (Writer writer = Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8)) { + new Gson().toJson(new JvmOptions(globalOptions, runnableExtraOptions, debugOptions), writer); + } + LOG.debug("Done {}", targetPath); + return jvmOptions; + } + + private void saveArguments(Arguments arguments, final Path targetPath) throws IOException { + LOG.debug("Creating {}", targetPath); + ArgumentsCodec.encode(arguments, new OutputSupplier() { + @Override + public Writer getOutput() throws IOException { + return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8); + } + }); + LOG.debug("Done {}", targetPath); + } + + private void saveEnvironments(Path targetPath) throws IOException { + if (environments.isEmpty()) { + return; + } + + LOG.debug("Creating {}", targetPath); + try (Writer writer = Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8)) { + new Gson().toJson(environments, writer); + } + LOG.debug("Done {}", targetPath); + } + + /** + * Serializes the information for files that are localized to all YARN containers. + */ + private void createLocalizeFilesJson(Map localFiles) throws IOException { + LOG.debug("Create and copy {}", Constants.Files.LOCALIZE_FILES); + Location location = createTempLocation(Constants.Files.LOCALIZE_FILES); + + // Serialize the list of LocalFiles, except the one we are generating here, as this file is used by AM only. + // This file should never use LocationCache. 
+ try (Writer writer = new OutputStreamWriter(location.getOutputStream(), StandardCharsets.UTF_8)) { + new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec()) + .create().toJson(localFiles.values(), new TypeToken>() { + }.getType(), writer); + } + LOG.debug("Done {}", Constants.Files.LOCALIZE_FILES); + localFiles.put(Constants.Files.LOCALIZE_FILES, createLocalFile(Constants.Files.LOCALIZE_FILES, location)); + } + + private Location copyFromURL(URL url, Location target) throws IOException { + try ( + InputStream is = url.openStream(); + OutputStream os = new BufferedOutputStream(target.getOutputStream()) + ) { + ByteStreams.copy(is, os); + } + return target; + } + + private Location createTempLocation(String fileName) { + String name; + String suffix = Paths.getExtension(fileName); + + name = fileName.substring(0, fileName.length() - suffix.length() - 1); + + try { + return appLocation.append(name).getTempFile('.' + suffix); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + /** + * Returns the context ClassLoader if there is any, otherwise, returns ClassLoader of this class. + */ + private ClassLoader getClassLoader() { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + return classLoader == null ? getClass().getClassLoader() : classLoader; + } + + private ApplicationBundler createBundler(ClassAcceptor classAcceptor) { + return new ApplicationBundler(classAcceptor).setTempDir(getLocalStagingDir()); + } + +} diff --git a/cdap-app-fabric/src/main/java/co/cask/cdap/internal/app/runtime/batch/dataproc/DataProcTwillRunner.java b/cdap-app-fabric/src/main/java/co/cask/cdap/internal/app/runtime/batch/dataproc/DataProcTwillRunner.java new file mode 100644 index 000000000000..17bbc688cdf2 --- /dev/null +++ b/cdap-app-fabric/src/main/java/co/cask/cdap/internal/app/runtime/batch/dataproc/DataProcTwillRunner.java @@ -0,0 +1,78 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package co.cask.cdap.internal.app.runtime.batch.dataproc; + +import org.apache.twill.api.ResourceSpecification; +import org.apache.twill.api.RunId; +import org.apache.twill.api.SecureStoreUpdater; +import org.apache.twill.api.TwillApplication; +import org.apache.twill.api.TwillController; +import org.apache.twill.api.TwillPreparer; +import org.apache.twill.api.TwillRunnable; +import org.apache.twill.api.TwillRunner; +import org.apache.twill.api.security.SecureStoreRenewer; +import org.apache.twill.common.Cancellable; + +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class DataProcTwillRunner implements TwillRunner { + + @Override + public TwillPreparer prepare(TwillRunnable twillRunnable) { + return null; + } + + @Override + public TwillPreparer prepare(TwillRunnable twillRunnable, ResourceSpecification resourceSpecification) { + return null; + } + + @Override + public TwillPreparer prepare(TwillApplication twillApplication) { + return null; + } + + @Override + public TwillController lookup(String s, RunId runId) { + return null; + } + + @Override + public Iterable lookup(String s) { + return null; + } + + @Override + public Iterable lookupLive() { + return null; + } + + @Override + public Cancellable scheduleSecureStoreUpdate(SecureStoreUpdater secureStoreUpdater, + long l, long l1, TimeUnit timeUnit) { + return null; + } + + @Override + public Cancellable setSecureStoreRenewer(SecureStoreRenewer secureStoreRenewer, + long l, long l1, long l2, TimeUnit timeUnit) { + return null; + } +} diff --git a/cdap-spark-core-base/src/main/java/co/cask/cdap/app/runtime/spark/DSPRMain.java b/cdap-spark-core-base/src/main/java/co/cask/cdap/app/runtime/spark/DSPRMain.java index a500c3b97512..77e8aaf9d6d1 100644 --- a/cdap-spark-core-base/src/main/java/co/cask/cdap/app/runtime/spark/DSPRMain.java +++ b/cdap-spark-core-base/src/main/java/co/cask/cdap/app/runtime/spark/DSPRMain.java @@ -27,7 +27,10 @@ import co.cask.cdap.app.runtime.ProgramOptions; import co.cask.cdap.common.app.RunIds; import co.cask.cdap.common.conf.CConfiguration; +import co.cask.cdap.common.io.Locations; import co.cask.cdap.common.lang.InstantiatorFactory; +import co.cask.cdap.common.lang.jar.BundleJarUtil; +import co.cask.cdap.common.utils.DirUtils; import co.cask.cdap.data.stream.StreamCoordinatorClient; import co.cask.cdap.internal.app.ApplicationSpecificationAdapter; import co.cask.cdap.internal.app.runtime.AbstractListener; @@ -43,6 +46,8 @@ import co.cask.cdap.proto.id.ProgramId; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.io.Files; import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Service; @@ -91,9 +96,8 @@ public class DSPRMain { + "nk-output-dir\\\",\\\"referenceName\\\":\\\"HDFSinkTest\\\",\\\"delimite" + "r\\\":\\\"|\\\"}}},{\\\"name\\\":\\\"source\\\",\\\"plugin\\\":{\\\"name\\\":\\\"Fil" + "e\\\",\\\"type\\\":\\\"batchsource\\\",\\\"properties\\\":{\\\"referenceName" - + "\\\":\\\"TestFile\\\",\\\"fileSystem\\\":\\\"Text\\\",\\\"path\\\":\\\"/Users/aa" - + "nwar/src/hydrator-plugins/core-plugins/target/junit555045997" - + "8545741420/junit5499898826057422683/test.txt\\\",\\\"format\\\":\\\"" + + "\\\":\\\"TestFile\\\",\\\"fileSystem\\\":\\\"Text\\\",\\\"path\\\":\\\"/tmp/input.txt" + + "\\\",\\\"format\\\":\\\"" + "text\\\",\\\"ignoreNonExistingFolders\\\":\\\"false\\\",\\\"pathField\\\":" + 
"\\\"file\\\",\\\"schema\\\":\\\"{\\\\\\\"type\\\\\\\":\\\\\\\"record\\\\\\\",\\\\\\\"name\\" + "\\\\\":\\\\\\\"file.record\\\\\\\",\\\\\\\"fields\\\\\\\":[{\\\\\\\"name\\\\\\\":\\\\\\\"of" @@ -118,9 +122,8 @@ public class DSPRMain { + "1\\\",\\\"phase\\\":{\\\"stagesByType\\\":{\\\"batchsource\\\":[{\\\"name\\\":" + "\\\"source\\\",\\\"plugin\\\":{\\\"type\\\":\\\"batchsource\\\",\\\"name\\\":\\\"F" + "ile\\\",\\\"properties\\\":{\\\"referenceName\\\":\\\"TestFile\\\",\\\"fileS" - + "ystem\\\":\\\"Text\\\",\\\"path\\\":\\\"/Users/aanwar/src/hydrator-plugi" - + "ns/core-plugins/target/junit5550459978545741420/junit5499898" - + "826057422683/test.txt\\\",\\\"format\\\":\\\"text\\\",\\\"ignoreNonExist" + + "ystem\\\":\\\"Text\\\",\\\"path\\\":\\\"/tmp/input.txt" + + "\\\",\\\"format\\\":\\\"text\\\",\\\"ignoreNonExist" + "ingFolders\\\":\\\"false\\\",\\\"pathField\\\":\\\"file\\\",\\\"schema\\\":\\\"{" + "\\\\\\\"type\\\\\\\":\\\\\\\"record\\\\\\\",\\\\\\\"name\\\\\\\":\\\\\\\"file.record\\\\\\\"" + ",\\\\\\\"fields\\\\\\\":[{\\\\\\\"name\\\\\\\":\\\\\\\"offset\\\\\\\",\\\\\\\"type\\\\\\\":\\" @@ -171,9 +174,8 @@ public class DSPRMain { + "puts\\\":[\\\"source\\\"],\\\"outputs\\\":[]},\\\"source\\\":{\\\"name\\\":\\\"s" + "ource\\\",\\\"plugin\\\":{\\\"type\\\":\\\"batchsource\\\",\\\"name\\\":\\\"File" + "\\\",\\\"properties\\\":{\\\"referenceName\\\":\\\"TestFile\\\",\\\"fileSyst" - + "em\\\":\\\"Text\\\",\\\"path\\\":\\\"/Users/aanwar/src/hydrator-plugins/" - + "core-plugins/target/junit5550459978545741420/junit5499898826" - + "057422683/test.txt\\\",\\\"format\\\":\\\"text\\\",\\\"ignoreNonExisting" + + "em\\\":\\\"Text\\\",\\\"path\\\":\\\"/tmp/input.txt" + + "\\\",\\\"format\\\":\\\"text\\\",\\\"ignoreNonExisting" + "Folders\\\":\\\"false\\\",\\\"pathField\\\":\\\"file\\\",\\\"schema\\\":\\\"{\\\\\\" + "\"type\\\\\\\":\\\\\\\"record\\\\\\\",\\\\\\\"name\\\\\\\":\\\\\\\"file.record\\\\\\\",\\\\" + "\\\"fields\\\\\\\":[{\\\\\\\"name\\\\\\\":\\\\\\\"offset\\\\\\\",\\\\\\\"type\\\\\\\":\\\\\\\"" @@ -211,9 +213,8 @@ public class DSPRMain { + "ne.spec\":\"{\\\"endingActions\\\":[],\\\"stages\\\":[{\\\"name\\\":\\\"sour" + "ce\\\",\\\"plugin\\\":{\\\"type\\\":\\\"batchsource\\\",\\\"name\\\":\\\"File\\\"," + "\\\"properties\\\":{\\\"referenceName\\\":\\\"TestFile\\\",\\\"fileSystem\\" - + "\":\\\"Text\\\",\\\"path\\\":\\\"/Users/aanwar/src/hydrator-plugins/cor" - + "e-plugins/target/junit5550459978545741420/junit5499898826057" - + "422683/test.txt\\\",\\\"format\\\":\\\"text\\\",\\\"ignoreNonExistingFol" + + "\":\\\"Text\\\",\\\"path\\\":\\\"/tmp/input.txt" + + "\\\",\\\"format\\\":\\\"text\\\",\\\"ignoreNonExistingFol" + "ders\\\":\\\"false\\\",\\\"pathField\\\":\\\"file\\\",\\\"schema\\\":\\\"{\\\\\\\"ty" + "pe\\\\\\\":\\\\\\\"record\\\\\\\",\\\\\\\"name\\\\\\\":\\\\\\\"file.record\\\\\\\",\\\\\\\"f" + "ields\\\\\\\":[{\\\\\\\"name\\\\\\\":\\\\\\\"offset\\\\\\\",\\\\\\\"type\\\\\\\":\\\\\\\"lon" @@ -372,9 +373,8 @@ public class DSPRMain { + "record\\\",\\\"name\\\":\\\"file.record\\\",\\\"fields\\\":[{\\\"name\\\":\\\"of" + "fset\\\",\\\"type\\\":\\\"long\\\"},{\\\"name\\\":\\\"body\\\",\\\"type\\\":[\\\"str" + "ing\\\",\\\"null\\\"]},{\\\"name\\\":\\\"file\\\",\\\"type\\\":[\\\"string\\\",\\\"n" - + "ull\\\"]}]}\",\"fileSystem\":\"Text\",\"path\":\"/Users/aanwar/src/hyd" - + "rator-plugins/core-plugins/target/junit5550459978545741420/j" - + "unit5499898826057422683/test.txt\",\"format\":\"text\",\"ignoreNon" + + "ull\\\"]}]}\",\"fileSystem\":\"Text\",\"path\":\"/tmp/input.txt" + + 
"\",\"format\":\"text\",\"ignoreNon" + "ExistingFolders\":\"false\",\"pathField\":\"file\",\"referenceName\":" + "\"TestFile\"},\"macros\":{\"lookupProperties\":[],\"macroFunctions\"" + ":[]}}}}}"; @@ -394,11 +394,27 @@ public static void main(String[] args) throws IOException, InterruptedException, Preconditions.checkArgument(jarLocation.exists()); + + + File tempDir = new File("/tmp/dsprmain_tmp"); + File programJarUnpacked = new File(tempDir, "unpacked"); + //programJarUnpacked.mkdirs(); + //try { + // File programJar = Locations.linkOrCopy(jarLocation, new File(tempDir, "program.jar")); + // // Unpack the JAR file + // BundleJarUtil.unJar(Files.newInputStreamSupplier(programJar), programJarUnpacked); + //} catch (IOException ioe) { + // throw ioe; + //} catch (Exception e) { + // // should not happen + // throw Throwables.propagate(e); + //} + ProgramDescriptor programDescriptor = new ProgramDescriptor(programId, GSON.fromJson(appSpecString, ApplicationSpecification.class)); -// See AbstractProgramRuntimeService#run + // See AbstractProgramRuntimeService#run Program program = Programs.create(cConf, sparkProgramRunner, programDescriptor, - jarLocation, new File("/tmp/spark_unpcked")); + jarLocation, programJarUnpacked); Map optionsMap = new HashMap<>(); @@ -435,6 +451,8 @@ public static void main(String[] args) throws IOException, InterruptedException, ProgramController controller = sparkProgramRunner.run(program, options); System.out.println("output: " + waitForCompletion(controller)); + + //DirUtils.deleteDirectoryContents(tempDir); } private static boolean waitForCompletion(ProgramController controller) throws InterruptedException {