diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 41476d96beab..412b315d6e66 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -450,10 +450,10 @@ std::unique_ptr parseSerdeParameters( void configureReaderOptions( dwio::common::ReaderOptions& readerOptions, - const std::shared_ptr& hiveConfig, + const std::shared_ptr& hiveConfig, const Config* sessionProperties, - const std::shared_ptr& hiveTableHandle, - const std::shared_ptr& hiveSplit) { + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& hiveSplit) { configureReaderOptions( readerOptions, hiveConfig, @@ -465,10 +465,10 @@ void configureReaderOptions( void configureReaderOptions( dwio::common::ReaderOptions& readerOptions, - const std::shared_ptr& hiveConfig, + const std::shared_ptr& hiveConfig, const Config* sessionProperties, const RowTypePtr& fileSchema, - const std::shared_ptr& hiveSplit, + const std::shared_ptr& hiveSplit, const std::unordered_map& tableParameters) { readerOptions.setLoadQuantum(hiveConfig->loadQuantum()); readerOptions.setMaxCoalesceBytes(hiveConfig->maxCoalescedBytes()); @@ -502,10 +502,10 @@ void configureReaderOptions( void configureRowReaderOptions( dwio::common::RowReaderOptions& rowReaderOptions, const std::unordered_map& tableParameters, - const std::shared_ptr& scanSpec, - const std::shared_ptr& metadataFilter, + std::shared_ptr scanSpec, + std::shared_ptr metadataFilter, const RowTypePtr& rowType, - const std::shared_ptr& hiveSplit) { + std::shared_ptr hiveSplit) { auto skipRowsIt = tableParameters.find(dwio::common::TableParameter::kSkipHeaderLineCount); if (skipRowsIt != tableParameters.end()) { @@ -569,15 +569,13 @@ bool applyPartitionFilter( } // namespace bool testFilters( - const common::ScanSpec* scanSpec, - const dwio::common::Reader* reader, + common::ScanSpec* scanSpec, + dwio::common::Reader* reader, const std::string& filePath, const std::unordered_map>& partitionKey, - const std::unordered_map>& + std::unordered_map>* partitionKeysHandle) { - VELOX_CHECK_EQ(partitionKey.size(), partitionKeysHandle.size()); - auto totalRows = reader->numberOfRows(); const auto& fileTypeWithId = reader->typeWithId(); const auto& rowType = reader->rowType(); @@ -588,11 +586,8 @@ bool testFilters( // If missing column is partition key. auto iter = partitionKey.find(name); if (iter != partitionKey.end() && iter->second.has_value()) { - auto handlesIter = partitionKeysHandle.find(name); - VELOX_CHECK(handlesIter != partitionKeysHandle.end()); - return applyPartitionFilter( - handlesIter->second->dataType()->kind(), + (*partitionKeysHandle)[name]->dataType()->kind(), iter->second.value(), child->filter()); } diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 2742866d516b..4c5fe743966f 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -61,34 +61,34 @@ std::shared_ptr makeScanSpec( void configureReaderOptions( dwio::common::ReaderOptions& readerOptions, - const std::shared_ptr& config, + const std::shared_ptr& config, const Config* sessionProperties, - const std::shared_ptr& hiveTableHandle, - const std::shared_ptr& hiveSplit); + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& hiveSplit); void configureReaderOptions( dwio::common::ReaderOptions& readerOptions, - const std::shared_ptr& hiveConfig, + const std::shared_ptr& hiveConfig, const Config* sessionProperties, const RowTypePtr& fileSchema, - const std::shared_ptr& hiveSplit, + const std::shared_ptr& hiveSplit, const std::unordered_map& tableParameters = {}); void configureRowReaderOptions( dwio::common::RowReaderOptions& rowReaderOptions, const std::unordered_map& tableParameters, - const std::shared_ptr& scanSpec, - const std::shared_ptr& metadataFilter, + std::shared_ptr scanSpec, + std::shared_ptr metadataFilter, const RowTypePtr& rowType, - const std::shared_ptr& hiveSplit); + std::shared_ptr hiveSplit); bool testFilters( - const common::ScanSpec* scanSpec, - const dwio::common::Reader* reader, + common::ScanSpec* scanSpec, + dwio::common::Reader* reader, const std::string& filePath, const std::unordered_map>& partitionKey, - const std::unordered_map>& + std::unordered_map>* partitionKeysHandle); std::unique_ptr createBufferedInput( diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 173c59926a6b..33fe66298258 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -170,14 +170,14 @@ std::unique_ptr HiveDataSource::createSplitReader() { return SplitReader::create( split_, hiveTableHandle_, - &partitionKeys_, - connectorQueryCtx_, - hiveConfig_, + scanSpec_, readerOutputType_, - ioStats_, + &partitionKeys_, fileHandleFactory_, executor_, - scanSpec_); + connectorQueryCtx_, + hiveConfig_, + ioStats_); } void HiveDataSource::addSplit(std::shared_ptr split) { diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 48cc5ffba743..90c445e5be35 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -61,74 +61,75 @@ VectorPtr newConstantFromString( } // namespace std::unique_ptr SplitReader::create( - const std::shared_ptr& hiveSplit, - const std::shared_ptr& hiveTableHandle, - const std::unordered_map>* - partitionKeys, - const ConnectorQueryCtx* connectorQueryCtx, - const std::shared_ptr& hiveConfig, + const std::shared_ptr& + hiveSplit, + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& scanSpec, const RowTypePtr& readerOutputType, - const std::shared_ptr& ioStats, + std::unordered_map>* + partitionKeys, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec) { + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStats) { // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] if (hiveSplit->customSplitInfo.count("table_format") > 0 && hiveSplit->customSplitInfo["table_format"] == "hive-iceberg") { return std::make_unique( hiveSplit, hiveTableHandle, - partitionKeys, - connectorQueryCtx, - hiveConfig, + scanSpec, readerOutputType, - ioStats, + partitionKeys, fileHandleFactory, executor, - scanSpec); + connectorQueryCtx, + hiveConfig, + ioStats); } else { return std::make_unique( hiveSplit, hiveTableHandle, - partitionKeys, - connectorQueryCtx, - hiveConfig, + scanSpec, readerOutputType, - ioStats, + partitionKeys, fileHandleFactory, executor, - scanSpec); + connectorQueryCtx, + hiveConfig, + ioStats); } } SplitReader::SplitReader( - const std::shared_ptr& hiveSplit, - const std::shared_ptr& hiveTableHandle, - const std::unordered_map>* - partitionKeys, - const ConnectorQueryCtx* connectorQueryCtx, - const std::shared_ptr& hiveConfig, + const std::shared_ptr& + hiveSplit, + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& scanSpec, const RowTypePtr& readerOutputType, - const std::shared_ptr& ioStats, + std::unordered_map>* + partitionKeys, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec) + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStats) : hiveSplit_(hiveSplit), hiveTableHandle_(hiveTableHandle), + scanSpec_(scanSpec), + readerOutputType_(readerOutputType), partitionKeys_(partitionKeys), + pool_(connectorQueryCtx->memoryPool()), + fileHandleFactory_(fileHandleFactory), + executor_(executor), connectorQueryCtx_(connectorQueryCtx), hiveConfig_(hiveConfig), - readerOutputType_(readerOutputType), ioStats_(ioStats), - fileHandleFactory_(fileHandleFactory), - executor_(executor), - pool_(connectorQueryCtx->memoryPool()), - scanSpec_(scanSpec), - baseReaderOpts_(connectorQueryCtx->memoryPool()), - emptySplit_(false) {} + baseReaderOpts_(connectorQueryCtx->memoryPool()) {} void SplitReader::configureReaderOptions( - std::shared_ptr randomSkip) { + std::shared_ptr randomSkip) { hive::configureReaderOptions( baseReaderOpts_, hiveConfig_, @@ -141,77 +142,6 @@ void SplitReader::configureReaderOptions( void SplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { - createReader(); - - if (checkIfSplitIsEmpty(runtimeStats)) { - VELOX_CHECK(emptySplit_); - return; - } - - createRowReader(metadataFilter); -} - -uint64_t SplitReader::next(uint64_t size, VectorPtr& output) { - if (!baseReaderOpts_.randomSkip()) { - return baseRowReader_->next(size, output); - } - dwio::common::Mutation mutation; - mutation.randomSkip = baseReaderOpts_.randomSkip().get(); - return baseRowReader_->next(size, output, &mutation); -} - -void SplitReader::resetFilterCaches() { - if (baseRowReader_) { - baseRowReader_->resetFilterCaches(); - } -} - -bool SplitReader::emptySplit() const { - return emptySplit_; -} - -void SplitReader::resetSplit() { - hiveSplit_.reset(); -} - -int64_t SplitReader::estimatedRowSize() const { - if (!baseRowReader_) { - return DataSource::kUnknownRowSize; - } - - const auto size = baseRowReader_->estimatedRowSize(); - return size.value_or(DataSource::kUnknownRowSize); -} - -void SplitReader::updateRuntimeStats( - dwio::common::RuntimeStatistics& stats) const { - if (baseRowReader_) { - baseRowReader_->updateRuntimeStats(stats); - } -} - -bool SplitReader::allPrefetchIssued() const { - return baseRowReader_ && baseRowReader_->allPrefetchIssued(); -} - -std::string SplitReader::toString() const { - std::string partitionKeys; - std::for_each( - partitionKeys_->begin(), - partitionKeys_->end(), - [&](std::pair> - column) { partitionKeys += " " + column.second->toString(); }); - return fmt::format( - "SplitReader: hiveSplit_{} scanSpec_{} readerOutputType_{} partitionKeys_{} reader{} rowReader{}", - hiveSplit_->toString(), - scanSpec_->toString(), - readerOutputType_->toString(), - partitionKeys, - static_cast(baseReader_.get()), - static_cast(baseRowReader_.get())); -} - -void SplitReader::createReader() { VELOX_CHECK_NE( baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN); @@ -228,7 +158,6 @@ void SplitReader::createReader() { throw; } } - // Here we keep adding new entries to CacheTTLController when new fileHandles // are generated, if CacheTTLController was created. Creator of // CacheTTLController needs to make sure a size control strategy was available @@ -241,39 +170,28 @@ void SplitReader::createReader() { baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat()) ->createReader(std::move(baseFileInput), baseReaderOpts_); -} - -bool SplitReader::checkIfSplitIsEmpty( - dwio::common::RuntimeStatistics& runtimeStats) { - // emptySplit_ may already be set if the data file is not found. In this case - // we don't need to test further. - if (emptySplit_) { - return true; - } // Note that this doesn't apply to Hudi tables. - if (!baseReader_ || baseReader_->numberOfRows() == 0) { + emptySplit_ = false; + if (baseReader_->numberOfRows() == 0) { emptySplit_ = true; - } else { - // Check filters and see if the whole split can be skipped. Note that this - // doesn't apply to Hudi tables. - if (!testFilters( - scanSpec_.get(), - baseReader_.get(), - hiveSplit_->filePath, - hiveSplit_->partitionKeys, - *partitionKeys_)) { - ++runtimeStats.skippedSplits; - runtimeStats.skippedSplitBytes += hiveSplit_->length; - emptySplit_ = true; - } + return; } - return emptySplit_; -} + // Check filters and see if the whole split can be skipped. Note that this + // doesn't apply to Hudi tables. + if (!testFilters( + scanSpec_.get(), + baseReader_.get(), + hiveSplit_->filePath, + hiveSplit_->partitionKeys, + partitionKeys_)) { + emptySplit_ = true; + ++runtimeStats.skippedSplits; + runtimeStats.skippedSplitBytes += hiveSplit_->length; + return; + } -void SplitReader::createRowReader( - std::shared_ptr metadataFilter) { auto& fileType = baseReader_->rowType(); auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema()); @@ -374,6 +292,52 @@ std::vector SplitReader::adaptColumns( return columnTypes; } +uint64_t SplitReader::next(int64_t size, VectorPtr& output) { + if (!baseReaderOpts_.randomSkip()) { + return baseRowReader_->next(size, output); + } + dwio::common::Mutation mutation; + mutation.randomSkip = baseReaderOpts_.randomSkip().get(); + return baseRowReader_->next(size, output, &mutation); +} + +void SplitReader::resetFilterCaches() { + if (baseRowReader_) { + baseRowReader_->resetFilterCaches(); + } +} + +bool SplitReader::emptySplit() const { + return emptySplit_; +} + +void SplitReader::resetSplit() { + hiveSplit_.reset(); +} + +int64_t SplitReader::estimatedRowSize() const { + if (!baseRowReader_) { + return DataSource::kUnknownRowSize; + } + + auto size = baseRowReader_->estimatedRowSize(); + if (size.has_value()) { + return size.value(); + } + return DataSource::kUnknownRowSize; +} + +void SplitReader::updateRuntimeStats( + dwio::common::RuntimeStatistics& stats) const { + if (baseRowReader_) { + baseRowReader_->updateRuntimeStats(stats); + } +} + +bool SplitReader::allPrefetchIssued() const { + return baseRowReader_ && baseRowReader_->allPrefetchIssued(); +} + void SplitReader::setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, @@ -394,6 +358,25 @@ void SplitReader::setPartitionValue( spec->setConstantValue(constant); } +std::string SplitReader::toString() const { + std::string partitionKeys; + std::for_each( + partitionKeys_->begin(), + partitionKeys_->end(), + [&](std::pair< + const std::string, + std::shared_ptr> + column) { partitionKeys += " " + column.second->toString(); }); + return fmt::format( + "SplitReader: hiveSplit_{} scanSpec_{} readerOutputType_{} partitionKeys_{} reader{} rowReader{}", + hiveSplit_->toString(), + scanSpec_->toString(), + readerOutputType_->toString(), + partitionKeys, + static_cast(baseReader_.get()), + static_cast(baseRowReader_.get())); +} + } // namespace facebook::velox::connector::hive template <> diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index dc4144cf6d16..83cd6b3ae748 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -16,7 +16,6 @@ #pragma once -#include "velox/common/base/RandomUtil.h" #include "velox/connectors/hive/FileHandle.h" #include "velox/dwio/common/Options.h" @@ -55,30 +54,32 @@ class HiveConfig; class SplitReader { public: static std::unique_ptr create( - const std::shared_ptr& hiveSplit, - const std::shared_ptr& hiveTableHandle, - const std::unordered_map>* - partitionKeys, - const ConnectorQueryCtx* connectorQueryCtx, - const std::shared_ptr& hiveConfig, + const std::shared_ptr& + hiveSplit, + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& scanSpec, const RowTypePtr& readerOutputType, - const std::shared_ptr& ioStats, + std::unordered_map>* + partitionKeys, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec); + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStats); SplitReader( - const std::shared_ptr& hiveSplit, - const std::shared_ptr& hiveTableHandle, - const std::unordered_map>* - partitionKeys, - const ConnectorQueryCtx* connectorQueryCtx, - const std::shared_ptr& hiveConfig, + const std::shared_ptr& + hiveSplit, + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& scanSpec, const RowTypePtr& readerOutputType, - const std::shared_ptr& ioStats, + std::unordered_map>* + partitionKeys, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec); + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStats); virtual ~SplitReader() = default; @@ -87,13 +88,12 @@ class SplitReader { /// This function is used by different table formats like Iceberg and Hudi to /// do additional preparations before reading the split, e.g. Open delete - /// files or log files, and add column adapatations for metadata columns. It - /// would be called only once per incoming split + /// files or log files, and add column adapatations for metadata columns virtual void prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats); - virtual uint64_t next(uint64_t size, VectorPtr& output); + virtual uint64_t next(int64_t size, VectorPtr& output); void resetFilterCaches(); @@ -110,24 +110,8 @@ class SplitReader { std::string toString() const; protected: - /// Create the dwio::common::Reader object baseReader_, which will be used to - /// read the data file's metadata and schema - void createReader(); - - /// Check if the hiveSplit_ is empty. The split is considered empty when - /// 1) The data file is missing but the user chooses to ignore it - /// 2) The file does not contain any rows - /// 3) The data in the file does not pass the filters. The test is based on - /// the file metadata and partition key values - /// This function needs to be called after baseReader_ is created. - bool checkIfSplitIsEmpty(dwio::common::RuntimeStatistics& runtimeStats); - - /// Create the dwio::common::RowReader object baseRowReader_, which owns the - /// ColumnReaders that will be used to read the data - void createRowReader(std::shared_ptr metadataFilter); - - /// Different table formats may have different meatadata columns. - /// This function will be used to update the scanSpec for these columns. + // Different table formats may have different meatadata columns. This function + // will be used to update the scanSpec for these columns. virtual std::vector adaptColumns( const RowTypePtr& fileType, const std::shared_ptr& tableSchema); @@ -137,25 +121,24 @@ class SplitReader { const std::string& partitionKey, const std::optional& value) const; - std::shared_ptr hiveSplit_; - const std::shared_ptr hiveTableHandle_; - const std::unordered_map< - std::string, - std::shared_ptr>* const partitionKeys_; - const ConnectorQueryCtx* const connectorQueryCtx_; - const std::shared_ptr hiveConfig_; - - const RowTypePtr readerOutputType_; - const std::shared_ptr ioStats_; - FileHandleFactory* const fileHandleFactory_; - folly::Executor* const executor_; - memory::MemoryPool* const pool_; - + std::shared_ptr hiveSplit_; + std::shared_ptr hiveTableHandle_; std::shared_ptr scanSpec_; + RowTypePtr readerOutputType_; + std::unordered_map>* + partitionKeys_; + memory::MemoryPool* const pool_; std::unique_ptr baseReader_; std::unique_ptr baseRowReader_; + FileHandleFactory* const fileHandleFactory_; + folly::Executor* const executor_; + const ConnectorQueryCtx* const connectorQueryCtx_; + const std::shared_ptr hiveConfig_; + std::shared_ptr ioStats_; dwio::common::ReaderOptions baseReaderOpts_; dwio::common::RowReaderOptions baseRowReaderOpts_; + + private: bool emptySplit_; }; diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 58515256ca21..ea4313c9ee80 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -19,79 +19,68 @@ #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" #include "velox/dwio/common/BufferUtil.h" +#include "velox/dwio/common/Mutation.h" +#include "velox/dwio/common/Reader.h" using namespace facebook::velox::dwio::common; namespace facebook::velox::connector::hive::iceberg { IcebergSplitReader::IcebergSplitReader( - const std::shared_ptr& hiveSplit, - const std::shared_ptr& hiveTableHandle, - const std::unordered_map>* + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* partitionKeys, - const ConnectorQueryCtx* connectorQueryCtx, - const std::shared_ptr& hiveConfig, - const RowTypePtr& readerOutputType, - const std::shared_ptr& ioStats, - FileHandleFactory* const fileHandleFactory, + FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec) + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats) : SplitReader( hiveSplit, hiveTableHandle, - partitionKeys, - connectorQueryCtx, - hiveConfig, + scanSpec, readerOutputType, - ioStats, + partitionKeys, fileHandleFactory, executor, - scanSpec), - baseReadOffset_(0), - splitOffset_(0) {} + connectorQueryCtx, + hiveConfig, + ioStats) {} void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { - createReader(); - - if (checkIfSplitIsEmpty(runtimeStats)) { - VELOX_CHECK(emptySplit_); - return; - } - - createRowReader(metadataFilter); - - std::shared_ptr icebergSplit = - std::dynamic_pointer_cast(hiveSplit_); + SplitReader::prepareSplit(metadataFilter, runtimeStats); baseReadOffset_ = 0; - splitOffset_ = baseRowReader_->nextRowNumber(); positionalDeleteFileReaders_.clear(); + splitOffset_ = baseRowReader_->nextRowNumber(); + + // TODO: Deserialize the std::vector deleteFiles. For now + // we assume it's already deserialized. + std::shared_ptr icebergSplit = + std::dynamic_pointer_cast(hiveSplit_); const auto& deleteFiles = icebergSplit->deleteFiles; for (const auto& deleteFile : deleteFiles) { - if (deleteFile.content == FileContent::kPositionalDeletes) { - if (deleteFile.recordCount > 0) { - positionalDeleteFileReaders_.push_back( - std::make_unique( - deleteFile, - hiveSplit_->filePath, - fileHandleFactory_, - connectorQueryCtx_, - executor_, - hiveConfig_, - ioStats_, - runtimeStats, - splitOffset_, - hiveSplit_->connectorId)); - } - } else { - VELOX_NYI(); - } + positionalDeleteFileReaders_.push_back( + std::make_unique( + deleteFile, + hiveSplit_->filePath, + fileHandleFactory_, + connectorQueryCtx_, + executor_, + hiveConfig_, + ioStats_, + runtimeStats, + splitOffset_, + hiveSplit_->connectorId)); } } -uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { +uint64_t IcebergSplitReader::next(int64_t size, VectorPtr& output) { Mutation mutation; mutation.randomSkip = baseReaderOpts_.randomSkip().get(); mutation.deletedRows = nullptr; diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index 070d519c5eaa..3aa5da127cbd 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -27,17 +27,17 @@ struct IcebergDeleteFile; class IcebergSplitReader : public SplitReader { public: IcebergSplitReader( - const std::shared_ptr& hiveSplit, - const std::shared_ptr& hiveTableHandle, - const std::unordered_map>* + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* partitionKeys, - const ConnectorQueryCtx* connectorQueryCtx, - const std::shared_ptr& hiveConfig, - const RowTypePtr& readerOutputType, - const std::shared_ptr& ioStats, FileHandleFactory* fileHandleFactory, folly::Executor* executor, - const std::shared_ptr& scanSpec); + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats); ~IcebergSplitReader() override = default; @@ -45,7 +45,7 @@ class IcebergSplitReader : public SplitReader { std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) override; - uint64_t next(uint64_t size, VectorPtr& output) override; + uint64_t next(int64_t size, VectorPtr& output) override; private: // The read offset to the beginning of the split in number of rows for the diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp index 9578d975ab12..b87007fb9804 100644 --- a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp +++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp @@ -30,8 +30,8 @@ PositionalDeleteFileReader::PositionalDeleteFileReader( FileHandleFactory* fileHandleFactory, const ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const std::shared_ptr& hiveConfig, - const std::shared_ptr& ioStats, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats, dwio::common::RuntimeStatistics& runtimeStats, uint64_t splitOffset, const std::string& connectorId) diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h index 0d8884400134..e667e63da394 100644 --- a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h +++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h @@ -41,8 +41,8 @@ class PositionalDeleteFileReader { FileHandleFactory* fileHandleFactory, const ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const std::shared_ptr& hiveConfig, - const std::shared_ptr& ioStats, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats, dwio::common::RuntimeStatistics& runtimeStats, uint64_t splitOffset, const std::string& connectorId); @@ -68,8 +68,8 @@ class PositionalDeleteFileReader { FileHandleFactory* const fileHandleFactory_; folly::Executor* const executor_; const ConnectorQueryCtx* const connectorQueryCtx_; - const std::shared_ptr hiveConfig_; - const std::shared_ptr ioStats_; + const std::shared_ptr hiveConfig_; + std::shared_ptr ioStats_; memory::MemoryPool* const pool_; std::shared_ptr filePathColumn_;