
Commit
Disable cleanup (#50)
* Disable cleanup

* Fix tests

* Update documentation

* Resolve PR comments

* Add markdown codeblock
alexr-bq authored Mar 23, 2021
1 parent 08c08b4 commit 4b1b0b0
Showing 3 changed files with 40 additions and 8 deletions.
21 changes: 21 additions & 0 deletions README.md
@@ -122,3 +122,24 @@ Below is a detailed list of connector options:
## Limitations

The connector supports basic Spark types. Complex types are not currently supported.


## Known Issues

### Cleanup on Vertica to Spark path

When reading from Vertica, the intermediate Parquet files used for the transfer are not currently cleaned up. Cleanup is temporarily disabled while an issue with the cleanup system is investigated.

The `hadoop` command line tool can be used to clean up the files manually:

```shell
# Remove the export directory recursively (the exact path will vary per export)
hadoop fs -rm -r user/release/s2v/74727063_613a_49d0_98e4_e806f5301ecf
```

### Old timestamps

If you use very old dates or timestamps (dates before 1582-10-15 or timestamps before 1900-01-01), you may run into an error like the following:

```
org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Parquet files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum interoperability. Or set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that the written files will only be read by Spark 3.0+ or other systems that use Proleptic Gregorian calendar.
```
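As the error message suggests, the rebase mode can be set before writing. The sketch below assumes a Spark 3.0+ session named `spark`; which value is appropriate depends on what will read the files later:

```scala
// Sketch: choose a datetime rebase mode before writing old dates/timestamps.
// "CORRECTED" writes the values as-is (safe only if all readers are Spark 3.0+
// or otherwise use the Proleptic Gregorian calendar); "LEGACY" rebases to the
// hybrid calendar for interoperability with Spark 2.x and legacy Hive.
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
```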
@@ -114,6 +114,15 @@ class VerticaDistributedFilesystemReadPipe(
}

private def getPartitionInfo(fileMetadata: Seq[ParquetFileMetadata], rowGroupRoom: Int): PartitionInfo = {

/**
 * TODO If true, cleanup information will be added to partitions, so nodes will perform a coordinated cleanup of
 * exported parquet files.
 *
 * Temporarily set to false while an issue with the file cleanup system is investigated.
 */
val cleanup = false

// Now, create partitions splitting up files roughly evenly
var i = 0
var partitions = List[VerticaDistributedFilesystemPartition]()
@@ -127,7 +136,9 @@
while(j < size){
if(i == rowGroupRoom-1){ // Reached end of partition, cut off here
val rangeIdx = incrementRangeMapGetIndex(rangeCountMap, m.filename)
-        val frange = ParquetFileRange(m.filename, low, j, Some(rangeIdx))
+
+        val frange = ParquetFileRange(m.filename, low, j, if(cleanup) Some(rangeIdx) else None)
+
curFileRanges = curFileRanges :+ frange
val partition = VerticaDistributedFilesystemPartition(curFileRanges)
partitions = partitions :+ partition
@@ -137,7 +148,7 @@
}
else if(j == size - 1){ // Reached end of file's row groups, add to file ranges
val rangeIdx = incrementRangeMapGetIndex(rangeCountMap, m.filename)
-        val frange = ParquetFileRange(m.filename, low, j, Some(rangeIdx))
+        val frange = ParquetFileRange(m.filename, low, j, if(cleanup) Some(rangeIdx) else None)
curFileRanges = curFileRanges :+ frange
i += 1
}
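The gist of the change can be seen in isolation. This is a minimal sketch: `ParquetFileRange` here is a simplified stand-in, and its field names are illustrative, not the connector's actual signature:

```scala
// Simplified stand-in for the connector's ParquetFileRange.
case class ParquetFileRange(filename: String, minRowGroup: Int, maxRowGroup: Int, rangeIdx: Option[Int])

val cleanup = false // temporarily disabled, as in this commit
val rangeIdx = 0

// With cleanup disabled, no range index is attached to the range,
// so nodes have nothing to coordinate a cleanup around.
val frange = ParquetFileRange("part-0001.parquet", 0, 3, if (cleanup) Some(rangeIdx) else None)
```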
@@ -351,12 +351,12 @@ class VerticaDistributedFilesystemReadPipeTests extends AnyFlatSpec with BeforeA
case Right(partitionInfo) =>
val partitions = partitionInfo.partitionSeq
assert(partitions.length == partitionCount)
-      assert(partitions(0).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname1,0,3,Some(0)))
-      assert(partitions(1).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname1,4,4,Some(1)))
-      assert(partitions(1).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(1) == ParquetFileRange(fname2,0,2,Some(0)))
-      assert(partitions(2).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname2,3,4,Some(1)))
-      assert(partitions(2).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(1) == ParquetFileRange(fname3,0,1,Some(0)))
-      assert(partitions(3).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname3,2,4,Some(1)))
+      assert(partitions(0).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname1,0,3,None))
+      assert(partitions(1).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname1,4,4,None))
+      assert(partitions(1).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(1) == ParquetFileRange(fname2,0,2,None))
+      assert(partitions(2).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname2,3,4,None))
+      assert(partitions(2).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(1) == ParquetFileRange(fname3,0,1,None))
+      assert(partitions(3).asInstanceOf[VerticaDistributedFilesystemPartition].fileRanges(0) == ParquetFileRange(fname3,2,4,None))
}
}

