Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion cpp-ch/local-engine/Storages/Parquet/ParquetMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WithFileSize.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Processors/Formats/Impl/ArrowFieldIndexUtil.h>
Expand Down Expand Up @@ -46,7 +48,21 @@ std::unique_ptr<parquet::ParquetFileReader> ParquetMetaBuilder::openInputParquet
.seekable_read = true,
};
std::atomic<int> is_stopped{0};
auto arrow_file = asArrowFile(read_buffer, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
/// Parquet open reads the footer with RandomAccessFile::ReadAt(). Arrow's default ReadAt is Seek()+Read().
/// On HDFS EC (striped) files, libhdfs3's seek+read cursor path can return wrong bytes for sparse footer
/// reads while pread (readBigAt → hdfsPread → preadInternal) matches the range read path Spark uses.
std::shared_ptr<arrow::io::RandomAccessFile> arrow_file;
if (format_settings.seekable_read && isBufferWithFileSize(read_buffer))
{
if (auto * seekable = dynamic_cast<SeekableReadBuffer *>(&read_buffer);
seekable && seekable->supportsReadAt())
{
arrow_file = std::make_shared<RandomAccessFileFromRandomAccessReadBuffer>(
*seekable, getFileSizeFromReadBuffer(read_buffer), nullptr);
}
}
if (!arrow_file)
arrow_file = asArrowFile(read_buffer, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);

return parquet::ParquetFileReader::Open(arrow_file, parquet::default_reader_properties(), nullptr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <numeric>
#include <Formats/FormatFactory.h>
#include <IO/SeekableReadBuffer.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Formats/Impl/NativeORCBlockInputFormat.h>
#include <Storages/SubstraitSource/OrcUtil.h>
#include <Poco/Util/AbstractConfiguration.h>
Expand Down Expand Up @@ -115,7 +114,7 @@ std::vector<StripeInformation> ORCFormatFile::collectRequiredStripes(DB::ReadBuf
.seekable_read = true,
};
std::atomic<int> is_stopped{0};
auto arrow_file = DB::asArrowFile(*read_buffer, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES);
auto arrow_file = OrcUtil::openArrowRandomAccessFileForOrc(*read_buffer, format_settings, is_stopped);
auto orc_reader = OrcUtil::createOrcReader(arrow_file);
total_stripes = orc_reader->getNumberOfStripes();

Expand Down
24 changes: 23 additions & 1 deletion cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
#include "OrcUtil.h"
#include <IO/Operators.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WithFileSize.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <arrow/result.h>
Expand Down Expand Up @@ -70,6 +72,26 @@ namespace ErrorCodes
}
namespace local_engine
{
std::shared_ptr<arrow::io::RandomAccessFile> OrcUtil::openArrowRandomAccessFileForOrc(
    DB::ReadBuffer & read_buffer,
    const DB::FormatSettings & format_settings,
    std::atomic<int> & is_stopped)
{
    /// Open an Arrow RandomAccessFile over `read_buffer` for ORC reading.
    /// When the buffer is seekable, has a known file size, and supports positioned
    /// reads (readAt/pread), wrap it directly so Arrow's ReadAt() maps to pread
    /// instead of the generic Seek()+Read() path; otherwise fall back to the
    /// standard asArrowFile adapter (which also validates the ORC magic bytes).
    const bool can_try_pread = format_settings.seekable_read && DB::isBufferWithFileSize(read_buffer);
    if (can_try_pread)
    {
        auto * seekable_buffer = dynamic_cast<DB::SeekableReadBuffer *>(&read_buffer);
        if (seekable_buffer != nullptr && seekable_buffer->supportsReadAt())
        {
            const auto file_size = DB::getFileSizeFromReadBuffer(read_buffer);
            return std::make_shared<DB::RandomAccessFileFromRandomAccessReadBuffer>(*seekable_buffer, file_size, nullptr);
        }
    }
    /// Generic fallback: Seek()+Read()-based Arrow file.
    return DB::asArrowFile(read_buffer, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES);
}

uint64_t ArrowInputFile::getLength() const
{
ORC_ASSIGN_OR_THROW(int64_t size, file->GetSize())
Expand Down Expand Up @@ -147,7 +169,7 @@ void OrcUtil::getFileReaderAndSchema(
const DB::FormatSettings & format_settings,
std::atomic<int> & is_stopped)
{
auto arrow_file = DB::asArrowFile(in, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES);
auto arrow_file = OrcUtil::openArrowRandomAccessFileForOrc(in, format_settings, is_stopped);
if (is_stopped)
return;

Expand Down
10 changes: 10 additions & 0 deletions cpp-ch/local-engine/Storages/SubstraitSource/OrcUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Processors/Formats/Impl/ORCBlockInputFormat.h>

#include <arrow/io/interfaces.h>
#include <atomic>

/// there are destructor not be overrided warnings in orc lib, ignore them
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsuggest-destructor-override"
Expand Down Expand Up @@ -49,6 +52,13 @@ class ArrowInputFile : public orc::InputStream
class OrcUtil
{
public:
/// Same as ParquetMetaBuilder::openInputParquetFile: on HDFS EC, Arrow's default ReadAt is Seek()+Read();
/// ORC postscript/footer reads need readBigAt (pread) for correct tail bytes.
static std::shared_ptr<arrow::io::RandomAccessFile> openArrowRandomAccessFileForOrc(
DB::ReadBuffer & read_buffer,
const DB::FormatSettings & format_settings,
std::atomic<int> & is_stopped);

static std::unique_ptr<orc::Reader> createOrcReader(std::shared_ptr<arrow::io::RandomAccessFile> file_);

static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,15 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
modified_time = file_info.properties().modificationtime();
}

#if USE_PARQUET
if (file_info.has_parquet() || (file_info.has_iceberg() && file_info.iceberg().has_parquet()))
file_size = std::nullopt;
#endif
#if USE_ORC
if (file_info.has_orc() || (file_info.has_iceberg() && file_info.iceberg().has_orc()))
file_size = std::nullopt;
#endif

std::unique_ptr<SeekableReadBuffer> read_buffer;
if (!read_settings.enable_filesystem_cache)
{
Expand Down
Loading