Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "7.3.3"
version = "8.0.0"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/blkdata_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class BlkDataService {
* @param chunk_size The size of chunks to use for the virtual device, in bytes.
*/
void create_vdev(uint64_t size, HSDevType devType, uint32_t blk_size, blk_allocator_type_t alloc_type,
chunk_selector_type_t chunk_sel_type, uint32_t num_chunks, uint32_t chunk_size);
chunk_selector_type_t chunk_sel_type, uint32_t num_chunks, uint64_t chunk_size);

/**
* @brief Opens a virtual device with the specified virtual device information.
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/index_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class IndexService {
// Creates the vdev that is needed to initialize the device
void create_vdev(uint64_t size, HSDevType devType, uint32_t num_chunks,
chunk_selector_type_t chunk_sel_type = chunk_selector_type_t::ROUND_ROBIN,
uint32_t chunk_size = 0);
uint64_t chunk_size = 0);
// Open the existing vdev which is represented by the vdev_info_block
shared< VirtualDev > open_vdev(const vdev_info& vb, bool load_existing);
std::shared_ptr< ChunkSelector > get_chunk_selector() { return m_custom_chunk_selector; };
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/logstore_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class LogStoreService {
*/
void device_truncate();

folly::Future< std::error_code > create_vdev(uint64_t size, HSDevType devType, uint32_t chunk_size);
folly::Future< std::error_code > create_vdev(uint64_t size, HSDevType devType, uint64_t chunk_size);
std::shared_ptr< VirtualDev > open_vdev(const vdev_info& vinfo, bool load_existing);
std::shared_ptr< JournalVirtualDev > get_vdev() const { return m_logdev_vdev; }
std::vector< std::shared_ptr< LogDev > > get_all_logdevs();
Expand Down
1 change: 1 addition & 0 deletions src/lib/blkalloc/bitmap_blk_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void BitmapBlkAllocator::cp_flush(CP*) {

if (m_is_disk_bm_dirty.load()) {
sisl::byte_array bitmap_buf = acquire_underlying_buffer();
BLKALLOC_LOG(INFO, "Flushing bitmap (name: {} id: {}) size = {}", get_name(), m_chunk_id, bitmap_buf->size());
if (m_meta_blk_cookie) {
meta_service().update_sub_sb(bitmap_buf->cbytes(), bitmap_buf->size(), m_meta_blk_cookie);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/blkdata_svc/blkdata_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ BlkDataService::~BlkDataService() = default;

// first-time boot path
void BlkDataService::create_vdev(uint64_t size, HSDevType devType, uint32_t blk_size, blk_allocator_type_t alloc_type,
chunk_selector_type_t chunk_sel_type, uint32_t num_chunks, uint32_t chunk_size) {
chunk_selector_type_t chunk_sel_type, uint32_t num_chunks, uint64_t chunk_size) {
hs_vdev_context vdev_ctx;
vdev_ctx.type = hs_vdev_type_t::DATA_VDEV;

Expand Down
26 changes: 13 additions & 13 deletions src/lib/device/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ struct vdev_info {
uint32_t num_mirrors{0}; // 12: Total number of mirrors
uint32_t blk_size{0}; // 16: IO block size for this vdev
uint32_t num_primary_chunks{0}; // 20: number of primary chunks
uint32_t chunk_size{0}; // 24: chunk size used in vdev.
vdev_size_type_t size_type{}; // 28: Whether its a static or dynamic type.
uint8_t slot_allocated{0}; // 29: Is this current slot allocated
uint8_t failed{0}; // 30: set to true if disk is replaced
uint8_t hs_dev_type{0}; // 31: PDev dev type (as in fast or data)
uint8_t multi_pdev_choice{0}; // 32: Choice when multiple pdevs are present (vdev_multi_pdev_opts_t)
char name[max_name_len]; // 33: Name of the vdev
uint16_t checksum{0}; // 97: Checksum of this entire Block
uint8_t alloc_type; // 98: Allocator type of this vdev
uint8_t chunk_sel_type; // 99: Chunk Selector type of this vdev_id
uint8_t use_slab_allocator{0}; // 100: Use slab allocator for this vdev
uint8_t padding[154]{}; // 101: Padding to make it 256 bytes
uint64_t chunk_size{0}; // 24: chunk size used in vdev.
vdev_size_type_t size_type{}; // 32: Whether its a static or dynamic type.
uint8_t slot_allocated{0}; // 33: Is this current slot allocated
uint8_t failed{0}; // 34: set to true if disk is replaced
uint8_t hs_dev_type{0}; // 35: PDev dev type (as in fast or data)
uint8_t multi_pdev_choice{0}; // 36: Choice when multiple pdevs are present (vdev_multi_pdev_opts_t)
char name[max_name_len]; // 37: Name of the vdev
uint16_t checksum{0}; // 101: Checksum of this entire Block
uint8_t alloc_type; // 102: Allocator type of this vdev
uint8_t chunk_sel_type; // 103: Chunk Selector type of this vdev_id
uint8_t use_slab_allocator{0}; // 104: Use slab allocator for this vdev
uint8_t padding[150]{}; // 105: Padding to make it 256 bytes
uint8_t user_private[user_private_size]{}; // 128: User specific information

uint32_t get_vdev_id() const { return vdev_id; }
Expand Down Expand Up @@ -107,7 +107,7 @@ struct vdev_parameters {
// NOTE: If pdev opts is ALL_PDEV_STRIPED, then num_chunks would round off
// to number of pdevs evenly
uint32_t blk_size; // Block size vdev operates on
uint32_t chunk_size{}; // Chunk size provided for dynamic vdev.
uint64_t chunk_size{}; // Chunk size provided for dynamic vdev.
HSDevType dev_type; // Which physical device type this vdev belongs to (FAST or DATA)
blk_allocator_type_t alloc_type; // which allocator type this vdev wants to be with;
chunk_selector_type_t chunk_sel_type; // which chunk selector type this vdev wants to be with;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/device/device_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ void DeviceManager::compose_vparam(uint64_t vdev_id, vdev_parameters& vparam, st
}

// Based on the min chunk size, we calculate the max number of chunks that can be created in each target pdev
uint32_t min_chunk_size = hs_super_blk::min_chunk_size(vparam.dev_type);
uint64_t min_chunk_size = hs_super_blk::min_chunk_size(vparam.dev_type);
// FIXME: it is possible that each vdev is less than max_num_chunks, but total is more than MAX_CHUNKS_IN_SYSTEM.
// The uint32_t conversion is safe as it only overflows when vdev size > 64PB with a 16MB min_chunk_size.
uint32_t max_num_chunks = std::min(uint32_t(vparam.vdev_size / min_chunk_size), hs_super_blk::MAX_CHUNKS_IN_SYSTEM);
Expand Down
2 changes: 1 addition & 1 deletion src/lib/device/hs_super_blk.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class hs_super_blk {
// for a device with 100MB and min_chunk_size = 16MB, we should get 6 chunks, not 7.
return dinfo.dev_size / min_c_size;
}
static uint32_t min_chunk_size(HSDevType dtype) {
static uint64_t min_chunk_size(HSDevType dtype) {
uint64_t min_chunk_size = (dtype == HSDevType::Fast) ? MIN_CHUNK_SIZE_FAST_DEVICE : MIN_CHUNK_SIZE_DATA_DEVICE;
#ifdef _PRERELEASE
auto chunk_size = iomgr_flip::instance()->get_test_flip< long >("set_minimum_chunk_size");
Expand Down
2 changes: 1 addition & 1 deletion src/lib/index/index_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ IndexService::IndexService(std::unique_ptr< IndexServiceCallbacks > cbs, shared<
}

void IndexService::create_vdev(uint64_t size, HSDevType devType, uint32_t num_chunks,
chunk_selector_type_t chunk_sel_type, uint32_t chunk_size) {
chunk_selector_type_t chunk_sel_type, uint64_t chunk_size) {
auto const atomic_page_size = hs()->device_mgr()->atomic_page_size(devType);
hs_vdev_context vdev_ctx;
vdev_ctx.type = hs_vdev_type_t::INDEX_VDEV;
Expand Down
18 changes: 11 additions & 7 deletions src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,11 @@ bool IndexWBCache::was_node_committed(IndexBufferPtr const& buf) {

//////////////////// CP Related API section /////////////////////////////////
folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) {
#ifdef _PRERELEASE
LOGTRACEMOD(wbcache, "Starting Index CP Flush with cp \ndag={}", cp_ctx->to_string_with_dags());
#else
LOGINFOMOD(wbcache, "Starting Index CP Flush with cp {}", cp_ctx->id());
#endif
// #ifdef _PRERELEASE
// static int id = 0;
// auto filename = "cp_" + std::to_string(id++) + "_" + std::to_string(rand() % 100) + ".dot";
Expand All @@ -833,6 +837,7 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) {
} else {
CP_PERIODIC_LOG(DEBUG, unmove(cp_ctx->id()), "Btree does not have any dirty buffers to flush");
}
cp_ctx->complete(true);
return folly::makeFuture< bool >(true); // nothing to flush
}

Expand Down Expand Up @@ -946,20 +951,19 @@ void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBufferP
if (next_buf) {
do_flush_one_buf(cp_ctx, next_buf, false);
} else if (!has_more) {
for (const auto& ordinal : m_updated_ordinals) {
LOGTRACEMOD(wbcache, "Updating sb for ordinal {}", ordinal);
index_service().write_sb(ordinal);
}

// We are done flushing the buffers, We flush the vdev to persist the vdev bitmaps and free blks
// Pick a CP Manager blocking IO fiber to execute the cp flush of vdev
iomanager.run_on_forget(cp_mgr().pick_blocking_io_fiber(), [this, cp_ctx]() {
for (const auto& ordinal : m_updated_ordinals) {
LOGTRACEMOD(wbcache, "Updating sb for ordinal {}", ordinal);
index_service().write_sb(ordinal);
}
auto cp_id = cp_ctx->id();
LOGTRACEMOD(wbcache, "Initiating CP {} flush", cp_id);
LOGINFOMOD(wbcache, "Initiating CP {} flush", cp_id);
m_vdev->cp_flush(cp_ctx); // This is a blocking io call
LOGTRACEMOD(wbcache, "CP {} freed blkids: \n{}", cp_id, cp_ctx->to_string_free_list());
cp_ctx->complete(true);
LOGTRACEMOD(wbcache, "Completed CP {} flush", cp_id);
LOGINFOMOD(wbcache, "Completed CP {} flush", cp_id);
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/logstore/log_store_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void LogStoreService::on_meta_blk_found(const sisl::byte_view& buf, void* meta_c
HS_REL_ASSERT_EQ(m_sb->version, logstore_service_sb_version, "Invalid version of log service metablk");
}

folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, HSDevType devType, uint32_t chunk_size) {
folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, HSDevType devType, uint64_t chunk_size) {
const auto atomic_page_size = hs()->device_mgr()->atomic_page_size(devType);

hs_vdev_context hs_ctx;
Expand Down
4 changes: 1 addition & 3 deletions src/lib/replication/repl_dev/solo_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ class SoloReplDev : public ReplDev {
return std::vector< peer_info >{
peer_info{.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0, .priority_ = 1}};
}
std::vector< replica_id_t > get_replication_quorum() override {
return std::vector< replica_id_t >{m_group_id};
}
std::vector< replica_id_t > get_replication_quorum() override { return std::vector< replica_id_t >{m_group_id}; }
void reconcile_leader() override {}
void yield_leadership(bool immediate_yield, replica_id_t candidate) override {}
bool is_ready_for_traffic() const override { return true; }
Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/service/generic_repl_svc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ hs_stats GenericReplService::get_cap_stats() const {

///////////////////// SoloReplService specializations and CP Callbacks /////////////////////////////
SoloReplService::SoloReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} {}
SoloReplService::~SoloReplService(){};
SoloReplService::~SoloReplService() {};

void SoloReplService::start() {
for (auto const& [buf, mblk] : m_sb_bufs) {
Expand Down
18 changes: 8 additions & 10 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,29 +636,27 @@ void RaftReplService::start_repl_service_timers() {
HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec));
m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer(
HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */,
nullptr, [this](void *) {
nullptr, [this](void*) {
LOGDEBUGMOD(replication, "Reaper Thread: Doing GC");
gc_repl_reqs();
gc_repl_devs();
});

// Check for queued fetches at the minimum every second
uint64_t interval_ns = std::min(
HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000);
uint64_t interval_ns =
std::min(HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000);
m_rdev_fetch_timer_hdl = iomanager.schedule_thread_timer(interval_ns, true /* recurring */, nullptr,
[this](void *) { fetch_pending_data(); });
[this](void*) { fetch_pending_data(); });

// Flush durable commit lsns to superblock
// FIXUP: what is the best value for flush_durable_commit_interval_ms?
m_flush_durable_commit_timer_hdl = iomanager.schedule_thread_timer(
HS_DYNAMIC_CONFIG(consensus.flush_durable_commit_interval_ms) * 1000 * 1000, true /* recurring */,
nullptr, [this](void *) { flush_durable_commit_lsn(); });
nullptr, [this](void*) { flush_durable_commit_lsn(); });

m_replace_member_sync_check_timer_hdl = iomanager.schedule_thread_timer(
HS_DYNAMIC_CONFIG(consensus.replace_member_sync_check_interval_ms) * 1000 * 1000, true /* recurring */,
nullptr, [this](void *) {
monitor_replace_member_replication_status();
});
nullptr, [this](void*) { monitor_replace_member_replication_status(); });
latch.count_down();
}
});
Expand All @@ -675,15 +673,15 @@ void RaftReplService::stop_repl_service_timers() {
});
}

void RaftReplService::add_to_fetch_queue(cshared<RaftReplDev> &rdev, std::vector<repl_req_ptr_t> rreqs) {
void RaftReplService::add_to_fetch_queue(cshared< RaftReplDev >& rdev, std::vector< repl_req_ptr_t > rreqs) {
std::unique_lock lg(m_pending_fetch_mtx);
m_pending_fetch_batches.push(std::make_pair(rdev, std::move(rreqs)));
}

void RaftReplService::fetch_pending_data() {
std::unique_lock lg(m_pending_fetch_mtx);
while (!m_pending_fetch_batches.empty()) {
auto const &[d, rreqs] = m_pending_fetch_batches.front();
auto const& [d, rreqs] = m_pending_fetch_batches.front();
if (get_elapsed_time_ms(rreqs.at(0)->created_time()) < HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms)) {
break;
}
Expand Down
Loading
Loading