[Core][VL] Add random parquet data generator and ShuffleWriterFuzzerTest (#3584)
marin-ma authored Jan 15, 2024
1 parent 9b32548 commit eb35adb
Showing 5 changed files with 368 additions and 0 deletions.
7 changes: 7 additions & 0 deletions backends-velox/pom.xml
@@ -166,6 +166,13 @@
      <version>1.3.0</version>
      <scope>compile</scope>
    </dependency>
    <!-- Java Faker for generating random data -->
    <dependency>
      <groupId>com.github.javafaker</groupId>
      <artifactId>javafaker</artifactId>
      <version>1.0.2</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
29 changes: 29 additions & 0 deletions backends-velox/src/test/java/io/glutenproject/tags/FuzzerTest.java
@@ -0,0 +1,29 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.glutenproject.tags;

import org.scalatest.TagAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@TagAnnotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface FuzzerTest {}
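
ScalaTest retains this annotation at runtime and treats it as a suite-level tag, so fuzzer suites can be included or excluded by the tag's fully-qualified name (for example via ScalaTest Runner's -n/-l include/exclude options). A minimal sketch of tagging a suite, assuming ScalaTest 3.1+ where AnyFunSuite lives in org.scalatest.funsuite; the suite below is hypothetical, not part of this commit:

import io.glutenproject.tags.FuzzerTest
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical suite: the @FuzzerTest annotation tags every test in it, so a
// runner invoked with "-l io.glutenproject.tags.FuzzerTest" skips it, and one
// invoked with "-n io.glutenproject.tags.FuzzerTest" selects it.
@FuzzerTest
class ExampleFuzzerSuite extends AnyFunSuite {
  test("example") {
    assert(1 + 1 == 2)
  }
}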
144 changes: 144 additions & 0 deletions backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala
@@ -0,0 +1,144 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.glutenproject.benchmarks

import io.glutenproject.benchmarks.ShuffleWriterFuzzerTest.{Failed, OOM, Successful, TestResult}
import io.glutenproject.execution.VeloxWholeStageTransformerSuite
import io.glutenproject.memory.memtarget.ThrowOnOomMemoryTarget
import io.glutenproject.tags.{FuzzerTest, SkipTestTags}

import org.apache.spark.SparkConf

object ShuffleWriterFuzzerTest {
  trait TestResult {
    val seed: Long

    def getSeed: Long = seed
  }
  case class Successful(seed: Long) extends TestResult
  case class Failed(seed: Long) extends TestResult
  case class OOM(seed: Long) extends TestResult
}

@FuzzerTest
@SkipTestTags
class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite {
  override protected val backend: String = "velox"
  override protected val resourcePath: String = "/tpch-data-parquet-velox"
  override protected val fileFormat: String = "parquet"

  private val dataGenerator = RandomParquetDataGenerator(System.currentTimeMillis())
  private val outputPath = getClass.getResource("/").getPath + "fuzzer_output.parquet"

  private val REPARTITION_SQL = (numPartitions: Int) =>
    s"select /*+ REPARTITION($numPartitions) */ * from tbl"
  private val AGG_REPARTITION_SQL =
    """select count(*) as cnt, f_1, f_2, f_3, f_4, f_5, f_6
      |from tbl group by f_1, f_2, f_3, f_4, f_5, f_6
      |order by cnt, f_1, f_2, f_3, f_4, f_5, f_6""".stripMargin

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      .set("spark.plugins", "io.glutenproject.GlutenPlugin")
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "512MB")
      .set("spark.driver.memory", "4g")
      .set("spark.driver.maxResultSize", "4g")
      .set("spark.gluten.sql.debug", "true")
      .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0")
  }

  def getRootCause(e: Throwable): Throwable = {
    if (e.getCause == null) {
      return e
    }
    getRootCause(e.getCause)
  }

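  // Generates a fresh random dataset, registers it as temp view 'tbl', runs the query through
  // runQueryAndCompare, and classifies the outcome (Successful/Failed/OOM) by the current seed.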
  def executeQuery(sql: String): TestResult = {
    try {
      System.gc()
      dataGenerator.generateRandomData(spark, outputPath)
      spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl")
      runQueryAndCompare(sql, true, false)(_ => {})
      Successful(dataGenerator.getSeed)
    } catch {
      case oom: ThrowOnOomMemoryTarget.OutOfMemoryException =>
        logError(s"Out of memory while running test with seed: ${dataGenerator.getSeed}", oom)
        OOM(dataGenerator.getSeed)
      case t: Throwable =>
        if (
          getRootCause(t).getMessage.contains(
            classOf[ThrowOnOomMemoryTarget.OutOfMemoryException].getName)
        ) {
          logError(s"Out of memory while running test with seed: ${dataGenerator.getSeed}", t)
          OOM(dataGenerator.getSeed)
        } else {
          logError(s"Failed to run test with seed: ${dataGenerator.getSeed}", t)
          Failed(dataGenerator.getSeed)
        }
    }
  }

  // Regenerates the input data with a fresh seed on every iteration, then reports all OOM and
  // failed seeds at the end so any failure can be replayed via the 'reproduce' test below.
  def repeatQuery(sql: String, iterations: Int, testName: String): Unit = {
    val result = (0 until iterations)
      .map {
        i =>
          logWarning(
            s"==============================> " +
              s"Started iteration $i (seed: ${dataGenerator.getSeed})")
          val result = executeQuery(sql)
          dataGenerator.reFake(System.currentTimeMillis())
          result
      }
    val oom = result.filter(_.isInstanceOf[OOM]).map(_.getSeed)
    if (oom.nonEmpty) {
      logError(s"Out of memory while running test '$testName' with seed: ${oom.mkString(", ")}")
    }
    val failed = result.filter(_.isInstanceOf[Failed]).map(_.getSeed)
    assert(failed.isEmpty, s"Failed to run test '$testName' with seed: ${failed.mkString(",")}")
  }

  private val REPARTITION_TEST_NAME = (numPartitions: Int) => s"repartition - $numPartitions"
  for (numPartitions <- Seq(1, 3, 10, 100, 1000, 4000, 8000)) {
    val testName = REPARTITION_TEST_NAME(numPartitions)
    test(testName) {
      repeatQuery(REPARTITION_SQL(numPartitions), 10, testName)
    }
  }

  private val AGG_TEST_NAME = "with aggregation"
  ignore(AGG_TEST_NAME) {
    repeatQuery(AGG_REPARTITION_SQL, 10, AGG_TEST_NAME)
  }

  ignore("reproduce") {
    // Replace sql with the actual failed sql.
    val sql = REPARTITION_SQL(100)
    // Replace seed '0L' with the actual failed seed.
    Seq(0L).foreach {
      seed =>
        dataGenerator.reFake(seed)
        logWarning(
          s"==============================> " +
            s"Started reproduction (seed: ${dataGenerator.getSeed})")
        val result = executeQuery(sql)
        assert(result.isInstanceOf[Successful], s"Failed to run 'reproduce' with seed: $seed")
    }
  }
}
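
Because each Faker instance is rebuilt from an explicit java.util.Random seed, a logged seed fully determines the generated schema and rows, which is what lets the 'reproduce' test above replay failures. A small sketch of that property (the values here are illustrative, not part of the commit):

// Hedged sketch: the same seed yields the same schema.
val gen = RandomParquetDataGenerator(42L)
val first = gen.generateRandomSchema()
gen.reFake(42L) // reset back to the same seed
val second = gen.generateRandomSchema()
assert(first == second, "same seed must yield the same schema")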
7 changes: 7 additions & 0 deletions gluten-core/pom.xml
@@ -211,6 +211,13 @@
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
    </dependency>
    <!-- Java Faker for generating random data -->
    <dependency>
      <groupId>com.github.javafaker</groupId>
      <artifactId>javafaker</artifactId>
      <version>1.0.2</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <profiles>
181 changes: 181 additions & 0 deletions gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala
@@ -0,0 +1,181 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.glutenproject.benchmarks

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

import com.github.javafaker.Faker

import java.sql.Date
import java.util.Random

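// Generates Parquet files with a random schema and random rows. Both are fully determined
// by the current seed, so any generated dataset can be recreated from a logged seed.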
case class RandomParquetDataGenerator(initialSeed: Long = 0L) {
  private var seed: Long = initialSeed
  private var faker = new Faker(new Random(seed))

  def reFake(newSeed: Long): Unit = {
    seed = newSeed
    faker = new Faker(new Random(seed))
  }

  def getSeed: Long = {
    seed
  }

  def getFaker: Faker = {
    faker
  }

  def generateRow(schema: StructType, probabilityOfNull: Double = 0): Row = {
    val values = schema.fields.map(field => generateDataForType(field.dataType, probabilityOfNull))
    Row.fromSeq(values)
  }

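  // Returns a random value of the requested Spark SQL type, or null with the given probability.
  // Array elements and map values recurse with the same probability; map keys are never null.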
  def generateDataForType(dataType: DataType, probabilityOfNull: Double): Any = {
    require(
      probabilityOfNull >= 0 && probabilityOfNull <= 1,
      "Probability should be between 0 and 1")

    if (faker.random().nextDouble() < probabilityOfNull) {
      return null
    }

    dataType match {
      case BooleanType => faker.bool().bool()
      case ByteType => faker.number().numberBetween(Byte.MinValue, Byte.MaxValue).toByte
      case ShortType => faker.number().numberBetween(Short.MinValue, Short.MaxValue).toShort
      case IntegerType => faker.number().numberBetween(Int.MinValue, Int.MaxValue)
      case LongType => faker.number().numberBetween(Long.MinValue, Long.MaxValue)
      case FloatType =>
        faker.number().randomDouble(2, Float.MinValue.toInt, Float.MaxValue.toInt).toFloat
      case DoubleType =>
        faker.number().randomDouble(2, Double.MinValue.toLong, Double.MaxValue.toLong)
      case DateType => new Date(faker.date().birthday().getTime)
      // case TimestampType => new Timestamp(faker.date().birthday().getTime)
      case t: DecimalType =>
        BigDecimal(
          faker.number().randomDouble(t.scale, 0, Math.pow(10, t.precision - t.scale).toLong))
      case StringType => faker.lorem().characters(0, 1000)
      case BinaryType => faker.lorem().characters(10).getBytes
      case ArrayType(elementType, _) =>
        Seq.fill(faker.number().numberBetween(1, 5))(
          generateDataForType(elementType, probabilityOfNull))
      case MapType(keyType, valueType, _) =>
        Map(generateDataForType(keyType, 0) -> generateDataForType(valueType, probabilityOfNull))
      case struct: StructType => generateRow(struct)
      case _ =>
        throw new UnsupportedOperationException(
          s"Data generation not supported for type: $dataType")
    }
  }

  def generateRandomData(
      spark: SparkSession,
      schema: StructType,
      numRows: Int,
      outputPath: String): Unit = {
    val data = (0 until numRows).map(_ => generateRow(schema, faker.random().nextDouble()))
    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    df.coalesce(1)
      .write
      .mode("overwrite")
      .parquet(outputPath)
  }

  def generateRandomData(spark: SparkSession, outputPath: String): Unit = {
    val schema = generateRandomSchema()
    val numRows = faker.random().nextInt(1000, 300000)
    generateRandomData(spark, schema, numRows, outputPath)
  }

  var fieldIndex = 0
  def fieldName: String = {
    fieldIndex += 1
    s"f_$fieldIndex"
  }

  // Candidate fields
  val numericFields: List[() => StructField] = List(
    () => StructField(fieldName, BooleanType, nullable = true),
    () => StructField(fieldName, ByteType, nullable = true),
    () => StructField(fieldName, ShortType, nullable = true),
    () => StructField(fieldName, IntegerType, nullable = true),
    () => StructField(fieldName, LongType, nullable = true),
    () => StructField(fieldName, FloatType, nullable = true),
    () => StructField(fieldName, DoubleType, nullable = true),
    () => StructField(fieldName, DateType, nullable = true),
    // () => StructField(fieldName, TimestampType, nullable = true),
    () => StructField(fieldName, DecimalType(10, 2), nullable = true)
  )

  val binaryFields: List[() => StructField] = List(
    () => StructField(fieldName, StringType, nullable = true),
    () => StructField(fieldName, BinaryType, nullable = true)
  )

  val complexFields: List[() => StructField] = List(
    () => StructField(fieldName, ArrayType(StringType, containsNull = true), nullable = true),
    () =>
      StructField(
        fieldName,
        MapType(StringType, IntegerType, valueContainsNull = true),
        nullable = true),
    () =>
      StructField(
        fieldName,
        StructType(
          Seq(
            StructField(fieldName, StringType, nullable = true),
            StructField(fieldName, DoubleType, nullable = true)
          )),
        nullable = true)
  )

  val candidateFields: List[() => StructField] =
    numericFields ++ binaryFields ++ complexFields

  // Generate a random schema with n fields. The first three fields are always numeric and the
  // next three string/binary, so columns f_1 through f_6 are safe to group by in the fuzzer's
  // aggregation query; any remaining fields are drawn from all candidates, including complex types.
  def generateRandomSchema(n: Int): StructType = {
    fieldIndex = 0
    val selectedFields = {
      (0 until 3).map(_ => numericFields(faker.random().nextInt(numericFields.length))()) ++
        (0 until 3).map(_ => binaryFields(faker.random().nextInt(binaryFields.length))()) ++
        (0 until Math.max(0, n - 6))
          .map(_ => candidateFields(faker.random().nextInt(candidateFields.length))())
    }
    StructType(selectedFields)
  }

  // Generate a random schema: n is drawn from [4, 24], and the six fixed fields are always
  // emitted, so the result has roughly 6 to 24 fields.
  def generateRandomSchema(): StructType = {
    generateRandomSchema(faker.random().nextInt(4, 24))
  }
}

// An example to demonstrate how to use RandomParquetDataGenerator to generate input data.
object RandomParquetDataGenerator {
  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder().master("local[1]").appName("Random Data Generator").getOrCreate()

    val seed: Long = 0L
    val outputPath = s"${seed}_output.parquet"

    RandomParquetDataGenerator(seed).generateRandomData(spark, outputPath)
  }
}
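
For targeted debugging, the generator can also be driven with a hand-picked schema and row count through the four-argument overload of generateRandomData. A hedged sketch, with a hypothetical schema and output path:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("fixed-schema-gen").getOrCreate()
// Hypothetical two-column schema; any type handled by generateDataForType works here.
val schema = StructType(Seq(
  StructField("f_1", IntegerType, nullable = true),
  StructField("f_2", StringType, nullable = true)
))
RandomParquetDataGenerator(123L).generateRandomData(spark, schema, 100, "/tmp/fuzzer_input.parquet")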
