diff --git a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp index f2c8ddddc845..779e79416fb2 100644 --- a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp +++ b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp @@ -217,7 +217,7 @@ Int32 SplittableBzip2ReadBuffer::read(char * dest, size_t dest_size, size_t offs result = b; skipResult = skipToNextMarker(SplittableBzip2ReadBuffer::BLOCK_DELIMITER, DELIMITER_BIT_LENGTH); - auto * seekable = dynamic_cast(in.get()); + // auto * seekable = dynamic_cast(in.get()); // std::cout << "skipResult:" << skipResult << " position:" << seekable->getPosition() << " b:" << b << std::endl; changeStateToProcessABlock(); } @@ -280,8 +280,8 @@ bool SplittableBzip2ReadBuffer::nextImpl() if (last_block_need_special_process && offset) { /// Trim the last incomplete line from [dest, dest+offset), and record it in last_incomplete_line - bool reach_last_block = (result == BZip2Constants::END_OF_STREAM); - if (reach_last_block) + bool reach_eof = (result == BZip2Constants::END_OF_STREAM); + if (reach_eof) { LOG_DEBUG( getLogger("SplittableBzip2ReadBuffer"), @@ -294,7 +294,7 @@ bool SplittableBzip2ReadBuffer::nextImpl() auto * pos = find_last_symbols_or_null<'\n'>(dest, end); if (!pos) { - if (reach_last_block) + if (reach_eof) offset = 0; else throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find row delimiter in working buffer with size:{}", offset); @@ -307,7 +307,11 @@ bool SplittableBzip2ReadBuffer::nextImpl() if (pos + 1 < end && *(pos + 1) == '\r') offset++; - last_incomplete_line.assign(&dest[offset], old_offset - offset); + if (!reach_eof) + { + /// Only record last incomplete line when eof not reached + last_incomplete_line.assign(&dest[offset], old_offset - offset); + } } } diff --git a/cpp-ch/local-engine/examples/CMakeLists.txt b/cpp-ch/local-engine/examples/CMakeLists.txt index 21af44372d8e..04f6f7088fcb 100644 --- a/cpp-ch/local-engine/examples/CMakeLists.txt +++ b/cpp-ch/local-engine/examples/CMakeLists.txt @@ -1,17 +1,17 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements. See the NOTICE file distributed with this work for -# additional information regarding copyright ownership. The ASF licenses this -# file to You under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy of -# the License at +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License. +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. clickhouse_add_executable(signal_demo signal_demo.cpp) target_link_libraries(signal_demo PRIVATE gluten_clickhouse_backend_libs diff --git a/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp b/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp index 749d91a4ba88..af3fcab46b73 100644 --- a/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp +++ b/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp @@ -39,7 +39,7 @@ int main() // String hdfs_file_path = "/path/to/bzip2/file"; setenv("LIBHDFS3_CONF", "/data1/clickhouse_official/conf/hdfs-site.bigocluster.xml", true); /// NOLINT String hdfs_uri = "hdfs://bigocluster"; - String hdfs_file_path = "/data/apps/imo_us/imo_data_logs_etl/data/imo_data_logs/Room.leave_room_event/day=2024-10-09/part-00000.bz2"; + String hdfs_file_path = "/data/apps/imo_us/imo_data_logs_etl/data/imo_data_logs/Room.leave_room_event/day=2024-04-17/part-00000.bz2"; ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); ReadSettings read_settings; std::unique_ptr in = std::make_unique(hdfs_uri, hdfs_file_path, *config, read_settings, 0, true); @@ -48,11 +48,11 @@ int main() // size_t start = 805306368; // size_t end = 1073900813; size_t start = 0; - size_t end = 268576809; + size_t end = 268660564; bounded_in->seek(start, SEEK_SET); bounded_in->setReadUntilPosition(end); - std::unique_ptr decompressed = std::make_unique(std::move(bounded_in), false, false); + std::unique_ptr decompressed = std::make_unique(std::move(bounded_in), false, true); String download_path = "./download_" + std::to_string(start) + "_" + std::to_string(end) + ".txt"; WriteBufferFromFile write_buffer(download_path);