Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions be/src/io/fs/packed_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "io/fs/packed_file_system.h"

#include <string_view>
#include <utility>

#include "common/status.h"
Expand All @@ -26,6 +27,42 @@

namespace doris::io {

namespace {

// Decides whether a newly created file should be wrapped by the packed writer.
//
// Packed-file aggregation is only kept for the first segment in a rowset.
// The path handled here is expected to look like:
//   local/remote segment file: .../{rowset_id}_{segment_id}.dat
//   local/remote index file:   .../{rowset_id}_{segment_id}.idx
// The .idx case here is V2 only. V1 inverted-index tablets are filtered before
// PackedFileSystem is enabled, so V1 names like
// {rowset_id}_{segment_id}_{index_id}@{suffix}.idx are not expected to reach
// this helper; if one did, its trailing token is not numeric and it would take
// the legacy (packed) path.
// Multi-segment rowsets usually come from large loads or memory-pressure flushes,
// and continuing to buffer later segments in packed files can amplify memory usage.
//
// Non-rowset file names keep the legacy behavior to avoid changing unrelated
// callers. A name counts as a rowset file only when it ends with ".dat" or
// ".idx" AND the token after the last '_' is a non-empty run of decimal digits.
//
// @param file_name  final path component (no directory part).
// @return true to use the packed writer, false to write directly to the inner
//         file system.
bool should_use_packed_writer(std::string_view file_name) {
    constexpr std::string_view kSegmentSuffix = ".dat";
    constexpr std::string_view kIndexSuffix = ".idx";
    constexpr std::string_view kDigits = "0123456789";

    // Removes `suffix` from file_name when present; reports whether it matched.
    const auto strip_suffix = [&file_name](std::string_view suffix) {
        if (file_name.size() < suffix.size() ||
            file_name.substr(file_name.size() - suffix.size()) != suffix) {
            return false;
        }
        file_name.remove_suffix(suffix.size());
        return true;
    };

    if (!strip_suffix(kSegmentSuffix) && !strip_suffix(kIndexSuffix)) {
        // Neither a segment nor an index file: legacy behavior.
        return true;
    }

    const size_t pos = file_name.rfind('_');
    if (pos == std::string_view::npos) {
        return true;
    }

    const std::string_view segment_id = file_name.substr(pos + 1);
    // A missing or non-numeric trailing token means this is not a
    // {rowset_id}_{segment_id} name, so do not apply the segment rule to it.
    if (segment_id.empty() || segment_id.find_first_not_of(kDigits) != std::string_view::npos) {
        return true;
    }

    return segment_id == "0";
}

} // namespace

PackedFileSystem::PackedFileSystem(FileSystemSPtr inner_fs, PackedAppendContext append_info)
: FileSystem(inner_fs->id(), inner_fs->type()),
_inner_fs(std::move(inner_fs)),
Expand Down Expand Up @@ -54,6 +91,11 @@ Status PackedFileSystem::create_file_impl(const Path& file, FileWriterPtr* write
FileWriterPtr inner_writer;
RETURN_IF_ERROR(_inner_fs->create_file(file, &inner_writer, opts));

if (!should_use_packed_writer(file.filename().native())) {
*writer = std::move(inner_writer);
return Status::OK();
}

// Wrap with PackedFileWriter
*writer = std::make_unique<PackedFileWriter>(std::move(inner_writer), file, _append_info);
return Status::OK();
Expand Down
51 changes: 51 additions & 0 deletions be/test/io/fs/packed_file_system_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,57 @@ TEST_F(PackedFileSystemTest, CreateFileWrapsWithPackedFileWriter) {
EXPECT_TRUE(st.ok());
}

// Segment 0 of a rowset must still go through the packed writer: data appended
// to the returned writer is not forwarded to the inner writer at append time.
TEST_F(PackedFileSystemTest, FirstSegmentDataFileUsesPackedWriter) {
    PackedFileSystem packed_fs(_inner_fs, _append_info);

    // "{rowset_id}_{segment_id}.dat" with segment_id == 0.
    Path first_segment("rowset_1_0.dat");
    FileWriterPtr packed_writer;
    auto create_st = packed_fs.create_file(first_segment, &packed_writer, nullptr);
    ASSERT_TRUE(create_st.ok());
    ASSERT_NE(packed_writer, nullptr);

    std::string payload = "test";
    Slice payload_slice(payload);
    auto append_st = packed_writer->appendv(&payload_slice, 1);
    ASSERT_TRUE(append_st.ok());

    // The inner writer was created but has received no bytes.
    ASSERT_NE(_inner_fs->last_writer(), nullptr);
    EXPECT_EQ(_inner_fs->last_writer()->bytes_appended(), 0);
    EXPECT_TRUE(packed_writer->is_in_packed_file());
}

// A data file for a segment other than 0 bypasses the packed writer: appended
// bytes land in the inner writer.
TEST_F(PackedFileSystemTest, LaterSegmentDataFileUsesDirectWriter) {
    PackedFileSystem packed_fs(_inner_fs, _append_info);

    // "{rowset_id}_{segment_id}.dat" with segment_id == 1.
    Path later_segment("rowset_1_1.dat");
    FileWriterPtr direct_writer;
    auto create_st = packed_fs.create_file(later_segment, &direct_writer, nullptr);
    ASSERT_TRUE(create_st.ok());
    ASSERT_NE(direct_writer, nullptr);

    std::string payload = "test";
    Slice payload_slice(payload);
    auto append_st = direct_writer->appendv(&payload_slice, 1);
    ASSERT_TRUE(append_st.ok());

    // Every appended byte is visible on the inner writer.
    ASSERT_NE(_inner_fs->last_writer(), nullptr);
    EXPECT_EQ(_inner_fs->last_writer()->bytes_appended(), payload.size());
    EXPECT_FALSE(direct_writer->is_in_packed_file());
}

// An index file for a segment other than 0 bypasses the packed writer, the same
// as a later-segment data file.
TEST_F(PackedFileSystemTest, LaterSegmentIndexFileUsesDirectWriter) {
    PackedFileSystem packed_fs(_inner_fs, _append_info);

    // "{rowset_id}_{segment_id}.idx" with segment_id == 2.
    Path later_index("rowset_1_2.idx");
    FileWriterPtr direct_writer;
    auto create_st = packed_fs.create_file(later_index, &direct_writer, nullptr);
    ASSERT_TRUE(create_st.ok());
    ASSERT_NE(direct_writer, nullptr);

    std::string payload = "idx";
    Slice payload_slice(payload);
    auto append_st = direct_writer->appendv(&payload_slice, 1);
    ASSERT_TRUE(append_st.ok());

    // Every appended byte is visible on the inner writer.
    ASSERT_NE(_inner_fs->last_writer(), nullptr);
    EXPECT_EQ(_inner_fs->last_writer()->bytes_appended(), payload.size());
    EXPECT_FALSE(direct_writer->is_in_packed_file());
}

TEST_F(PackedFileSystemTest, OpenFileNotInMergeFile) {
PackedFileSystem merge_fs(_inner_fs, _append_info);

Expand Down
Loading