From 3680d7636bad7706e38337cbcc2e554ba8f87c20 Mon Sep 17 00:00:00 2001 From: shosseinimotlagh Date: Mon, 9 Feb 2026 12:45:14 -0800 Subject: [PATCH] Make chunk_size to uint64_t to avoid overflow and support chunk size greater equal than 4GB --- conanfile.py | 2 +- src/include/homestore/blkdata_service.hpp | 2 +- src/include/homestore/index_service.hpp | 2 +- src/include/homestore/logstore_service.hpp | 2 +- src/lib/blkalloc/bitmap_blk_allocator.cpp | 1 + src/lib/blkdata_svc/blkdata_service.cpp | 2 +- src/lib/device/device.h | 26 +++--- src/lib/device/device_manager.cpp | 2 +- src/lib/device/hs_super_blk.h | 2 +- src/lib/index/index_service.cpp | 2 +- src/lib/index/wb_cache.cpp | 18 +++-- src/lib/logstore/log_store_service.cpp | 2 +- src/lib/replication/repl_dev/solo_repl_dev.h | 4 +- .../replication/service/generic_repl_svc.cpp | 2 +- .../replication/service/raft_repl_service.cpp | 18 ++--- src/tests/test_index_gc.cpp | 81 +++++++++---------- 16 files changed, 80 insertions(+), 88 deletions(-) diff --git a/conanfile.py b/conanfile.py index 637a67aed..82691ad35 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "7.3.3" + version = "8.0.0" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/blkdata_service.hpp b/src/include/homestore/blkdata_service.hpp index 518ad26e7..aa2dd3388 100644 --- a/src/include/homestore/blkdata_service.hpp +++ b/src/include/homestore/blkdata_service.hpp @@ -72,7 +72,7 @@ class BlkDataService { * @param chunk_size The size of chunks to use for the virtual device, in bytes. */ void create_vdev(uint64_t size, HSDevType devType, uint32_t blk_size, blk_allocator_type_t alloc_type, - chunk_selector_type_t chunk_sel_type, uint32_t num_chunks, uint32_t chunk_size); + chunk_selector_type_t chunk_sel_type, uint32_t num_chunks, uint64_t chunk_size); /** * @brief Opens a virtual device with the specified virtual device information. diff --git a/src/include/homestore/index_service.hpp b/src/include/homestore/index_service.hpp index 0483a9e55..31c120ef7 100644 --- a/src/include/homestore/index_service.hpp +++ b/src/include/homestore/index_service.hpp @@ -63,7 +63,7 @@ class IndexService { // Creates the vdev that is needed to initialize the device void create_vdev(uint64_t size, HSDevType devType, uint32_t num_chunks, chunk_selector_type_t chunk_sel_type = chunk_selector_type_t::ROUND_ROBIN, - uint32_t chunk_size = 0); + uint64_t chunk_size = 0); // Open the existing vdev which is represnted by the vdev_info_block shared< VirtualDev > open_vdev(const vdev_info& vb, bool load_existing); std::shared_ptr< ChunkSelector > get_chunk_selector() { return m_custom_chunk_selector; }; diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index 745cc28c6..a38e5fdf0 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -160,7 +160,7 @@ class LogStoreService { */ void device_truncate(); - folly::Future< std::error_code > create_vdev(uint64_t size, HSDevType devType, uint32_t chunk_size); + folly::Future< std::error_code > create_vdev(uint64_t size, HSDevType devType, uint64_t chunk_size); std::shared_ptr< VirtualDev > open_vdev(const vdev_info& vinfo, bool load_existing); std::shared_ptr< JournalVirtualDev > get_vdev() const { return m_logdev_vdev; } std::vector< std::shared_ptr< LogDev > > get_all_logdevs(); diff --git a/src/lib/blkalloc/bitmap_blk_allocator.cpp b/src/lib/blkalloc/bitmap_blk_allocator.cpp index 8e6e068e9..96e7e1d10 100644 --- a/src/lib/blkalloc/bitmap_blk_allocator.cpp +++ b/src/lib/blkalloc/bitmap_blk_allocator.cpp @@ -59,6 +59,7 @@ void BitmapBlkAllocator::cp_flush(CP*) { if (m_is_disk_bm_dirty.load()) { sisl::byte_array bitmap_buf = acquire_underlying_buffer(); + BLKALLOC_LOG(INFO, "Flushing bitmap (name: {} id: {}) size = {}", get_name(), m_chunk_id, bitmap_buf->size()); if (m_meta_blk_cookie) { meta_service().update_sub_sb(bitmap_buf->cbytes(), bitmap_buf->size(), m_meta_blk_cookie); } else { diff --git a/src/lib/blkdata_svc/blkdata_service.cpp b/src/lib/blkdata_svc/blkdata_service.cpp index b0b73a5ef..7a5977bba 100644 --- a/src/lib/blkdata_svc/blkdata_service.cpp +++ b/src/lib/blkdata_svc/blkdata_service.cpp @@ -39,7 +39,7 @@ BlkDataService::~BlkDataService() = default; // first-time boot path void BlkDataService::create_vdev(uint64_t size, HSDevType devType, uint32_t blk_size, blk_allocator_type_t alloc_type, - chunk_selector_type_t chunk_sel_type, uint32_t num_chunks, uint32_t chunk_size) { + chunk_selector_type_t chunk_sel_type, uint32_t num_chunks, uint64_t chunk_size) { hs_vdev_context vdev_ctx; vdev_ctx.type = hs_vdev_type_t::DATA_VDEV; diff --git a/src/lib/device/device.h b/src/lib/device/device.h index b0780e34d..5a23a6b59 100644 --- a/src/lib/device/device.h +++ b/src/lib/device/device.h @@ -43,18 +43,18 @@ struct vdev_info { uint32_t num_mirrors{0}; // 12: Total number of mirrors uint32_t blk_size{0}; // 16: IO block size for this vdev uint32_t num_primary_chunks{0}; // 20: number of primary chunks - uint32_t chunk_size{0}; // 24: chunk size used in vdev. - vdev_size_type_t size_type{}; // 28: Whether its a static or dynamic type. - uint8_t slot_allocated{0}; // 29: Is this current slot allocated - uint8_t failed{0}; // 30: set to true if disk is replaced - uint8_t hs_dev_type{0}; // 31: PDev dev type (as in fast or data) - uint8_t multi_pdev_choice{0}; // 32: Choice when multiple pdevs are present (vdev_multi_pdev_opts_t) - char name[max_name_len]; // 33: Name of the vdev - uint16_t checksum{0}; // 97: Checksum of this entire Block - uint8_t alloc_type; // 98: Allocator type of this vdev - uint8_t chunk_sel_type; // 99: Chunk Selector type of this vdev_id - uint8_t use_slab_allocator{0}; // 100: Use slab allocator for this vdev - uint8_t padding[154]{}; // 101: Padding to make it 256 bytes + uint64_t chunk_size{0}; // 24: chunk size used in vdev. + vdev_size_type_t size_type{}; // 32: Whether its a static or dynamic type. + uint8_t slot_allocated{0}; // 33: Is this current slot allocated + uint8_t failed{0}; // 34: set to true if disk is replaced + uint8_t hs_dev_type{0}; // 35: PDev dev type (as in fast or data) + uint8_t multi_pdev_choice{0}; // 36: Choice when multiple pdevs are present (vdev_multi_pdev_opts_t) + char name[max_name_len]; // 37: Name of the vdev + uint16_t checksum{0}; // 101: Checksum of this entire Block + uint8_t alloc_type; // 102: Allocator type of this vdev + uint8_t chunk_sel_type; // 103: Chunk Selector type of this vdev_id + uint8_t use_slab_allocator{0}; // 104: Use slab allocator for this vdev + uint8_t padding[150]{}; // 105: Padding to make it 256 bytes uint8_t user_private[user_private_size]{}; // 128: User sepcific information uint32_t get_vdev_id() const { return vdev_id; } @@ -107,7 +107,7 @@ struct vdev_parameters { // NOTE: If pdev opts is ALL_PDEV_STRIPED, then num_chunks would round off // to number of pdevs evenly uint32_t blk_size; // Block size vdev operates on - uint32_t chunk_size{}; // Chunk size provided for dynamic vdev. + uint64_t chunk_size{}; // Chunk size provided for dynamic vdev. HSDevType dev_type; // Which physical device type this vdev belongs to (FAST or DATA) blk_allocator_type_t alloc_type; // which allocator type this vdev wants to be with; chunk_selector_type_t chunk_sel_type; // which chunk selector type this vdev wants to be with; diff --git a/src/lib/device/device_manager.cpp b/src/lib/device/device_manager.cpp index e916e0a4c..e17ead052 100644 --- a/src/lib/device/device_manager.cpp +++ b/src/lib/device/device_manager.cpp @@ -415,7 +415,7 @@ void DeviceManager::compose_vparam(uint64_t vdev_id, vdev_parameters& vparam, st } // Based on the min chunk size, we calculate the max number of chunks that can be created in each target pdev - uint32_t min_chunk_size = hs_super_blk::min_chunk_size(vparam.dev_type); + uint64_t min_chunk_size = hs_super_blk::min_chunk_size(vparam.dev_type); // FIXME: it is possible that each vdev is less than max_num_chunks, but total is more than MAX_CHUNKS_IN_SYSTEM. // uint32 convert is safe as it only overflow when vdev size > 64PB with 16MB min_chunk_size. uint32_t max_num_chunks = std::min(uint32_t(vparam.vdev_size / min_chunk_size), hs_super_blk::MAX_CHUNKS_IN_SYSTEM); diff --git a/src/lib/device/hs_super_blk.h b/src/lib/device/hs_super_blk.h index 18a9e963b..0c015dc0b 100644 --- a/src/lib/device/hs_super_blk.h +++ b/src/lib/device/hs_super_blk.h @@ -207,7 +207,7 @@ class hs_super_blk { // for a device with 100MB and min_chunk_size = 16MB, we should get 6 chunks, not 7. return dinfo.dev_size / min_c_size; } - static uint32_t min_chunk_size(HSDevType dtype) { + static uint64_t min_chunk_size(HSDevType dtype) { uint64_t min_chunk_size = (dtype == HSDevType::Fast) ? MIN_CHUNK_SIZE_FAST_DEVICE : MIN_CHUNK_SIZE_DATA_DEVICE; #ifdef _PRERELEASE auto chunk_size = iomgr_flip::instance()->get_test_flip< long >("set_minimum_chunk_size"); diff --git a/src/lib/index/index_service.cpp b/src/lib/index/index_service.cpp index a4c4cd71c..e4b2450df 100644 --- a/src/lib/index/index_service.cpp +++ b/src/lib/index/index_service.cpp @@ -45,7 +45,7 @@ IndexService::IndexService(std::unique_ptr< IndexServiceCallbacks > cbs, shared< } void IndexService::create_vdev(uint64_t size, HSDevType devType, uint32_t num_chunks, - chunk_selector_type_t chunk_sel_type, uint32_t chunk_size) { + chunk_selector_type_t chunk_sel_type, uint64_t chunk_size) { auto const atomic_page_size = hs()->device_mgr()->atomic_page_size(devType); hs_vdev_context vdev_ctx; vdev_ctx.type = hs_vdev_type_t::INDEX_VDEV; diff --git a/src/lib/index/wb_cache.cpp b/src/lib/index/wb_cache.cpp index 769861e75..016207771 100644 --- a/src/lib/index/wb_cache.cpp +++ b/src/lib/index/wb_cache.cpp @@ -818,7 +818,11 @@ bool IndexWBCache::was_node_committed(IndexBufferPtr const& buf) { //////////////////// CP Related API section ///////////////////////////////// folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) { +#ifdef _PRERELEASE LOGTRACEMOD(wbcache, "Starting Index CP Flush with cp \ndag={}", cp_ctx->to_string_with_dags()); +#else + LOGINFOMOD(wbcache, "Starting Index CP Flush with cp {}", cp_ctx->id()); +#endif // #ifdef _PRERELEASE // static int id = 0; // auto filename = "cp_" + std::to_string(id++) + "_" + std::to_string(rand() % 100) + ".dot"; @@ -833,6 +837,7 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) { } else { CP_PERIODIC_LOG(DEBUG, unmove(cp_ctx->id()), "Btree does not have any dirty buffers to flush"); } + cp_ctx->complete(true); return folly::makeFuture< bool >(true); // nothing to flush } @@ -946,20 +951,19 @@ void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBufferP if (next_buf) { do_flush_one_buf(cp_ctx, next_buf, false); } else if (!has_more) { - for (const auto& ordinal : m_updated_ordinals) { - LOGTRACEMOD(wbcache, "Updating sb for ordinal {}", ordinal); - index_service().write_sb(ordinal); - } - // We are done flushing the buffers, We flush the vdev to persist the vdev bitmaps and free blks // Pick a CP Manager blocking IO fiber to execute the cp flush of vdev iomanager.run_on_forget(cp_mgr().pick_blocking_io_fiber(), [this, cp_ctx]() { + for (const auto& ordinal : m_updated_ordinals) { + LOGTRACEMOD(wbcache, "Updating sb for ordinal {}", ordinal); + index_service().write_sb(ordinal); + } auto cp_id = cp_ctx->id(); - LOGTRACEMOD(wbcache, "Initiating CP {} flush", cp_id); + LOGINFOMOD(wbcache, "Initiating CP {} flush", cp_id); m_vdev->cp_flush(cp_ctx); // This is a blocking io call LOGTRACEMOD(wbcache, "CP {} freed blkids: \n{}", cp_id, cp_ctx->to_string_free_list()); cp_ctx->complete(true); - LOGTRACEMOD(wbcache, "Completed CP {} flush", cp_id); + LOGINFOMOD(wbcache, "Completed CP {} flush", cp_id); }); } } diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index c1ddda6e7..13c935c01 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -64,7 +64,7 @@ void LogStoreService::on_meta_blk_found(const sisl::byte_view& buf, void* meta_c HS_REL_ASSERT_EQ(m_sb->version, logstore_service_sb_version, "Invalid version of log service metablk"); } -folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, HSDevType devType, uint32_t chunk_size) { +folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, HSDevType devType, uint64_t chunk_size) { const auto atomic_page_size = hs()->device_mgr()->atomic_page_size(devType); hs_vdev_context hs_ctx; diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 4c9fcced2..3f3c4fd54 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -78,9 +78,7 @@ class SoloReplDev : public ReplDev { return std::vector< peer_info >{ peer_info{.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0, .priority_ = 1}}; } - std::vector< replica_id_t > get_replication_quorum() override { - return std::vector< replica_id_t >{m_group_id}; - } + std::vector< replica_id_t > get_replication_quorum() override { return std::vector< replica_id_t >{m_group_id}; } void reconcile_leader() override {} void yield_leadership(bool immediate_yield, replica_id_t candidate) override {} bool is_ready_for_traffic() const override { return true; } diff --git a/src/lib/replication/service/generic_repl_svc.cpp b/src/lib/replication/service/generic_repl_svc.cpp index 21d4780c3..4e34a94c3 100644 --- a/src/lib/replication/service/generic_repl_svc.cpp +++ b/src/lib/replication/service/generic_repl_svc.cpp @@ -79,7 +79,7 @@ hs_stats GenericReplService::get_cap_stats() const { ///////////////////// SoloReplService specializations and CP Callbacks ///////////////////////////// SoloReplService::SoloReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} {} -SoloReplService::~SoloReplService(){}; +SoloReplService::~SoloReplService() {}; void SoloReplService::start() { for (auto const& [buf, mblk] : m_sb_bufs) { diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index 5a81b873c..629fd2fae 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -636,29 +636,27 @@ void RaftReplService::start_repl_service_timers() { HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec)); m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer( HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */, - nullptr, [this](void *) { + nullptr, [this](void*) { LOGDEBUGMOD(replication, "Reaper Thread: Doing GC"); gc_repl_reqs(); gc_repl_devs(); }); // Check for queued fetches at the minimum every second - uint64_t interval_ns = std::min( - HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000); + uint64_t interval_ns = + std::min(HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000); m_rdev_fetch_timer_hdl = iomanager.schedule_thread_timer(interval_ns, true /* recurring */, nullptr, - [this](void *) { fetch_pending_data(); }); + [this](void*) { fetch_pending_data(); }); // Flush durable commit lsns to superblock // FIXUP: what is the best value for flush_durable_commit_interval_ms? m_flush_durable_commit_timer_hdl = iomanager.schedule_thread_timer( HS_DYNAMIC_CONFIG(consensus.flush_durable_commit_interval_ms) * 1000 * 1000, true /* recurring */, - nullptr, [this](void *) { flush_durable_commit_lsn(); }); + nullptr, [this](void*) { flush_durable_commit_lsn(); }); m_replace_member_sync_check_timer_hdl = iomanager.schedule_thread_timer( HS_DYNAMIC_CONFIG(consensus.replace_member_sync_check_interval_ms) * 1000 * 1000, true /* recurring */, - nullptr, [this](void *) { - monitor_replace_member_replication_status(); - }); + nullptr, [this](void*) { monitor_replace_member_replication_status(); }); latch.count_down(); } }); @@ -675,7 +673,7 @@ void RaftReplService::stop_repl_service_timers() { }); } -void RaftReplService::add_to_fetch_queue(cshared &rdev, std::vector rreqs) { +void RaftReplService::add_to_fetch_queue(cshared< RaftReplDev >& rdev, std::vector< repl_req_ptr_t > rreqs) { std::unique_lock lg(m_pending_fetch_mtx); m_pending_fetch_batches.push(std::make_pair(rdev, std::move(rreqs))); } @@ -683,7 +681,7 @@ void RaftReplService::add_to_fetch_queue(cshared &rdev, std::vector void RaftReplService::fetch_pending_data() { std::unique_lock lg(m_pending_fetch_mtx); while (!m_pending_fetch_batches.empty()) { - auto const &[d, rreqs] = m_pending_fetch_batches.front(); + auto const& [d, rreqs] = m_pending_fetch_batches.front(); if (get_elapsed_time_ms(rreqs.at(0)->created_time()) < HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms)) { break; } diff --git a/src/tests/test_index_gc.cpp b/src/tests/test_index_gc.cpp index 712f56202..af6205e9c 100644 --- a/src/tests/test_index_gc.cpp +++ b/src/tests/test_index_gc.cpp @@ -12,7 +12,6 @@ #include "btree_helpers/btree_decls.h" #include "btree_helpers/blob_route.h" - using namespace homestore; SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) @@ -22,11 +21,11 @@ SISL_LOGGING_DECL(test_index_gc) SISL_OPTION_GROUP( test_index_gc, (num_iters, "", "num_iters", "number of iterations for rand ops", - ::cxxopts::value< uint32_t >()->default_value("500000"), "number"), + ::cxxopts::value< uint32_t >()->default_value("500000"), "number"), (num_entries, "", "num_entries", "number of entries to test with", ::cxxopts::value< uint32_t >()->default_value("7000"), "number"), - (num_put, "", "num_put", "number of entries to test with", - ::cxxopts::value< uint32_t >()->default_value("20000"), "number"), + (num_put, "", "num_put", "number of entries to test with", ::cxxopts::value< uint32_t >()->default_value("20000"), + "number"), (run_time, "", "run_time", "run time for io", ::cxxopts::value< uint64_t >()->default_value("36000"), "seconds"), (disable_merge, "", "disable_merge", "disable_merge", ::cxxopts::value< bool >()->default_value("0"), ""), (operation_list, "", "operation_list", "operation list instead of default created following by percentage", @@ -34,7 +33,8 @@ SISL_OPTION_GROUP( (preload_size, "", "preload_size", "number of entries to preload tree with", ::cxxopts::value< uint32_t >()->default_value("1000"), "number"), (init_device, "", "init_device", "init device", ::cxxopts::value< bool >()->default_value("1"), ""), - (ignore_node_lock_refresh, "", "ignore_node_lock_refresh", "ignore node lock refresh", ::cxxopts::value< bool >(), ""), + (ignore_node_lock_refresh, "", "ignore_node_lock_refresh", "ignore node lock refresh", ::cxxopts::value< bool >(), + ""), (cleanup_after_shutdown, "", "cleanup_after_shutdown", "cleanup after shutdown", ::cxxopts::value< bool >()->default_value("1"), ""), (max_merge_level, "", "max_merge_level", "max merge level", ::cxxopts::value< uint8_t >()->default_value("127"), @@ -42,7 +42,6 @@ SISL_OPTION_GROUP( (seed, "", "seed", "random engine seed, use random if not defined", ::cxxopts::value< uint64_t >()->default_value("0"), "number")) - using BtreeType = IndexTable< BlobRouteByChunkKey, TestFixedValue >; using op_func_t = std::function< void(void) >; static constexpr uint32_t g_num_fibers{4}; @@ -93,10 +92,10 @@ class TestIndexGC : public ::testing::Test { create_io_reactors(g_num_fibers); m_run_time = SISL_OPTIONS["run_time"].as< uint64_t >(); - //m_operations["put"] = std::bind(&BtreeTestHelper::put_random, this); - //m_operations["range_remove"] = std::bind(&BtreeTestHelper::range_remove_existing_random, this); - //m_operations["range_query"] = std::bind(&BtreeTestHelper::query_random, this); - //m_operations["get"] = std::bind(&BtreeTestHelper::get_random, this); + // m_operations["put"] = std::bind(&BtreeTestHelper::put_random, this); + // m_operations["range_remove"] = std::bind(&BtreeTestHelper::range_remove_existing_random, this); + // m_operations["range_query"] = std::bind(&BtreeTestHelper::query_random, this); + // m_operations["get"] = std::bind(&BtreeTestHelper::get_random, this); m_bt = std::make_shared< BtreeType >(uuid, parent_uuid, 0, m_cfg); hs()->index_service().add_index_table(m_bt); LOGINFO("Added index table to index service"); @@ -131,17 +130,17 @@ class TestIndexGC : public ::testing::Test { }; auto ctx = std::make_shared< Context >(); for (uint32_t i{0}; i < num_io_reactors; ++i) { - iomanager.create_reactor("homeblks_long_running_io" + std::to_string(i), iomgr::INTERRUPT_LOOP, 1u, - [this, ctx](bool is_started) { - if (is_started) { - { - std::unique_lock< std::mutex > lk{ctx->mtx}; - m_fibers.push_back(iomanager.iofiber_self()); - ++(ctx->thread_cnt); + iomanager.create_reactor("homeblks_long_running_io" + std::to_string(i), iomgr::INTERRUPT_LOOP, 1u, + [this, ctx](bool is_started) { + if (is_started) { + { + std::unique_lock< std::mutex > lk{ctx->mtx}; + m_fibers.push_back(iomanager.iofiber_self()); + ++(ctx->thread_cnt); + } + ctx->cv.notify_one(); } - ctx->cv.notify_one(); - } - }); + }); } { std::unique_lock< std::mutex > lk{ctx->mtx}; @@ -152,7 +151,8 @@ class TestIndexGC : public ::testing::Test { void put_many_random(uint16_t chunk_id, uint32_t num_put) { for (uint16_t i = 0; i < num_put; ++i) { - auto key = BlobRouteByChunkKey{BlobRouteByChunk(chunk_id, g_randval_generator(g_re), g_randval_generator(g_re))}; + auto key = + BlobRouteByChunkKey{BlobRouteByChunk(chunk_id, g_randval_generator(g_re), g_randval_generator(g_re))}; auto value = TestFixedValue::generate_rand(); auto sreq = BtreeSinglePutRequest{&key, &value, btree_put_type::UPSERT}; sreq.enable_route_tracing(); @@ -189,13 +189,13 @@ class TestIndexGC : public ::testing::Test { status = m_bt->query(query_req, valid_blob_indexes); if (status != homestore::btree_status_t::success) { LOGERROR("Failed to query blobs after purging reserved chunk={} in gc index table, index ret={}", chunk_id, - status); + status); return false; } if (!valid_blob_indexes.empty()) { LOGERROR("gc index table is not empty for chunk={} after purging, valid_blob_indexes.size={}", chunk_id, - valid_blob_indexes.size()); + valid_blob_indexes.size()); return SISL_OPTIONS["ignore_node_lock_refresh"].as< bool >(); } @@ -205,18 +205,18 @@ class TestIndexGC : public ::testing::Test { void gc_task(uint32_t idx) { LOGINFO("GC task {} started", idx); auto num_puts = SISL_OPTIONS["num_put"].as< uint32_t >(); - while(!time_to_stop()) { + while (!time_to_stop()) { // Step 1: preload chunks with some data for (uint16_t i = 0; i < 20; ++i) { - uint16_t chunk_id = 20*idx + i; + uint16_t chunk_id = 20 * idx + i; put_many_random(chunk_id, num_puts); } LOGDEBUG("Preload done for index {}", idx); // Step 2: start chunk gc for (uint16_t i = 0; i < 20; ++i) { - uint16_t chunk_id = 20*idx + i; - ASSERT_TRUE(do_gc(chunk_id)); + uint16_t chunk_id = 20 * idx + i; + ASSERT_TRUE(do_gc(chunk_id)); } LOGDEBUG("GC done for index {}", idx); auto elapsed_time = get_elapsed_time_sec(m_start_time); @@ -234,7 +234,7 @@ class TestIndexGC : public ::testing::Test { LOGINFO("Put task {} started", idx); while (!time_to_stop()) { for (uint16_t i = 0; i < 1000; ++i) { - uint16_t chunk_id = 20*idx + i; + uint16_t chunk_id = 20 * idx + i; put_many_random(chunk_id, 10); } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -247,8 +247,9 @@ class TestIndexGC : public ::testing::Test { LOGINFO("Get task {} started", idx); while (!time_to_stop()) { for (uint16_t i = 0; i < 1000; ++i) { - uint16_t chunk_id = 20*idx + i; - auto key = BlobRouteByChunkKey{BlobRouteByChunk(chunk_id, g_randval_generator(g_re), g_randval_generator(g_re))}; + uint16_t chunk_id = 20 * idx + i; + auto key = BlobRouteByChunkKey{ + BlobRouteByChunk(chunk_id, g_randval_generator(g_re), g_randval_generator(g_re))}; TestFixedValue value; homestore::BtreeSingleGetRequest get_req{&key, &value}; m_bt->get(get_req); @@ -259,9 +260,7 @@ class TestIndexGC : public ::testing::Test { m_test_done_latch.count_down(); } - bool time_to_stop() const { - return (get_elapsed_time_sec(m_start_time) > m_run_time); - } + bool time_to_stop() const { return (get_elapsed_time_sec(m_start_time) > m_run_time); } BtreeConfig m_cfg{g_node_size}; std::shared_ptr< BtreeType > m_bt; @@ -278,18 +277,10 @@ class TestIndexGC : public ::testing::Test { TEST_F(TestIndexGC, chunk_gc_test) { LOGINFO("Chunk GC test start"); m_start_time = Clock::now(); - iomanager.run_on_forget(m_fibers[0], [this]() { - gc_task(0); - }); - iomanager.run_on_forget(m_fibers[1], [this]() { - gc_task(1); - }); - iomanager.run_on_forget(m_fibers[2], [this]() { - put_task(2); - }); - iomanager.run_on_forget(m_fibers[3], [this]() { - get_task(3); - }); + iomanager.run_on_forget(m_fibers[0], [this]() { gc_task(0); }); + iomanager.run_on_forget(m_fibers[1], [this]() { gc_task(1); }); + iomanager.run_on_forget(m_fibers[2], [this]() { put_task(2); }); + iomanager.run_on_forget(m_fibers[3], [this]() { get_task(3); }); m_test_done_latch.wait(); LOGINFO("Chunk GC test passed"); }