Commit

Permalink
https://github.com/gbif/pipelines/issues/1078
fmendezh committed Aug 16, 2024
1 parent 9214625 commit c01a9bc
Showing 2 changed files with 111 additions and 131 deletions.
@@ -13,7 +13,9 @@
*/
package org.gbif.occurrence.table.backfill;

+ import org.apache.avro.Schema;
import org.gbif.occurrence.download.hive.ExtensionTable;
+ import org.gbif.occurrence.download.hive.OccurrenceAvroHdfsTableDefinition;
import org.gbif.occurrence.download.hive.OccurrenceHDFSTableDefinition;
import org.gbif.occurrence.spark.udf.UDFS;

@@ -106,17 +108,14 @@ private SparkSession createSparkSession() {
.enableHiveSupport()
.config("spark.sql.catalog.iceberg.type", "hive")
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.defaultCatalog", "iceberg");
.config("spark.sql.defaultCatalog", "iceberg")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");

if (configuration.getHiveThriftAddress() != null) {
sparkBuilder
.config("hive.metastore.uris", configuration.getHiveThriftAddress())
.config("spark.sql.warehouse.dir", configuration.getWarehouseLocation());
}

- if (configuration.isUsePartitionedTable()) {
- sparkBuilder.config("spark.sql.sources.partitionOverwriteMode", "dynamic");
- }
return sparkBuilder.getOrCreate();
}
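Note: the builder above now registers the Iceberg session extensions alongside the existing Iceberg catalog settings, and the dynamic partition-overwrite setting is dropped, presumably because overwrites are now issued as explicit INSERT OVERWRITE ... PARTITION statements. A minimal, self-contained sketch of a similarly configured session follows; it uses a local Hadoop-type catalog and placeholder names so it runs standalone (the code above instead enables Hive support and a Hive-type catalog), and it assumes the iceberg-spark-runtime jar is on the classpath.

import org.apache.spark.sql.SparkSession;

public class IcebergSessionSketch {
  public static void main(String[] args) {
    // Illustrative only: catalog keys mirror the settings added above; app name,
    // master and warehouse path are placeholders, not values from this commit.
    SparkSession spark =
        SparkSession.builder()
            .appName("occurrence-table-backfill-sketch")
            .master("local[*]")
            .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.iceberg.type", "hadoop")
            .config("spark.sql.catalog.iceberg.warehouse", "/tmp/iceberg-warehouse")
            .config("spark.sql.defaultCatalog", "iceberg")
            .getOrCreate();
    spark.sql("SHOW CURRENT NAMESPACE").show();
    spark.stop();
  }
}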

@@ -126,8 +125,10 @@ private void createTableUsingSpark(SparkSession spark) {
fromAvroToTable(
spark,
getSnapshotPath(configuration.getCoreName()), // FROM
- selectFromAvro(), // SELECT
- configuration.getTableNameWithPrefix() // INSERT OVERWRITE INTO
+ configuration.getAvroTableName(),
+ occurrenceTableFields(), // SELECT
+ configuration.getTableNameWithPrefix(), // INSERT OVERWRITE INTO
+ OccurrenceAvroHdfsTableDefinition.avroDefinition()
);
}

@@ -235,37 +236,44 @@ private static boolean isDirectoryEmpty(String fromSourceDir, SparkSession spark
}

private void fromAvroToTable(
- SparkSession spark, String fromSourceDir, Column[] select, String saveToTable) {
+ SparkSession spark, String fromSourceDir, String avroTableName, String selectFields, String saveToTable, Schema schema) {
if (!isDirectoryEmpty(fromSourceDir, spark)) {
if (configuration.isUsePartitionedTable()) {
spark.sql(" set hive.exec.dynamic.partition.mode=nonstrict");
}
- Dataset<Row> input =
- spark.read().format("avro").load(fromSourceDir + "/*.avro").select(select);
-
- if (configuration.getTablePartitions() != null
- && input.rdd().getNumPartitions() > configuration.getTablePartitions()) {
- log.info("Setting partitions options {}", configuration.getTablePartitions());
- input =
- input
- .withColumn(
- "_salted_key",
- col("gbifid").cast(DataTypes.LongType).mod(configuration.getTablePartitions()))
- .repartition(configuration.getTablePartitions())
- .drop("_salted_key");
- }
-
- input.writeTo(saveToTable).createOrReplace();
+ createSourceAvroTable(spark, avroTableName, schema, fromSourceDir);
+ spark.sql(insertOverWrite(saveToTable, selectFields, avroTableName));
}
}

+ private String occurrenceTableFields() {
+ return OccurrenceHDFSTableDefinition.definition().stream()
+ // Excluding partitioned columns
+ .filter(field -> !configuration.isUsePartitionedTable() || !field.getHiveField().equalsIgnoreCase("datasetkey"))
+ .map(field -> field.getHiveField() + " " + field.getHiveDataType())
+ .collect(Collectors.joining(", "));
+ }
+ private void createSourceAvroTable(SparkSession spark, String tableName, Schema schema, String location) {
+ // Create Hive Table if it doesn't exist
+ spark.sql("DROP TABLE IF EXISTS " + tableName);
+ spark.sql("CREATE EXTERNAL TABLE " + tableName + " USING avro " +
+ "OPTIONS( 'format' = 'avro', 'schema' = '" + schema.toString(true) + "') " +
+ "LOCATION '" + location + "' TBLPROPERTIES('iceberg.catalog'='location_based_table')");
+ }
+
+ private String insertOverWrite(String targetTableName, String selectFields, String sourceTable) {
+ return "INSERT OVERWRITE TABLE " + targetTableName +
+ (!Strings.isNullOrEmpty(configuration.getDatasetKey()) ? " PARTITION (datasetkey = '" + configuration.getDatasetKey() + "') " : " ") +
+ "SELECT " + selectFields + " FROM " + sourceTable;
+ }

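Note: the load path above replaces the previous DataFrame-based write: the Avro snapshot is exposed as an external staging table and the target table is filled with a single INSERT OVERWRITE. A rough, self-contained sketch of the same two-step pattern, mirroring the statements built above; the table names, location, dataset key and abbreviated field list are placeholders (the real field list comes from OccurrenceHDFSTableDefinition).

import org.apache.spark.sql.SparkSession;

public class AvroStagingLoadSketch {
  // Hypothetical names and paths; the real code derives them from the job configuration.
  static void loadSnapshot(SparkSession spark, String avroSchemaJson) {
    spark.sql("DROP TABLE IF EXISTS occurrence_avro");
    spark.sql("CREATE EXTERNAL TABLE occurrence_avro USING avro "
        + "OPTIONS('format' = 'avro', 'schema' = '" + avroSchemaJson + "') "
        + "LOCATION 'hdfs:///data/snapshot/occurrence' "
        + "TBLPROPERTIES('iceberg.catalog'='location_based_table')");
    // Static-partition overwrite of one dataset's slice of the target table.
    spark.sql("INSERT OVERWRITE TABLE occurrence PARTITION (datasetkey = '00000000-0000-0000-0000-000000000000') "
        + "SELECT gbifid, basisofrecord, countrycode FROM occurrence_avro"); // abbreviated field list
  }
}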
private void createExtensionTable(SparkSession spark, ExtensionTable extensionTable) {
spark.sql(
configuration.isUsePartitionedTable()
? createExtensionExternalTable(extensionTable)
: createExtensionTable(extensionTable));

- List<Column> columns =
+ String select =
extensionTable.getFields().stream()
.filter(
field ->
@@ -279,24 +287,25 @@ private void createExtensionTable(SparkSession spark, ExtensionTable extensionTa
col(
field.substring(
field.indexOf('(') + 1, field.lastIndexOf(')'))))
- .alias(field.substring(field.indexOf('(') + 1, field.lastIndexOf(')')))
- : col(field))
- .collect(Collectors.toList());
+ .alias(field.substring(field.indexOf('(') + 1, field.lastIndexOf(')'))).toString()
+ : col(field).toString())
+ .collect(Collectors.joining(","));

- // Partitioned columns must be at the end
- if (configuration.isUsePartitionedTable()) {
- columns.add(col("datasetkey"));
- }
fromAvroToTable(
spark,
getSnapshotPath(extensionTable.getDirectoryTableName()), // FROM sourceDir
- columns.toArray(new Column[] {}), // SELECT
- extensionTableName(extensionTable)); // INSERT OVERWRITE INTO
+ extensionAvroTableName(extensionTable),
+ select, // SELECT
+ extensionTableName(extensionTable),
+ extensionTable.getSchema()); // INSERT OVERWRITE INTO
}

private String extensionTableName(ExtensionTable extensionTable) {
- return String.format(
- "%s_ext_%s", configuration.getTableName(), extensionTable.getHiveTableName());
+ return configuration.getTableName() + "_ext_" + extensionTable.getHiveTableName();
}

+ private String extensionAvroTableName(ExtensionTable extensionTable) {
+ return configuration.getTableName() + "_ext" + extensionTable.getHiveTableName() + "_avro";
+ }

private String getPrefix() {
@@ -313,7 +322,7 @@ private String createExtensionTable(ExtensionTable extensionTable) {
.map(f -> f.name() + " STRING")
.collect(Collectors.joining(",\n"))
+ ')'
+ "STORED AS PARQUET TBLPROPERTIES (\"parquet.compression\"=\"GZIP\")\n",
+ "STORED AS PARQUET TBLPROPERTIES ('parquet.compression'='GZIP')\n",
getPrefix() + extensionTableName(extensionTable));
}

@@ -327,8 +336,7 @@ private String createExtensionExternalTable(ExtensionTable extensionTable) {
.collect(Collectors.joining(",\n"))
+ ')'
+ "PARTITIONED BY(datasetkey STRING) "
+ "LOCATION '%s'"
+ "STORED AS PARQUET TBLPROPERTIES (\"parquet.compression\"=\"GZIP\")\n",
+ "TBLPROPERTIES ('iceberg.catalog'='location_based_table')\n",
getPrefix() + extensionTableName(extensionTable),
Paths.get(configuration.getTargetDirectory(), extensionTable.getHiveTableName()));
}
Expand Down Expand Up @@ -356,7 +364,7 @@ public String createIfNotExistsGbifMultimedia() {
+ "(gbifid STRING, type STRING, format STRING, identifier STRING, references STRING, title STRING, description STRING,\n"
+ "source STRING, audience STRING, created STRING, creator STRING, contributor STRING,\n"
+ "publisher STRING, license STRING, rightsHolder STRING) \n"
+ "STORED AS PARQUET TBLPROPERTIES (\"parquet.compression\"=\"GZIP\")",
+ "STORED AS PARQUET TBLPROPERTIES ('parquet.compression'='GZIP')",
getPrefix() + multimediaTableName());
}

@@ -365,54 +373,55 @@ private String multimediaTableName() {
}

public void insertOverwriteMultimediaTable(SparkSession spark) {
- spark
- .table("occurrence")
- .select(
- col("gbifid"),
- from_json(
- col("ext_multimedia"),
- new ArrayType(
- new StructType()
- .add("type", "string", false)
- .add("format", "string", false)
- .add("identifier", "string", false)
- .add("references", "string", false)
- .add("title", "string", false)
- .add("description", "string", false)
- .add("source", "string", false)
- .add("audience", "string", false)
- .add("created", "string", false)
- .add("creator", "string", false)
- .add("contributor", "string", false)
- .add("publisher", "string", false)
- .add("license", "string", false)
- .add("rightsHolder", "string", false),
- true))
- .alias("mm_record"))
- .select(col("gbifid"), explode(col("mm_record")).alias("mm_record"))
- .select(
- col("gbifid"),
- callUDF("cleanDelimiters", col("mm_record.type")).alias("type"),
- callUDF("cleanDelimiters", col("mm_record.format")).alias("format"),
- callUDF("cleanDelimiters", col("mm_record.identifier")).alias("identifier"),
- callUDF("cleanDelimiters", col("mm_record.references")).alias("references"),
- callUDF("cleanDelimiters", col("mm_record.title")).alias("title"),
- callUDF("cleanDelimiters", col("mm_record.description")).alias("description"),
- callUDF("cleanDelimiters", col("mm_record.source")).alias("source"),
- callUDF("cleanDelimiters", col("mm_record.audience")).alias("audience"),
- col("mm_record.created").alias("created"),
- callUDF("cleanDelimiters", col("mm_record.creator")).alias("creator"),
- callUDF("cleanDelimiters", col("mm_record.contributor")).alias("contributor"),
- callUDF("cleanDelimiters", col("mm_record.publisher")).alias("publisher"),
- callUDF("cleanDelimiters", col("mm_record.license")).alias("license"),
- callUDF("cleanDelimiters", col("mm_record.rightsHolder")).alias("rightsHolder"))
- .registerTempTable("mm_records");
-
- spark.sql(
- String.format(
- "INSERT OVERWRITE TABLE %1$s_multimedia \n"
- + "SELECT gbifid, type, format, identifier, references, title, description, source, audience, created, creator, contributor, publisher, license, rightsHolder FROM mm_records",
- configuration.getTableName()));
+ Dataset<Row> mmRecords = spark.table(configuration.getTableName());
+ if (configuration.getDatasetKey() != null) {
+ mmRecords = mmRecords.where("datasetkey = '" + configuration.getDatasetKey() + "'");
+ }
+ mmRecords = mmRecords
+ .select(
+ col("gbifid"),
+ from_json(
+ col("ext_multimedia"),
+ new ArrayType(
+ new StructType()
+ .add("type", "string", false)
+ .add("format", "string", false)
+ .add("identifier", "string", false)
+ .add("references", "string", false)
+ .add("title", "string", false)
+ .add("description", "string", false)
+ .add("source", "string", false)
+ .add("audience", "string", false)
+ .add("created", "string", false)
+ .add("creator", "string", false)
+ .add("contributor", "string", false)
+ .add("publisher", "string", false)
+ .add("license", "string", false)
+ .add("rightsHolder", "string", false),
+ true))
+ .alias("mm_record"))
+ .select(col("gbifid"), explode(col("mm_record")).alias("mm_record"))
+ .select(
+ col("gbifid"),
+ callUDF("cleanDelimiters", col("mm_record.type")).alias("type"),
+ callUDF("cleanDelimiters", col("mm_record.format")).alias("format"),
+ callUDF("cleanDelimiters", col("mm_record.identifier")).alias("identifier"),
+ callUDF("cleanDelimiters", col("mm_record.references")).alias("references"),
+ callUDF("cleanDelimiters", col("mm_record.title")).alias("title"),
+ callUDF("cleanDelimiters", col("mm_record.description")).alias("description"),
+ callUDF("cleanDelimiters", col("mm_record.source")).alias("source"),
+ callUDF("cleanDelimiters", col("mm_record.audience")).alias("audience"),
+ col("mm_record.created").alias("created"),
+ callUDF("cleanDelimiters", col("mm_record.creator")).alias("creator"),
+ callUDF("cleanDelimiters", col("mm_record.contributor")).alias("contributor"),
+ callUDF("cleanDelimiters", col("mm_record.publisher")).alias("publisher"),
+ callUDF("cleanDelimiters", col("mm_record.license")).alias("license"),
+ callUDF("cleanDelimiters", col("mm_record.rightsHolder")).alias("rightsHolder"));
+ mmRecords.createOrReplaceTempView("mm_records");
+
+ spark.sql("INSERT OVERWRITE TABLE " + configuration.getTableName() + "_multimedia \n" +
+ (!Strings.isNullOrEmpty(configuration.getDatasetKey()) ? " PARTITION (datasetkey = '" + configuration.getDatasetKey() + "') " : " ") +
+ "SELECT gbifid, type, format, identifier, references, title, description, source, audience, created, creator, contributor, publisher, license, rightsHolder FROM mm_records");
}
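Note: the multimedia step above parses the ext_multimedia JSON column into an array of structs, explodes it to one row per media item, and cleans delimiters with a UDF before the final insert. A small self-contained illustration of the from_json plus explode part on literal data; the sample JSON, app name and abbreviated struct fields are made up, only the pattern matches the code above.

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.StructType;

public class MultimediaExplodeSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("mm-sketch").master("local[*]").getOrCreate();

    // One fake occurrence row with a JSON-encoded multimedia extension.
    Dataset<Row> occ = spark.sql(
        "SELECT 1 AS gbifid, '[{\"type\":\"StillImage\",\"identifier\":\"http://example.org/1.jpg\"}]' AS ext_multimedia");

    // Parse the JSON column into array<struct>, explode to one row per media item.
    Dataset<Row> media = occ
        .select(col("gbifid"),
            explode(from_json(col("ext_multimedia"),
                new ArrayType(new StructType()
                    .add("type", "string", true)
                    .add("identifier", "string", true), true))).alias("mm"))
        .select(col("gbifid"),
            col("mm.type").alias("type"),
            col("mm.identifier").alias("identifier"));

    media.show(false);
    spark.stop();
  }
}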

private String createTableIfNotExists() {
@@ -427,13 +436,13 @@ private String createParquetTableIfNotExists() {
+ OccurrenceHDFSTableDefinition.definition().stream()
.map(field -> field.getHiveField() + " " + field.getHiveDataType())
.collect(Collectors.joining(", \n"))
+ ") STORED AS PARQUET TBLPROPERTIES (\"parquet.compression\"=\"SNAPPY\")",
+ ") STORED AS PARQUET TBLPROPERTIES ('parquet.compression'='SNAPPY')",
configuration.getTableNameWithPrefix());
}

private String createPartitionedTableIfNotExists() {
return String.format(
"CREATE EXTERNAL TABLE IF NOT EXISTS %s ("
"CREATE TABLE IF NOT EXISTS %s ("
+ OccurrenceHDFSTableDefinition.definition().stream()
.filter(
field ->
@@ -443,46 +452,9 @@ private String createPartitionedTableIfNotExists() {
.equalsIgnoreCase("datasetkey")) // Excluding partitioned columns
.map(field -> field.getHiveField() + " " + field.getHiveDataType())
.collect(Collectors.joining(", "))
+ ") "
+ "PARTITIONED BY(datasetkey STRING) "
+ "STORED AS PARQUET "
+ "LOCATION '%s'"
+ "TBLPROPERTIES (\"parquet.compression\"=\"GZIP\", \"auto.purge\"=\"true\")",
configuration.getTableNameWithPrefix(),
Paths.get(configuration.getTargetDirectory(), configuration.getCoreName().toLowerCase()));
}

private Column[] selectFromAvro() {
List<Column> columns =
OccurrenceHDFSTableDefinition.definition().stream()
.filter(
field ->
!configuration.isUsePartitionedTable()
|| !field
.getHiveField()
.equalsIgnoreCase(
"datasetkey")) // Partitioned columns must be at the end
.map(
field ->
field.getInitializer().equals(field.getHiveField())
? col(field.getHiveField())
: callUDF(
field
.getInitializer()
.substring(0, field.getInitializer().indexOf("(")),
col(field.getHiveField()))
.alias(field.getHiveField()))
.collect(Collectors.toList());

// Partitioned columns must be at the end
if (configuration.isUsePartitionedTable()) {
columns.add(col("datasetkey"));
}
Column[] selectColumns = columns.toArray(new Column[] {});
log.info(
"Selecting columns from Avro {}",
columns.stream().map(Column::toString).collect(Collectors.joining(", ")));
return selectColumns;
+ ") PARTITIONED BY(datasetkey STRING) USING iceberg "
+ "TBLPROPERTIES ('parquet.compression'='GZIP', 'auto.purge'='true')",
configuration.getTableNameWithPrefix());
}
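Note: with the switch to USING iceberg, the partitioned core table becomes an Iceberg table managed through the configured catalog rather than an external Parquet table at a fixed LOCATION. For reference, the general shape of partitioned Iceberg DDL in Spark SQL is sketched below; the table and column names are placeholders and this is not the exact statement the method above builds.

import org.apache.spark.sql.SparkSession;

public class IcebergPartitionedTableSketch {
  static void createTable(SparkSession spark) {
    // Generic Iceberg DDL pattern: columns declared in the schema, partition spec by column name.
    spark.sql("CREATE TABLE IF NOT EXISTS occurrence_example ("
        + "gbifid BIGINT, datasetkey STRING, countrycode STRING) "
        + "USING iceberg "
        + "PARTITIONED BY (datasetkey)");
  }
}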

private void swapTables(Command command, SparkSession spark) {
@@ -72,8 +72,16 @@ public JsonPOJOBuilder.Value findPOJOBuilderConfig(AnnotatedClass ac) {

private String prefixTable;

+ private String datasetKey;
+
public String getTableNameWithPrefix() {
- return Strings.isNullOrEmpty(prefixTable) ? tableName : prefixTable + "_" + tableName;
+ String datasetKeyPostFix = Strings.isNullOrEmpty(datasetKey)? "" : '_' + datasetKey;
+ String tableNameWithPrefix = Strings.isNullOrEmpty(prefixTable) ? tableName : prefixTable + "_" + tableName;
+ return tableNameWithPrefix + datasetKeyPostFix;
}

+ public String getAvroTableName() {
+ return getTableNameWithPrefix() + "_avro";
+ }

@Nullable private final String warehouseLocation;
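Note: on the configuration side, the dataset key is now folded into the derived table names. A standalone sketch of how the two getters above compose their results, using hypothetical prefix, table name and dataset key; Strings is assumed to be Guava's com.google.common.base.Strings, as used in the class above.

import com.google.common.base.Strings;

public class TableNameSketch {
  public static void main(String[] args) {
    // Hypothetical configuration values.
    String prefixTable = "dev";
    String tableName = "occurrence";
    String datasetKey = "aaaa1111bbbb2222";

    // Same composition as getTableNameWithPrefix() and getAvroTableName() above.
    String datasetKeyPostfix = Strings.isNullOrEmpty(datasetKey) ? "" : "_" + datasetKey;
    String tableNameWithPrefix =
        Strings.isNullOrEmpty(prefixTable) ? tableName : prefixTable + "_" + tableName;

    System.out.println(tableNameWithPrefix + datasetKeyPostfix);           // dev_occurrence_aaaa1111bbbb2222
    System.out.println(tableNameWithPrefix + datasetKeyPostfix + "_avro"); // dev_occurrence_aaaa1111bbbb2222_avro
  }
}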
