diff --git a/cpp-ch/local-engine/Storages/Parquet/ParquetMeta.cpp b/cpp-ch/local-engine/Storages/Parquet/ParquetMeta.cpp index 263d90528a81..d1c8efdb6db9 100644 --- a/cpp-ch/local-engine/Storages/Parquet/ParquetMeta.cpp +++ b/cpp-ch/local-engine/Storages/Parquet/ParquetMeta.cpp @@ -19,6 +19,8 @@ #include #include +#include +#include #include #include #include @@ -46,7 +48,21 @@ std::unique_ptr ParquetMetaBuilder::openInputParquet .seekable_read = true, }; std::atomic is_stopped{0}; - auto arrow_file = asArrowFile(read_buffer, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES); + /// Parquet open reads the footer with RandomAccessFile::ReadAt(). Arrow's default ReadAt is Seek()+Read(). + /// On HDFS EC (striped) files, libhdfs3's seek+read cursor path can return wrong bytes for sparse footer + /// reads while pread (readBigAt → hdfsPread → preadInternal) matches the range read path Spark uses. + std::shared_ptr arrow_file; + if (format_settings.seekable_read && isBufferWithFileSize(read_buffer)) + { + if (auto * seekable = dynamic_cast(&read_buffer); + seekable && seekable->supportsReadAt()) + { + arrow_file = std::make_shared( + *seekable, getFileSizeFromReadBuffer(read_buffer), nullptr); + } + } + if (!arrow_file) + arrow_file = asArrowFile(read_buffer, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES); return parquet::ParquetFileReader::Open(arrow_file, parquet::default_reader_properties(), nullptr); } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp index 854bd5a3ddba..7ea232d79027 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include @@ -115,7 +114,7 @@ std::vector ORCFormatFile::collectRequiredStripes(DB::ReadBuf .seekable_read = true, }; std::atomic is_stopped{0}; - auto arrow_file = DB::asArrowFile(*read_buffer, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES); + auto arrow_file = OrcUtil::openArrowRandomAccessFileForOrc(*read_buffer, format_settings, is_stopped); auto orc_reader = OrcUtil::createOrcReader(arrow_file); total_stripes = orc_reader->getNumberOfStripes(); diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.cpp index f4dbebb5af72..20afb84a4d11 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.cpp @@ -16,6 +16,8 @@ */ #include "OrcUtil.h" #include +#include +#include #include #include #include @@ -70,6 +72,26 @@ namespace ErrorCodes } namespace local_engine { +std::shared_ptr OrcUtil::openArrowRandomAccessFileForOrc( + DB::ReadBuffer & read_buffer, + const DB::FormatSettings & format_settings, + std::atomic & is_stopped) +{ + std::shared_ptr arrow_file; + if (format_settings.seekable_read && DB::isBufferWithFileSize(read_buffer)) + { + if (auto * seekable = dynamic_cast(&read_buffer); + seekable && seekable->supportsReadAt()) + { + arrow_file = std::make_shared( + *seekable, DB::getFileSizeFromReadBuffer(read_buffer), nullptr); + } + } + if (!arrow_file) + arrow_file = DB::asArrowFile(read_buffer, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES); + return arrow_file; +} + uint64_t ArrowInputFile::getLength() const { ORC_ASSIGN_OR_THROW(int64_t size, file->GetSize()) @@ -147,7 +169,7 @@ void OrcUtil::getFileReaderAndSchema( const DB::FormatSettings & format_settings, std::atomic & is_stopped) { - auto arrow_file = DB::asArrowFile(in, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES); + auto arrow_file = OrcUtil::openArrowRandomAccessFileForOrc(in, format_settings, is_stopped); if (is_stopped) return; diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.h b/cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.h index eefdff63bbe2..b56cf0191fa7 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.h +++ b/cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.h @@ -18,6 +18,9 @@ #include #include +#include +#include + /// there are destructor not be overrided warnings in orc lib, ignore them #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wsuggest-destructor-override" @@ -49,6 +52,13 @@ class ArrowInputFile : public orc::InputStream class OrcUtil { public: + /// Same as ParquetMetaBuilder::openInputParquetFile: on HDFS EC, Arrow's default ReadAt is Seek()+Read(); + /// ORC postscript/footer reads need readBigAt (pread) for correct tail bytes. + static std::shared_ptr openArrowRandomAccessFileForOrc( + DB::ReadBuffer & read_buffer, + const DB::FormatSettings & format_settings, + std::atomic & is_stopped); + static std::unique_ptr createOrcReader(std::shared_ptr file_); static size_t countIndicesForType(std::shared_ptr type); diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index 258a1b89d976..493482bb286c 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -299,6 +299,15 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder modified_time = file_info.properties().modificationtime(); } +#if USE_PARQUET + if (file_info.has_parquet() || (file_info.has_iceberg() && file_info.iceberg().has_parquet())) + file_size = std::nullopt; +#endif +#if USE_ORC + if (file_info.has_orc() || (file_info.has_iceberg() && file_info.iceberg().has_orc())) + file_size = std::nullopt; +#endif + std::unique_ptr read_buffer; if (!read_settings.enable_filesystem_cache) {