diff --git a/README.md b/README.md
index b43a33a70..907a633c8 100644
--- a/README.md
+++ b/README.md
@@ -122,3 +122,32 @@ Below is a detailed list of connector options:
 ## Limitations
 
 The connector supports basic Spark types. Complex types are not currently supported.
+
+
+## Known Issues
+
+### Cleanup on the Vertica-to-Spark path
+
+When reading from Vertica, the intermediary Parquet files are not currently cleaned up. Cleanup is temporarily disabled while an issue with the cleanup mechanism is investigated.
+
+The hadoop command-line tool can be used to clean up these files manually:
+
+```shell
+hadoop fs -rm user/release/s2v/74727063_613a_49d0_98e4_e806f5301ecf
+```
+
+### Old timestamps
+
+If using very old dates and timestamps, you may run into an error like the following:
+
+```
+org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Parquet files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum interoperability. Or set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that the written files will only be read by Spark 3.0+ or other systems that use Proleptic Gregorian calendar.
+```
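+
+If you are certain the intermediary files will only ever be read by Spark 3.0+, one possible workaround (a minimal sketch, assuming a `SparkSession` named `spark`) is to set the rebase mode on the session before writing, using the configuration key named in the error message:
+
+```scala
+// Write datetime values as-is, using the Proleptic Gregorian calendar.
+// Use "LEGACY" instead if the files may later be read by Spark 2.x or legacy Hive.
+spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
+```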
diff --git a/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemReadPipe.scala b/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemReadPipe.scala
index ac773b9df..b770dfdbe 100644
--- a/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemReadPipe.scala
+++ b/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemReadPipe.scala
@@ -114,6 +114,15 @@ class VerticaDistributedFilesystemReadPipe(
   }
 
   private def getPartitionInfo(fileMetadata: Seq[ParquetFileMetadata], rowGroupRoom: Int): PartitionInfo = {
+
+    /**
+     * TODO If true, cleanup information will be added to partitions, so nodes will perform a coordinated cleanup of
+     * exported parquet files.
+     *
+     * Temporarily set to false as an issue with the file cleanup system is investigated.
+     */
+    val cleanup = false
+
     // Now, create partitions splitting up files roughly evenly
     var i = 0
     var partitions = List[VerticaDistributedFilesystemPartition]()
@@ -127,7 +136,9 @@
       while(j < size){
         if(i == rowGroupRoom-1){ // Reached end of partition, cut off here
           val rangeIdx = incrementRangeMapGetIndex(rangeCountMap, m.filename)
-          val frange = ParquetFileRange(m.filename, low, j, Some(rangeIdx))
+
+          val frange = ParquetFileRange(m.filename, low, j, if(cleanup) Some(rangeIdx) else None)
+
           curFileRanges = curFileRanges :+ frange
           val partition = VerticaDistributedFilesystemPartition(curFileRanges)
           partitions = partitions :+ partition
@@ -137,7 +148,7 @@
         } else if(j == size - 1){ // Reached end of file's row groups, add to file ranges
           val rangeIdx = incrementRangeMapGetIndex(rangeCountMap, m.filename)
-          val frange = ParquetFileRange(m.filename, low, j, Some(rangeIdx))
+          val frange = ParquetFileRange(m.filename, low, j, if(cleanup) Some(rangeIdx) else None)
           curFileRanges = curFileRanges :+ frange
           i += 1
         }
diff --git a/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemReadPipeTests.scala b/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemReadPipeTests.scala
index 9956befb5..d646a2837 100644
--- a/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemReadPipeTests.scala
+++ b/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemReadPipeTests.scala
@@ -351,12 +351,12 @@ class VerticaDistributedFilesystemReadPipeTests extends AnyFlatSpec with BeforeA
       case Right(partitionInfo) =>
         val partitions = partitionInfo.partitionSeq
         assert(partitions.length == partitionCount)
-        assert(partitions(0).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname1,0,3,Some(0)))
-        assert(partitions(1).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname1,4,4,Some(1)))
-        assert(partitions(1).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(1) == ParquetFileRange(fname2,0,2,Some(0)))
-        assert(partitions(2).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname2,3,4,Some(1)))
-        assert(partitions(2).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(1) == ParquetFileRange(fname3,0,1,Some(0)))
-        assert(partitions(3).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname3,2,4,Some(1)))
+        assert(partitions(0).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname1,0,3,None))
+        assert(partitions(1).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname1,4,4,None))
+        assert(partitions(1).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(1) == ParquetFileRange(fname2,0,2,None))
+        assert(partitions(2).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname2,3,4,None))
+        assert(partitions(2).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(1) == ParquetFileRange(fname3,0,1,None))
+        assert(partitions(3).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname3,2,4,None))
     }
   }