Skip to content

Commit

Permalink
Fixing the nightly build (GoogleCloudDataproc#1106)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrabinowitz authored Oct 25, 2023
1 parent d23d7ac commit 7a38afa
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -46,13 +45,9 @@
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructType;
import scala.collection.mutable.ListBuffer;

/** Spark related utilities */
public class SparkBigQueryUtil {
Expand Down Expand Up @@ -286,20 +281,4 @@ public static ImmutableMap<String, String> extractJobLabels(SparkConf sparkConf)
.ifPresent(tag -> labels.put("dataproc_job_uuid", tag.substring(tag.lastIndexOf('_') + 1)));
return labels.build();
}

/**
 * Converts a Spark {@link StructType} schema into a list of Catalyst {@link AttributeReference}s,
 * one per top-level field, preserving each field's name, data type, nullability and metadata.
 *
 * <p>Each attribute gets a freshly generated expression ID and an empty qualifier (the
 * {@code new ListBuffer<String>().toStream()} argument), matching what Catalyst's own
 * {@code toAttributes} produces.
 *
 * @param schema the schema whose fields are converted; must not be null
 * @return the attribute references in field order
 */
public static List<AttributeReference> schemaToAttributeReferences(StructType schema) {
  // Collect directly — no need for an intermediate local before returning.
  return Stream.of(schema.fields())
      .map(
          field ->
              new AttributeReference(
                  field.name(),
                  field.dataType(),
                  field.nullable(),
                  field.metadata(),
                  NamedExpression.newExprId(),
                  new ListBuffer<String>().toStream()))
      .collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@
*/
package org.apache.spark.sql;

import com.google.cloud.spark.bigquery.SparkBigQueryUtil;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;
import scala.collection.mutable.ListBuffer;

public class Scala213SparkSqlUtils extends SparkSqlUtils {

Expand Down Expand Up @@ -59,7 +61,18 @@ public ExpressionEncoder<Row> createExpressionEncoder(StructType schema) {
// `toAttributes` is protected[sql] starting spark 3.2.0, so we need this call to be in the same
// package. Since Scala 2.13/Spark 3.3 forbids it, the implementation has been ported to Java
public static scala.collection.immutable.Seq<AttributeReference> toAttributes(StructType schema) {
return JavaConverters.asScalaBuffer(SparkBigQueryUtil.schemaToAttributeReferences(schema))
return JavaConverters.asScalaBuffer(
Stream.of(schema.fields())
.map(
field ->
new AttributeReference(
field.name(),
field.dataType(),
field.nullable(),
field.metadata(),
NamedExpression.newExprId(),
new ListBuffer<String>().toStream()))
.collect(Collectors.toList()))
.toSeq();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@
*/
package org.apache.spark.sql;

import com.google.cloud.spark.bigquery.SparkBigQueryUtil;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;
import scala.collection.mutable.ListBuffer;

public class PreScala213SparkSqlUtils extends SparkSqlUtils {

Expand Down Expand Up @@ -57,8 +59,19 @@ public ExpressionEncoder<Row> createExpressionEncoder(StructType schema) {

// `toAttributes` is protected[sql] starting spark 3.2.0, so we need this call to be in the same
// package. Since Scala 2.13/Spark 3.3 forbids it, the implementation has been ported to Java
public static scala.collection.Seq<AttributeReference> toAttributes(StructType schema) {
return JavaConverters.asScalaBuffer(SparkBigQueryUtil.schemaToAttributeReferences(schema))
public static scala.collection.Seq<AttributeReference> toAttributes(StructType schema212) {
return JavaConverters.asScalaBuffer(
Stream.of(schema212.fields())
.map(
field212 ->
new AttributeReference(
field212.name(),
field212.dataType(),
field212.nullable(),
field212.metadata(),
NamedExpression.newExprId(),
new ListBuffer<String>().toStream()))
.collect(Collectors.toList()))
.toSeq();
}
}

0 comments on commit 7a38afa

Please sign in to comment.