diff --git a/refactor.sh b/refactor.sh new file mode 100644 index 000000000..0acd4b571 --- /dev/null +++ b/refactor.sh @@ -0,0 +1,142 @@ +#!/bin/bash + +# Input parameters +CLUSTER=908 +NAMESPACE="nuobject2sm-dev" +DEPLOYMENT_COUNT=1 +REFACTOR_IMAGE="hub.tess.io/yawzhang/storage_mgr:refactor_new-RelWithDebInfo" +NEW_IMAGE="hub.tess.io/sds/storage_mgr:1.0-pre.0.2.6.6-RelWithDebInfo" +NODE_NAME="tess-node-xkbtd-tess908.sddz.ebay.com" +DEPLOYMENT_REGEX="nuobject2-isofruit-cherry-908-sm1" +LOG_FILE="logs/refactor.log" + +if [[ -z "$NODE_NAME" ]]; then + echo "please update node name in the script" + exit 1 +fi + +# Function to log messages with timestamps +log_with_timestamp() { + local message="$1" + echo "$(date '+%Y-%m-%d %H:%M:%S') $message" | tee -a "$LOG_FILE" +} + +# Function to check deployment status +check_deployment_status() { + local deployment=$1 + local status + status=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get deployment "$deployment" -o jsonpath='{.status.readyReplicas}' 2>/dev/null) + if [[ "$status" -eq 1 ]]; then + return 0 + else + return 1 + fi +} + +# Function to check pod logs +check_pod_logs() { + local pod=$1 + local log_message=$2 + tess kubectl --context="$CLUSTER" -n "$NAMESPACE" logs "$pod" | grep -q "$log_message" + return $? +} + +PODS=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get pods --field-selector spec.nodeName="$NODE_NAME" -o jsonpath='{.items[*].metadata.name}') +if [[ -z "$PODS" ]]; then + log_with_timestamp "No pods found on node $NODE_NAME." + exit 0 +fi + +log_with_timestamp "node $NODE_NAME has pods [$PODS]" + +process_cnt=0 +for POD in $PODS; do + DEPLOYMENT=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get pod "$POD" -o jsonpath='{.metadata.ownerReferences[?(@.kind=="ReplicaSet")].name}' | sed 's/-[a-z0-9]*$//') + if [[ -z "$DEPLOYMENT" ]]; then + log_with_timestamp "No deployment found for pod $POD. Skipping..." + continue +# elif ! [[ $DEPLOYMENT =~ $DEPLOYMENT_REGEX ]]; then + elif ! [[ $DEPLOYMENT == $DEPLOYMENT_REGEX ]]; then + log_with_timestamp "Skipping pod $POD as its deployment $DEPLOYMENT does not match the expected pattern." + continue + fi + log_with_timestamp "Processing deployment $DEPLOYMENT pod $POD..." + + # PRE-CHECK + CURRENT_IMAGE=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get pod "$POD" -o jsonpath='{.spec.containers[?(@.name=="sm-app")].image}') + if [[ "$CURRENT_IMAGE" == "$NEW_IMAGE" ]]; then + log_with_timestamp "[PRE-CHECK] Pod $POD is already using the new image $NEW_IMAGE. Skipping..." + continue + fi + + if [[ $process_cnt -ge $DEPLOYMENT_COUNT ]]; then + log_with_timestamp "Reached the maximum number of deployments to process: $DEPLOYMENT_COUNT. Stopping further processing." + break + fi + + # Step 1: Update deployment strategy to Recreate and set sm-app image to refactor image + log_with_timestamp "[Step 1]. Updating deployment $DEPLOYMENT strategy to Recreate and setting sm-app image to $REFACTOR_IMAGE..." + tess kubectl --context="$CLUSTER" -n "$NAMESPACE" patch deployment "$DEPLOYMENT" --type='json' -p='[ + {"op": "replace", "path": "/spec/strategy", "value": {"type": "Recreate"}}, + {"op": "replace", "path": "/spec/template/spec/containers/0/image", "value": "'"$REFACTOR_IMAGE"'"} + ]' + + sleep 60 + + # Step 2: Get new pod name and check the log + NEW_POD=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get pods -o jsonpath='{.items[*].metadata.name}' | tr ' ' '\n' | grep "$DEPLOYMENT" | grep -v "$POD") + while [[ -z "$NEW_POD" ]]; do + log_with_timestamp "[Step 2]. No new pod found for deployment $DEPLOYMENT. deployment still upgrading, wait 3s and retrying." + sleep 3 + NEW_POD=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get pods -o jsonpath='{.items[*].metadata.name}' | tr ' ' '\n' | grep "$DEPLOYMENT" | grep -v "$POD") + done + log_with_timestamp "[Step 2]. New pod created: $NEW_POD for deployment $DEPLOYMENT." + log_with_timestamp "[Step 2]. Checking logs for new pod $NEW_POD..." + max_retry_cnt=20 + retry_cnt=0 + while ! check_pod_logs "$NEW_POD" "exit status 0;"; do + if [[ $retry_cnt -ge $max_retry_cnt ]]; then + log_with_timestamp "[Step 2]. Exceeded maximum retries while checking logs for new pod $NEW_POD." + exit 1 + fi + log_with_timestamp "[Step 2]. Expected log message not found in new pod $NEW_POD, sleeping 3s and retrying." + sleep 3 + retry_cnt=$((retry_cnt + 1)) + done + log_with_timestamp "[Step 2]. refactor confirmation log found in new pod $NEW_POD." + + # Double Check deployment status again, expecting it to not be ready + if check_deployment_status "$DEPLOYMENT"; then + log_with_timestamp "[Step 2]. Unexpected! Deployment $DEPLOYMENT is still ready after updating to refactor image for pod $POD." + exit 1 + fi + + # Step 3: Update deployment sm-app image to new image + log_with_timestamp "[Step 3]. Updating deployment $DEPLOYMENT to use new image $NEW_IMAGE for pod $NEW_POD..." + tess kubectl --context="$CLUSTER" -n "$NAMESPACE" set image deployment/"$DEPLOYMENT" sm-app="$NEW_IMAGE" + sleep 40 + + # Step 4: Check deployment status again, expecting it to be ready + while ! check_deployment_status "$DEPLOYMENT"; do + log_with_timestamp "[Step 4]. Deployment $DEPLOYMENT is not ready after updating to new image for pod $NEW_POD. sleep 5s and retrying." + sleep 5 + done + + # Step 5: Update deployment strategy back to RollingUpdate and set maxUnavailable and maxSurge + log_with_timestamp "[Step 5]. Updating deployment $DEPLOYMENT strategy back to RollingUpdate with maxUnavailable=0 and maxSurge=1..." + tess kubectl --context="$CLUSTER" -n "$NAMESPACE" patch deployment "$DEPLOYMENT" --type='json' -p='[ + {"op": "replace", "path": "/spec/strategy", "value": {"type": "RollingUpdate", "rollingUpdate": {"maxUnavailable": 1, "maxSurge": 1}}} + ]' + + # Final check to ensure deployment is ready + while ! check_deployment_status "$DEPLOYMENT"; do + log_with_timestamp "[Step 6]. Deployment $DEPLOYMENT is not ready after updating strategy back to RollingUpdate. sleep 5s and retrying." + sleep 5 + done + + log_with_timestamp "[Step 6]. Deployment $DEPLOYMENT processed successfully." + process_cnt=$((process_cnt + 1)) + sleep 60 +done + +log_with_timestamp "All pods processed successfully." \ No newline at end of file diff --git a/src/include/homestore/logstore/log_store_internal.hpp b/src/include/homestore/logstore/log_store_internal.hpp index 7768086ee..eddb66af2 100644 --- a/src/include/homestore/logstore/log_store_internal.hpp +++ b/src/include/homestore/logstore/log_store_internal.hpp @@ -172,6 +172,7 @@ struct logstore_superblk { [[nodiscard]] static logstore_superblk default_value(); static void init(logstore_superblk& m); + static void init(logstore_superblk& meta, logstore_seq_num_t first_seq_num); static void clear(logstore_superblk& m); [[nodiscard]] static bool is_valid(const logstore_superblk& m); diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index 48183a56c..f36aafb05 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -82,6 +82,8 @@ class LogStoreService { */ void start(bool format); + void refactor(); + /** * @brief Stop the LogStoreService. It resets all parameters and can be restarted with start method. * diff --git a/src/include/homestore/superblk_handler.hpp b/src/include/homestore/superblk_handler.hpp index 0402a097f..b19e5693b 100644 --- a/src/include/homestore/superblk_handler.hpp +++ b/src/include/homestore/superblk_handler.hpp @@ -108,9 +108,12 @@ class superblk { sisl::byte_array raw_buf() { return m_raw_buf; } void write() { + LOGINFO("Writing superblk {} of size {}", m_meta_sub_name, m_raw_buf->size()); if (m_meta_blk) { + LOGINFO("Updating existing superblk {}", m_meta_sub_name); meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk); } else { + LOGINFO("Adding new superblk {}", m_meta_sub_name); meta_service().add_sub_sb(m_meta_sub_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk); } } diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index 960d885e2..5d7213af6 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -46,7 +46,11 @@ CPManager::CPManager() : start_cp_thread(); } -CPManager::~CPManager() { HS_REL_ASSERT(!m_cur_cp, "CPManager is tiering down without calling shutdown"); } +CPManager::~CPManager() { + delete (m_cur_cp); + rcu_xchg_pointer(&m_cur_cp, nullptr); + LOGINFO("CPManager destroyed"); +} void CPManager::start(bool first_time_boot) { if (first_time_boot) { diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 141ff2063..c5ce85e5b 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -197,7 +197,7 @@ bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_ do_start(); return false; } else { - return true; + RELEASE_ASSERT(false, "refactor mode should not bu used for the first time boot"); } } @@ -292,21 +292,8 @@ void HomeStore::do_start() { if (has_repl_data_service()) { s_cast< GenericReplService* >(m_repl_service.get())->start(); // Replservice starts logstore & data service - } else { - if (has_data_service()) { m_data_service->start(); } - if (has_log_service() && inp_params.auto_recovery) { - // In case of custom recovery, let consumer starts the recovery and it is consumer module's responsibilities - // to start log store - m_log_service->start(is_first_time_boot() /* format */); - } - } - - // If this is the first time boot, we need to commit the formatting so that it will not be considered as first time - // boot going forward on next reboot. - if (m_dev_mgr->is_first_time_boot()) { - // Take the first CP after we have initialized all subsystems and wait for it to complete. - m_cp_mgr->trigger_cp_flush(true /* force */).get(); - m_dev_mgr->commit_formatting(); + LOGINFO("Refactor mode enabled, skipping further HomeStore start steps after ReplicationService start"); + return; } m_cp_mgr->start_timer(); @@ -316,15 +303,8 @@ void HomeStore::do_start() { } void HomeStore::shutdown() { - if (!m_init_done) { - LOGWARN("Homestore shutdown is called before init is completed"); - return; - } - LOGINFO("Homestore shutdown is started"); - m_resource_mgr->stop(); - // 1 stop all the services, after which all the upper layer api call are rejected and there is not on-going request. // Note that, after stopping, all the service are alive. if (has_repl_data_service()) @@ -340,7 +320,6 @@ void HomeStore::shutdown() { // 2 call cp_manager shutdown, which will which trigger cp flush to make sure all the in-memory data of all the // services are flushed to disk. since all the upper layer api call are rejected and there is not on-going request, // so after cp flush is done, we can guarantee all the necessary data are persisted to disk. - m_cp_mgr->shutdown(); m_cp_mgr.reset(); // 3 call reset/shutdown to clear all the services and after that all the services are dead, excluding metasevice diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 764259756..2591b5823 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -70,43 +70,12 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) { HS_LOG_ASSERT(!m_logdev_meta.is_empty(), "Expected meta data to be read already before loading this log dev id: {}", m_logdev_id); auto const store_list = m_logdev_meta.load(); - - // Notify to the caller that a new log store was reserved earlier and it is being loaded, with its meta info - for (const auto& spair : store_list) { - on_log_store_found(spair.first, spair.second); - } - - THIS_LOGDEV_LOG(INFO, "get start vdev offset during recovery {} log indx {} ", - m_logdev_meta.get_start_dev_offset(), m_logdev_meta.get_start_log_idx()); - - m_vdev_jd->update_data_start_offset(m_logdev_meta.get_start_dev_offset()); - m_log_idx = m_logdev_meta.get_start_log_idx(); - do_load(m_logdev_meta.get_start_dev_offset()); - m_log_records->reinit(m_log_idx); - m_last_flush_idx = m_log_idx - 1; - } - - // Now that we have create/load logdev metablk, so the log dev is ready to be used - m_is_ready = true; - - if (allow_timer_flush()) start_timer(); - handle_unopened_log_stores(format); - - { - // Also call the logstore to inform that start/replay is completed. - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - if (!format) { - for (auto& p : m_id_logstore_map) { - auto& lstore{p.second.log_store}; - if (lstore && lstore->get_log_replay_done_cb()) { - lstore->get_log_replay_done_cb()(lstore, lstore->start_lsn() - 1); - lstore->truncate(lstore->truncated_upto()); - } - } - } + LOGINFO("just refactor for lgodev {}, donot need rebuild logstore and load logs, return directly", m_logdev_id); } } +void LogDev::refactor() { LOGINFO("get all blks and call refactor superblk, not support now"); } + LogDev::~LogDev() { THIS_LOGDEV_LOG(INFO, "Logdev stopping id {}", m_logdev_id); HS_LOG_ASSERT((m_pending_flush_size.load() == 0), @@ -146,21 +115,8 @@ void LogDev::stop() { } } - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - for (auto& [_, store] : m_id_logstore_map) { - store.log_store->stop(); - } - - // trigger a new flush to make sure all pending writes are flushed - flush_under_guard(); - - // after we call stop, we need to do any pending device truncations - truncate(); - m_id_logstore_map.clear(); - if (allow_timer_flush()) { - auto f = stop_timer(); - std::move(f).get(); - } + THIS_LOGDEV_LOG(INFO, "no need to stop logstore in refactor mode, return directly"); + return; } void LogDev::destroy() { @@ -869,7 +825,9 @@ void LogDevMetadata::rollback_super_blk_found(const sisl::byte_view& buf, void* std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::load() { std::vector< std::pair< logstore_id_t, logstore_superblk > > ret_list; + std::vector< std::pair< logstore_id_t, logstore_superblk > > all_list; ret_list.reserve(1024); + all_list.reserve(1024); if (store_capacity()) { m_id_reserver = std::make_unique< sisl::IDReserver >(store_capacity()); } else { @@ -889,6 +847,12 @@ std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::loa m_id_reserver->reserve(idx); ret_list.push_back(std::make_pair<>(idx, store_sb[idx])); ++n; + LOGINFO("Loaded valid logstore superblk for log_dev={}, store_id={} start_lsn={}", m_sb->logdev_id, idx, + store_sb[idx].m_first_seq_num); + all_list.push_back(std::make_pair<>(idx, store_sb[idx])); + } else { + LOGINFO("Found invalid logstore superblk for log_dev={}, store_id={}", m_sb->logdev_id, idx); + all_list.push_back(std::make_pair<>(idx, store_sb[idx])); } ++idx; } @@ -898,9 +862,80 @@ std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::loa m_rollback_info.insert({rec.store_id, rec.idx_range}); } + LOGINFO("call refactor superblk for logdev={}, all_list_size={}, reserved_list_size={}", m_sb->logdev_id, + all_list.size(), ret_list.size()); + refactor_superblk(all_list); + return ret_list; } +void LogDevMetadata::refactor_superblk(const std::vector< std::pair< logstore_id_t, logstore_superblk > >& all_list) { + // increase size if needed + auto nstores = (m_store_info.size() == 0) ? 0u : *m_store_info.rbegin() + 1; + auto req_sz = sizeof(new_logdev_superblk) + (nstores * sizeof(logstore_superblk)); + if (meta_service().is_aligned_buf_needed(req_sz)) { req_sz = sisl::round_up(req_sz, meta_service().align_size()); } + LOGINFO("Refactoring logdev_superblk log_dev={}, current_size={}, required_size={}, nstores={}", m_sb->logdev_id, + m_sb.size(), req_sz, nstores); + if (req_sz != m_sb.size()) { + const auto old_buf = m_sb.raw_buf(); + m_sb.create(req_sz); + logstore_superblk* sb_area = m_sb->get_logstore_superblk(); + std::fill_n(sb_area, store_capacity(), logstore_superblk::default_value()); + std::memcpy(voidptr_cast(m_sb.raw_buf()->bytes()), static_cast< const void* >(old_buf->cbytes()), + std::min(old_buf->size(), m_sb.size())); + } else { + LOGINFO("No need to resize logdev_superblk log_dev={}, skip refactor", m_sb->logdev_id); + return; + } + + LOGINFO("before refactor, the sb is: log_dev={}, num_stores={}, start_dev_offset={}, key_idx={}, flush_mode={}", + m_sb->logdev_id, m_sb->num_stores, m_sb->start_dev_offset, m_sb->key_idx, + static_cast< int >(m_sb->flush_mode)); + + // convert old superblk to new superblk + new_logdev_superblk new_sb(m_sb.get()); + std::memcpy(voidptr_cast(m_sb.raw_buf()->bytes()), static_cast< const void* >(&new_sb), + sizeof(new_logdev_superblk)); + + // initialize all logstore superblks to default value + logstore_superblk* sb_area = + reinterpret_cast< logstore_superblk* >(m_sb.raw_buf()->bytes() + sizeof(new_logdev_superblk)); + uint32_t store_cap = (m_sb.size() - sizeof(new_logdev_superblk)) / sizeof(logstore_superblk); + std::fill_n(sb_area, store_cap, logstore_superblk::default_value()); + + // copy log store superblks + for (const auto& [store_id, store_sb] : all_list) { + HS_REL_ASSERT(logstore_superblk::is_valid(store_sb), + "Refactoring logdev superblk with invalid logstore superblk for store id {}-{}", new_sb.logdev_id, + store_id); + LOGINFO("Refactoring logdev_superblk log_dev={}, store_id={} start_lsn={}", new_sb.logdev_id, store_id, + store_sb.m_first_seq_num); + logstore_superblk::init(sb_area[store_id], store_sb.m_first_seq_num); + } + + // check if refactor is successful + new_logdev_superblk* test_sb = reinterpret_cast< new_logdev_superblk* >(m_sb.raw_buf()->bytes()); + LOGINFO("Verifying refactored logdev_superblk log_dev={}, num_stores={}, start_dev_offset={}, key_idx={}, " + "flush_mode={}", + test_sb->logdev_id, test_sb->num_stores, test_sb->start_dev_offset, test_sb->key_idx, + static_cast< int >(test_sb->flush_mode)); + const logstore_superblk* test_store_sb = + reinterpret_cast< logstore_superblk* >(m_sb.raw_buf()->bytes() + sizeof(new_logdev_superblk)); + for (const auto& [store_id, store_sb] : all_list) { + if (test_store_sb[store_id].m_first_seq_num != store_sb.m_first_seq_num) { + LOGERROR("Refactored logdev superblk verification failed for store id {}, expected is {}, actual is {}", + store_id, store_sb.m_first_seq_num, test_store_sb[store_id].m_first_seq_num); + RELEASE_ASSERT(false, "Refactored logdev superblk verification failed"); + } else { + LOGINFO("Refactored logdev={} superblk verification succeeded for store id {}, lsn={}", test_sb->logdev_id, + store_id, test_store_sb[store_id].m_first_seq_num); + } + } + m_sb.write(); + LOGINFO("Refactored logdev_superblk written to disk, log_dev={}", new_sb.logdev_id); + +} + logstore_id_t LogDevMetadata::reserve_store(bool persist_now) { auto const idx = m_id_reserver->reserve(); // Search the id reserver and alloc an idx; m_store_info.insert(idx); diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index 5b18f981b..13db95e67 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -427,6 +428,47 @@ struct logdev_superblk { }; #pragma pack() +#pragma pack(1) +struct new_logdev_superblk { + static constexpr uint32_t LOGDEV_SB_MAGIC{0xDABAF00D}; + static constexpr uint32_t LOGDEV_SB_VERSION{1}; + + uint32_t magic{LOGDEV_SB_MAGIC}; + uint32_t version{LOGDEV_SB_VERSION}; + logdev_id_t logdev_id{0}; + uint32_t num_stores{0}; + uint64_t start_dev_offset{0}; + logid_t key_idx{0}; + flush_mode_t flush_mode; + uuid_t pid{boost::uuids::nil_uuid()}; + + new_logdev_superblk(logdev_superblk* old_sb) : + magic{old_sb->magic}, + version{old_sb->version}, + logdev_id{old_sb->logdev_id}, + num_stores{old_sb->num_stores}, + start_dev_offset{old_sb->start_dev_offset}, + key_idx{old_sb->key_idx}, + flush_mode{old_sb->flush_mode}, + pid{boost::uuids::nil_uuid()} {} + + uint32_t get_magic() const { return magic; } + uint32_t get_version() const { return version; } + off_t start_offset() const { return static_cast< off_t >(start_dev_offset); } + uint32_t num_stores_reserved() const { return num_stores; } + + void set_start_offset(const off_t offset) { start_dev_offset = static_cast< uint64_t >(offset); } + + logstore_superblk* get_logstore_superblk() { + return reinterpret_cast< logstore_superblk* >(reinterpret_cast< uint8_t* >(this) + sizeof(new_logdev_superblk)); + } + const logstore_superblk* get_logstore_superblk() const { + return reinterpret_cast< const logstore_superblk* >(reinterpret_cast< const uint8_t* >(this) + + sizeof(new_logdev_superblk)); + } +}; +#pragma pack() + #pragma pack(1) typedef std::pair< logid_t, logid_t > logid_range_t; @@ -486,6 +528,7 @@ class LogDevMetadata { logdev_superblk* create(logdev_id_t id, flush_mode_t); void reset(); std::vector< std::pair< logstore_id_t, logstore_superblk > > load(); + void refactor_superblk(const std::vector< std::pair< logstore_id_t, logstore_superblk > >& all_list); void persist(); bool is_empty() const { return m_sb.is_empty(); } @@ -599,6 +642,8 @@ class LogDev : public std::enable_shared_from_this< LogDev > { */ void start(bool format, std::shared_ptr< JournalVirtualDev > vdev); + void refactor(); + /** * @brief Stop the logdev. it waits for all the pending writes to be completed and reject new api calls. * diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index f4eee9760..3689e96aa 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -413,6 +413,9 @@ nlohmann::json HomeLogStore::get_status(int verbosity) const { logstore_superblk logstore_superblk::default_value() { return logstore_superblk{-1}; } void logstore_superblk::init(logstore_superblk& meta) { meta.m_first_seq_num = 0; } +void logstore_superblk::init(logstore_superblk& meta, logstore_seq_num_t first_seq_num) { + meta.m_first_seq_num = first_seq_num; +} void logstore_superblk::clear(logstore_superblk& meta) { meta.m_first_seq_num = -1; } bool logstore_superblk::is_valid(const logstore_superblk& meta) { return meta.m_first_seq_num >= 0; } diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index abb266101..35bb30866 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -119,6 +119,12 @@ void LogStoreService::start(bool format) { } } +void LogStoreService::refactor() { + for (auto& [id, logdev] : m_id_logdev_map) { + logdev->refactor(); + } +} + void LogStoreService::stop() { start_stopping(); while (true) { diff --git a/src/lib/meta/meta_blk_service.cpp b/src/lib/meta/meta_blk_service.cpp index 1b93dc203..8cfa2f05e 100644 --- a/src/lib/meta/meta_blk_service.cpp +++ b/src/lib/meta/meta_blk_service.cpp @@ -398,6 +398,7 @@ void MetaBlkService::register_handler(meta_sub_type type, const meta_blk_found_c } void MetaBlkService::add_sub_sb(meta_sub_type type, const uint8_t* context_data, uint64_t sz, void*& cookie) { + LOGINFO("Adding sub sb [type={}, sz={}]", type, in_bytes(sz)); std::lock_guard< decltype(m_meta_mtx) > lg(m_meta_mtx); HS_REL_ASSERT_EQ(m_inited, true, "accessing metablk store before init is not allowed."); HS_REL_ASSERT_LT(type.length(), MAX_SUBSYS_TYPE_LEN, "type len: {} should not exceed len: {}", type.length(), @@ -538,6 +539,7 @@ void MetaBlkService::write_meta_blk_to_disk(meta_blk* mblk) { HS_REL_ASSERT(false, "error happens happen during write_meta_blk_to_disk: {}, buf address: {}", error.value(), (const char*)mblk); } + LOGINFO("Successfully write meta blk to disk"); } // @@ -548,6 +550,7 @@ void MetaBlkService::write_meta_blk_to_disk(meta_blk* mblk) { // 3. update in-memory meta blks map; // meta_blk* MetaBlkService::init_meta_blk(BlkId& bid, meta_sub_type type, const uint8_t* context_data, size_t sz) { + LOGINFO("Initializing meta blk [type={}, sz={}]", type, in_bytes(sz)); meta_blk* mblk{r_cast< meta_blk* >(hs_utils::iobuf_alloc(block_size(), sisl::buftag::metablk, align_size()))}; mblk->hdr.h.compressed = 0; mblk->hdr.h.bid = bid; @@ -577,6 +580,9 @@ meta_blk* MetaBlkService::init_meta_blk(BlkId& bid, meta_sub_type type, const ui mblk->hdr.h.next_bid.invalidate(); // write this meta blk to disk + LOGINFO("Writing meta blk to disk [type={}, bid={}, prev_bid={}, next_bid={}, sz={}]", type, + mblk->hdr.h.bid.to_string(), mblk->hdr.h.prev_bid.to_string(), mblk->hdr.h.next_bid.to_string(), + in_bytes(sz)); write_meta_blk_internal(mblk, context_data, sz); // now update previous last mblk or ssb. They can only be updated after meta blk is written to disk; @@ -750,6 +756,9 @@ void MetaBlkService::write_meta_blk_internal(meta_blk* mblk, const uint8_t* cont } // write meta blk; + LOGINFO("Writing meta block to disk, type={}, bid={}, context_sz={}, compressed={}, ovf_bid={}", mblk->hdr.h.type, + mblk->hdr.h.bid, uint64_cast(mblk->hdr.h.context_sz), unsigned(mblk->hdr.h.compressed), + mblk->hdr.h.ovf_bid.to_string()); write_meta_blk_to_disk(mblk); #ifdef _PRERELEASE @@ -802,6 +811,7 @@ void MetaBlkService::_cookie_sanity_check(const void* cookie) const { // 3. free old ovf_bid if there is any // void MetaBlkService::update_sub_sb(const uint8_t* context_data, uint64_t sz, void* cookie) { + LOGINFO("Updating sub super block, context_sz={}", sz); std::lock_guard< decltype(m_meta_mtx) > lg{m_meta_mtx}; HS_REL_ASSERT_EQ(m_inited, true, "accessing metablk store before init is not allowed."); @@ -839,6 +849,7 @@ void MetaBlkService::update_sub_sb(const uint8_t* context_data, uint64_t sz, voi mblk->hdr.h.gen_cnt += 1; // write this meta blk to disk + LOGINFO("Writing updated sub super block, context_sz={}", sz); write_meta_blk_internal(mblk, context_data, sz); #ifdef _PRERELEASE diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index 0ccbf8343..a8cd6ae05 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -171,39 +171,8 @@ void RaftReplService::start() { LOGINFO("Starting LogStore service, fist_boot = {}", hs()->is_first_time_boot()); hs()->logstore_service().start(hs()->is_first_time_boot()); LOGINFO("Started LogStore service, log replay should already done till this point"); - // all log stores are replayed, time to start data service. - LOGINFO("Starting DataService"); - hs()->data_service().start(); - - // Step 6: Iterate all the repl devs and ask each one of them to join the raft group concurrently. - std::vector< std::future< bool > > join_group_futures; - for (const auto& [_, repl_dev] : m_rd_map) { - if (repl_dev->get_stage() == repl_dev_stage_t::UNREADY) { - LOGINFO("Repl dev is unready, skip join group, group_id={}", boost::uuids::to_string(repl_dev->group_id())); - continue; - } - join_group_futures.emplace_back(std::async(std::launch::async, [&repl_dev]() { - auto rdev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev); - rdev->wait_for_logstore_ready(); - - // upper layer can register a callback to be notified when log replay is done. - if (auto listener = rdev->get_listener(); listener) listener->on_log_replay_done(rdev->group_id()); - return rdev->join_group(); - })); - } - - for (auto& future : join_group_futures) { - if (!future.get()) HS_REL_ASSERT(false, "FAILED TO JOIN GROUP, PANIC HERE"); - } - // Step 7: Register to CPManager to ensure we can flush the superblk. - hs()->cp_mgr().register_consumer(cp_consumer_t::REPLICATION_SVC, std::make_unique< RaftReplServiceCPHandler >()); - - // Step 8: Start a reaper thread which wakes up time-to-time and fetches pending data or cleans up old requests etc - start_reaper_thread(); - - // Delete any unopened logstores. - hs()->logstore_service().delete_unopened_logdevs(); + LOGINFO("return directly for refactor mode"); } void RaftReplService::stop() { @@ -232,14 +201,11 @@ void RaftReplService::stop() { m_msg_mgr.reset(); hs()->logstore_service().stop(); - hs()->data_service().stop(); + LOGINFO("In refactor mode, data service is not started, so not stopping it"); + return; } -RaftReplService::~RaftReplService() { - stop_reaper_thread(); - - // the base class destructor will clear the m_rd_map -} +RaftReplService::~RaftReplService() { LOGINFO("Destroying RaftReplService"); } void RaftReplService::monitor_cert_changes() { auto fw = ioenvironment.get_file_watcher();