spark submit not able to find resources
sahusanket committed Aug 8, 2023
1 parent e842370 commit eccc52e
Showing 3 changed files with 8 additions and 3 deletions.
File 1 of 3
@@ -62,6 +62,7 @@
import io.cdap.cdap.etl.spark.function.JoinOnFunction;
import io.cdap.cdap.etl.spark.function.PluginFunctionContext;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
+import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.SQLContext;
@@ -74,6 +75,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -196,7 +198,9 @@ public void run(DatasetContext context) throws Exception {
BatchPhaseSpec phaseSpec = GSON.fromJson(sec.getSpecification().getProperty(Constants.PIPELINEID),
BatchPhaseSpec.class);

-Path configFile = sec.getLocalizationContext().getLocalFile("HydratorSpark.config").toPath();
+Path configFile = Paths.get(SparkFiles.get("HydratorSpark.config"));
+
+// Path configFile = sec.getLocalizationContext().getLocalFile("HydratorSpark.config").toPath();
try (BufferedReader reader = Files.newBufferedReader(configFile, StandardCharsets.UTF_8)) {
String object = reader.readLine();
SparkBatchSourceSinkFactoryInfo sourceSinkInfo = GSON.fromJson(object, SparkBatchSourceSinkFactoryInfo.class);
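The change above swaps CDAP's localization-context lookup for Spark's own file distribution, which is what spark-submit uses. A minimal sketch of that lookup, assuming the config file is shipped with --files; the helper class and the spark-submit command line are illustrative, only SparkFiles.get and the "HydratorSpark.config" name come from the diff:

import org.apache.spark.SparkFiles;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of the commit.
final class LocalizedConfigSketch {
  // The file is shipped to the cluster with, for example:
  //   spark-submit --files /local/path/HydratorSpark.config ... pipeline.jar
  // Spark copies it into its work directory on each node, and
  // SparkFiles.get returns that node-local absolute path.
  static Path hydratorSparkConfig() {
    return Paths.get(SparkFiles.get("HydratorSpark.config"));
  }
}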
File 2 of 3
@@ -173,7 +173,7 @@ public Optional<RuntimeJobManager> getRuntimeJobManager(ProvisionerContext conte
conf.getGcsBucket() != null ? conf.getGcsBucket() : properties.get(DataprocUtils.BUCKET);
//TODO figure out bucket usage

-Map<String, String> systemLabels = getSystemLabels();
+Map<String, String> systemLabels = getCommonDataprocLabels(context);
LOG.warn(" SANKET : in : getRuntimeJobManager 4: bucket : {} ",bucket );
bucket = "serverlessdataproc" ; //TODO HARDCODED
return Optional.of(
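The bucket in the hunk above is hardcoded to "serverlessdataproc" for debugging (see the TODO). A hedged sketch of the fallback the surrounding lines suggest, keeping the hardcoded name only as a last resort; the helper and its signature are hypothetical, not from the commit:

// Hypothetical helper, not part of the commit: prefer the bucket from the
// provisioner config, then the provisioner property, then a fixed default.
static String resolveBucket(String configuredBucket, String propertyBucket) {
  if (configuredBucket != null && !configuredBucket.isEmpty()) {
    return configuredBucket;
  }
  return propertyBucket != null ? propertyBucket : "serverlessdataproc";
}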
File 3 of 3
@@ -349,7 +349,7 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception {
String errorMessage = String.format("Error while launching job %s on cluster %s.",
getJobId(runInfo), clusterName);
// delete all uploaded gcs files in case of exception
-DataprocUtils.deleteGCSPath(getStorageClient(), bucket, runRootPath);
+DataprocUtils.deleteGcsPath(getStorageClient(), bucket, runRootPath);
DataprocUtils.emitMetric(provisionerContext, region,
"provisioner.submitJob.response.count", e);
// ResourceExhaustedException indicates Dataproc agent running on master node isn't emitting heartbeat.
@@ -801,6 +801,7 @@ public static Map<String, String> getProperties(RuntimeJobInfo runtimeJobInfo) {
// dataproc.performance.metrics.listener.enabled
// spark.extraListeners -> com.google.cloud.spark.performance.DataprocMetricsListener
properties.put("spark.extraListeners","");
properties.put("spark.dynamicAllocation.minExecutors","2");

return properties;
}
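The added spark.dynamicAllocation.minExecutors only takes effect while dynamic allocation is enabled (it is on by default on Dataproc). A small sketch of the related settings as they could be assembled; the helper method is hypothetical, and only the extraListeners and minExecutors entries come from the diff:

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not the actual getProperties body.
static Map<String, String> sparkAllocationProperties() {
  Map<String, String> properties = new HashMap<>();
  // Drop the Dataproc performance metrics listener, as in the commit.
  properties.put("spark.extraListeners", "");
  // minExecutors is only honored when dynamic allocation is enabled;
  // spelled out explicitly here even though Dataproc enables it by default.
  properties.put("spark.dynamicAllocation.enabled", "true");
  properties.put("spark.dynamicAllocation.minExecutors", "2");
  return properties;
}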
