From 5489f2c935e69b12aa93e2f9090691d36e44f357 Mon Sep 17 00:00:00 2001 From: Ryan Zhang Date: Tue, 7 Apr 2026 15:33:01 -0700 Subject: [PATCH 1/2] Split ActivityProfilerController into Sync and Async Handlers (#1269) Summary: Part 1 of a refactor to improve safety and readability around sync/async profiling. This is a straightforward (no-op) split that creates Sync/AsyncActivityProfilerHandler, which contains the methods which belonged to the ActivityProfilerController but are only used for one type of tracing. The original methods in the controller become thin wrappers that forward the request to the appropriate handler. Differential Revision: D92550422 --- libkineto/libkineto_defs.bzl | 2 + libkineto/src/ActivityProfilerController.cpp | 336 ++---------------- libkineto/src/ActivityProfilerController.h | 27 +- .../src/AsyncActivityProfilerHandler.cpp | 274 ++++++++++++++ libkineto/src/AsyncActivityProfilerHandler.h | 61 ++++ libkineto/src/SyncActivityProfilerHandler.cpp | 82 +++++ libkineto/src/SyncActivityProfilerHandler.h | 39 ++ 7 files changed, 503 insertions(+), 318 deletions(-) create mode 100644 libkineto/src/AsyncActivityProfilerHandler.cpp create mode 100644 libkineto/src/AsyncActivityProfilerHandler.h create mode 100644 libkineto/src/SyncActivityProfilerHandler.cpp create mode 100644 libkineto/src/SyncActivityProfilerHandler.h diff --git a/libkineto/libkineto_defs.bzl b/libkineto/libkineto_defs.bzl index 4102776e8..c96240af2 100644 --- a/libkineto/libkineto_defs.bzl +++ b/libkineto/libkineto_defs.bzl @@ -69,6 +69,8 @@ def get_libkineto_cpu_only_srcs(with_api = True): "src/GenericActivityProfiler.cpp", "src/ActivityProfilerController.cpp", "src/ActivityProfilerProxy.cpp", + "src/AsyncActivityProfilerHandler.cpp", + "src/SyncActivityProfilerHandler.cpp", "src/ActivityType.cpp", "src/Config.cpp", "src/ConfigLoader.cpp", diff --git a/libkineto/src/ActivityProfilerController.cpp b/libkineto/src/ActivityProfilerController.cpp index d31213a59..2cc8059fe 100644 
--- a/libkineto/src/ActivityProfilerController.cpp +++ b/libkineto/src/ActivityProfilerController.cpp @@ -8,14 +8,10 @@ #include "ActivityProfilerController.h" -#include #include -#include #include #include "ActivityLoggerFactory.h" -#include "ActivityTrace.h" - // TODO DEVICE_AGNOSTIC: Move the device decision out of C++ files to be // determined entirely by the build process. For the // controller, we'll need some registration mechanism. @@ -34,7 +30,6 @@ #endif -#include "ThreadUtil.h" #include "output_json.h" #include "output_membuf.h" @@ -94,21 +89,16 @@ ActivityProfilerController::ActivityProfilerController( // directly by the GenericActivityProfiler. profiler_ = std::make_unique(cpuOnly); #endif + + syncHandler_ = std::make_unique( + *profiler_, syncTraceActive_); + asyncHandler_ = std::make_unique( + *profiler_, syncTraceActive_); configLoader_.addHandler(ConfigLoader::ConfigKind::ActivityProfiler, this); } ActivityProfilerController::~ActivityProfilerController() { configLoader_.removeHandler(ConfigLoader::ConfigKind::ActivityProfiler, this); - for (auto profilerThread : profilerThreads_) { - if (profilerThread) { - // signaling termination of the profiler loop - stopRunloop_ = true; - profilerThread->join(); - delete profilerThread; - profilerThread = nullptr; - } - } - #if !USE_GOOGLE_LOG for (auto& collector : loggerCollectors_) { Logger::removeLoggerObserver(collector.get()); @@ -124,7 +114,7 @@ static ActivityLoggerFactory initLoggerFactory() { return factory; } -static ActivityLoggerFactory& loggerFactory() { +ActivityLoggerFactory& ActivityProfilerController::loggerFactory() { static ActivityLoggerFactory factory = initLoggerFactory(); return factory; } @@ -135,7 +125,8 @@ void ActivityProfilerController::addLoggerFactory( loggerFactory().addProtocol(protocol, std::move(factory)); } -static std::unique_ptr makeLogger(const Config& config) { +std::unique_ptr ActivityProfilerController::makeLogger( + const Config& config) { if 
(config.activitiesLogToMemory()) { return std::make_unique(config); } @@ -154,266 +145,6 @@ void ActivityProfilerController::setInvariantViolationsLoggerFactory( invariantViolationsLoggerFactory() = factory(); } -bool ActivityProfilerController::canAcceptConfig() { - return !profiler_->isActive(); -} - -void ActivityProfilerController::acceptConfig(const Config& config) { - VLOG(1) << "acceptConfig"; - if (config.activityProfilerEnabled()) { - scheduleTrace(config); - } -} - -bool ActivityProfilerController::shouldActivateTimestampConfig( - const std::chrono::time_point& now) { - if (asyncRequestConfig_->hasProfileStartIteration()) { - return false; - } - if (asyncRequestConfig_->memoryProfilerEnabled()) { - return false; - } - // Note on now + Config::kControllerIntervalMsecs: - // Profiler interval does not align perfectly up to startTime - warmup. - // Waiting until the next tick won't allow sufficient time for the - // profiler to warm up. So check if we are very close to the warmup time - // and trigger warmup. - if (now + Config::kControllerIntervalMsecs >= - (asyncRequestConfig_->requestTimestamp() - - asyncRequestConfig_->activitiesWarmupDuration())) { - LOG(INFO) - << "Received on-demand activity trace request by " - << " profile timestamp = " - << asyncRequestConfig_->requestTimestamp().time_since_epoch().count(); - return true; - } - return false; -} - -bool ActivityProfilerController::shouldActivateIterationConfig( - int64_t currentIter) { - if (!asyncRequestConfig_->hasProfileStartIteration()) { - return false; - } - if (asyncRequestConfig_->memoryProfilerEnabled()) { - return false; - } - auto rootIter = asyncRequestConfig_->startIterationIncludingWarmup(); - // Keep waiting, it is not time to start yet. 
- if (currentIter < rootIter) { - return false; - } - - LOG(INFO) << "Received on-demand activity trace request by " - " profile start iteration = " - << asyncRequestConfig_->profileStartIteration() - << ", current iteration = " << currentIter; - // Re-calculate the start iter if requested iteration is in the past. - if (currentIter > rootIter) { - auto newProfileStart = - currentIter + asyncRequestConfig_->activitiesWarmupIterations(); - // Use Start Iteration Round Up if it is present. - if (asyncRequestConfig_->profileStartIterationRoundUp() > 0) { - // round up to nearest multiple - auto divisor = asyncRequestConfig_->profileStartIterationRoundUp(); - auto rem = newProfileStart % divisor; - newProfileStart += ((rem == 0) ? 0 : divisor - rem); - LOG(INFO) << "Rounding up profiler start iteration to : " - << newProfileStart; - asyncRequestConfig_->setProfileStartIteration(newProfileStart); - if (currentIter != asyncRequestConfig_->startIterationIncludingWarmup()) { - // Ex. Current 9, start 8, warmup 5, roundup 100. Resolves new start - // to 100, with warmup starting at 95. So don't start now. - return false; - } - } else { - LOG(INFO) << "Start iteration updated to : " << newProfileStart; - asyncRequestConfig_->setProfileStartIteration(newProfileStart); - } - } - return true; -} - -void ActivityProfilerController::profilerLoop() { - setThreadName("Kineto Activity Profiler"); - VLOG(0) << "Entering activity profiler loop"; - - auto now = system_clock::now(); - auto next_wakeup_time = now + Config::kControllerIntervalMsecs; - - while (!stopRunloop_) { - now = system_clock::now(); - - while (now < next_wakeup_time) { - /* sleep override */ - std::this_thread::sleep_for(next_wakeup_time - now); - now = system_clock::now(); - } - - // Perform Double-checked locking to reduce overhead of taking lock. 
- if (asyncRequestConfig_ && !profiler_->isActive()) { - std::lock_guard lock(asyncConfigLock_); - if (asyncRequestConfig_ && !profiler_->isActive() && - shouldActivateTimestampConfig(now)) { - activateConfig(now); - } - } - - while (next_wakeup_time < now) { - next_wakeup_time += Config::kControllerIntervalMsecs; - } - - // Use syncTraceActive_ so we don't step into the loop while sync trace is - // running - if (profiler_->isActive() && !profiler_->isCollectingMemorySnapshot() && - !syncTraceActive_) { - next_wakeup_time = profiler_->performRunLoopStep(now, next_wakeup_time); - VLOG(1) << "Profiler loop: " - << duration_cast(system_clock::now() - now).count() - << "ms"; - } - } - - VLOG(0) << "Exited activity profiling loop"; -} - -void ActivityProfilerController::memoryProfilerLoop() { - while (!stopRunloop_) { - // Perform Double-checked locking to reduce overhead of taking lock. - if (asyncRequestConfig_ && !profiler_->isActive()) { - std::lock_guard lock(asyncConfigLock_); - if (asyncRequestConfig_ && !profiler_->isActive() && - asyncRequestConfig_->memoryProfilerEnabled()) { - logger_ = makeLogger(*asyncRequestConfig_); - auto path = asyncRequestConfig_->activitiesLogFile(); - auto profile_time = asyncRequestConfig_->profileMemoryDuration(); - auto config = asyncRequestConfig_->clone(); - asyncRequestConfig_ = nullptr; - profiler_->performMemoryLoop( - path, profile_time, logger_.get(), *config); - } - } - } -} - -void ActivityProfilerController::step() { - // Do not remove this copy to currentIter. Otherwise count is not - // guaranteed. - int64_t currentIter = ++iterationCount_; - VLOG(0) << "Step called , iteration = " << currentIter; - - // Perform Double-checked locking to reduce overhead of taking lock. 
- if (asyncRequestConfig_ && !profiler_->isActive()) { - std::lock_guard lock(asyncConfigLock_); - auto now = system_clock::now(); - if (asyncRequestConfig_ && !profiler_->isActive() && - shouldActivateIterationConfig(currentIter)) { - activateConfig(now); - } - } - if (profiler_->isActive() && !profiler_->isCollectingMemorySnapshot()) { - auto now = system_clock::now(); - auto next_wakeup_time = now + Config::kControllerIntervalMsecs; - profiler_->performRunLoopStep(now, next_wakeup_time, currentIter); - } -} - -// This function should only be called when holding the configLock_. -void ActivityProfilerController::activateConfig( - std::chrono::time_point now) { - logger_ = makeLogger(*asyncRequestConfig_); - profiler_->setLogger(logger_.get()); - LOGGER_OBSERVER_SET_TRIGGER_ON_DEMAND(); - profiler_->configure(*asyncRequestConfig_, now); - asyncRequestConfig_ = nullptr; -} - -void ActivityProfilerController::scheduleTrace(const Config& config) { - VLOG(1) << "scheduleTrace"; - if (profiler_->isActive()) { - LOG(WARNING) << "Ignored request - profiler busy"; - return; - } - - int64_t currentIter = iterationCount_; - std::unique_ptr configToSchedule; - - if (config.hasProfileStartIteration() && currentIter < 0) { - // Special case: daemon config with activitiesDuration set - if (config.activitiesDuration().count() > 0) { - LOG(INFO) << "Config with duration-based profiling, " - << "ignoring iteration count requirement"; - // Continue with modified config - clone and set profileStartIteration to - // -1 - configToSchedule = config.clone(); - configToSchedule->setProfileStartIteration(-1); - } else { - LOG(WARNING) << "Ignored profile iteration count based request as " - << "application is not updating iteration count"; - return; - } - } else { - configToSchedule = config.clone(); - } - - // Common scheduling logic - bool newConfigScheduled = false; - if (!asyncRequestConfig_) { - std::lock_guard lock(asyncConfigLock_); - if (!asyncRequestConfig_) { - 
asyncRequestConfig_ = std::move(configToSchedule); - newConfigScheduled = true; - } - } - if (!newConfigScheduled) { - LOG(WARNING) << "Ignored request - another profile request is pending."; - return; - } - - // start a profilerLoop() thread to handle request - - if (config.memoryProfilerEnabled()) { - auto thread_type = ThreadType::MEMORY_SNAPSHOT; - if (!profilerThreads_[thread_type]) { - profilerThreads_[thread_type] = new std::thread( - &ActivityProfilerController::memoryProfilerLoop, this); - } - } else { - auto thread_type = ThreadType::KINETO; - if (!profilerThreads_[thread_type]) { - profilerThreads_[thread_type] = - new std::thread(&ActivityProfilerController::profilerLoop, this); - } - } -} - -void ActivityProfilerController::prepareTrace(const Config& config) { - // Requests from ActivityProfilerApi have higher priority than - // requests from other sources (signal, daemon). - // Cancel any ongoing request and refuse new ones. - auto now = system_clock::now(); - syncTraceActive_ = true; - if (profiler_->isActive()) { - LOG(WARNING) << "Cancelling current trace request in order to start " - << "higher priority synchronous request"; - if (libkineto::api().client()) { - libkineto::api().client()->stop(); - } - profiler_->stopTrace(now); - profiler_->reset(); - } - - profiler_->configure(config, now); -} - -void ActivityProfilerController::toggleCollectionDynamic(const bool enable) { - profiler_->toggleCollectionDynamic(enable); -} - -void ActivityProfilerController::startTrace() { - UST_LOGGER_MARK_COMPLETED(kWarmUpStage); - profiler_->startTrace(std::chrono::system_clock::now()); -} bool ActivityProfilerController::isActive() { return profiler_->isActive(); } @@ -448,28 +179,6 @@ void ActivityProfilerController::popUserCorrelationId() { profiler_->popUserCorrelationId(); } -std::unique_ptr ActivityProfilerController:: - stopTrace() { - profiler_->stopTrace(std::chrono::system_clock::now()); - UST_LOGGER_MARK_COMPLETED(kCollectionStage); - auto logger = 
std::make_unique(profiler_->config()); - profiler_->processTrace(*logger); - // Will follow up with another patch for logging URLs when ActivityTrace - // is moved. - UST_LOGGER_MARK_COMPLETED(kPostProcessingStage); - - // Logger Metadata contains a map of LOGs collected in Kineto - // logger_level -> List of log lines - // This will be added into the trace as metadata. - std::unordered_map> loggerMD = - profiler_->getLoggerMetadata(); - logger->setLoggerMetadata(std::move(loggerMD)); - - profiler_->reset(); - syncTraceActive_ = false; - return std::make_unique(std::move(logger), loggerFactory()); -} - void ActivityProfilerController::addMetadata( const std::string& key, const std::string& value) { @@ -487,4 +196,33 @@ void ActivityProfilerController::logInvariantViolation( } } +// Async-only functions +bool ActivityProfilerController::canAcceptConfig() { + return asyncHandler_->canAcceptConfig(); +} +void ActivityProfilerController::acceptConfig(const Config& config) { + asyncHandler_->acceptConfig(config); +} +void ActivityProfilerController::scheduleTrace(const Config& config) { + asyncHandler_->scheduleTrace(config); +} +void ActivityProfilerController::step() { + asyncHandler_->step(); +} + +// Sync-only functions +void ActivityProfilerController::prepareTrace(const Config& config) { + syncHandler_->prepareTrace(config); +} +void ActivityProfilerController::toggleCollectionDynamic(const bool enable) { + syncHandler_->toggleCollectionDynamic(enable); +} +void ActivityProfilerController::startTrace() { + syncHandler_->startTrace(); +} +std::unique_ptr ActivityProfilerController:: + stopTrace() { + return syncHandler_->stopTrace(); +} + } // namespace KINETO_NAMESPACE diff --git a/libkineto/src/ActivityProfilerController.h b/libkineto/src/ActivityProfilerController.h index c31c1cfcd..0de897737 100644 --- a/libkineto/src/ActivityProfilerController.h +++ b/libkineto/src/ActivityProfilerController.h @@ -12,8 +12,6 @@ #include #include #include -#include -#include 
#include // TODO(T90238193) @@ -21,17 +19,14 @@ #include "ActivityLoggerFactory.h" #include "ActivityProfilerInterface.h" #include "ActivityTraceInterface.h" +#include "AsyncActivityProfilerHandler.h" #include "ConfigLoader.h" #include "GenericActivityProfiler.h" #include "InvariantViolations.h" #include "LoggerCollector.h" +#include "SyncActivityProfilerHandler.h" namespace KINETO_NAMESPACE { -enum ThreadType { - KINETO = 0, - MEMORY_SNAPSHOT, - THREAD_MAX_COUNT // Number of enum entries (used for array sizing) -}; class Config; @@ -88,24 +83,18 @@ class ActivityProfilerController : public ConfigLoader::ConfigHandler { void popUserCorrelationId(); - private: - bool shouldActivateIterationConfig(int64_t currentIter); - bool shouldActivateTimestampConfig(const std::chrono::time_point& now); - void profilerLoop(); - void memoryProfilerLoop(); - void activateConfig(std::chrono::time_point now); + static std::unique_ptr makeLogger(const Config& config); - std::unique_ptr asyncRequestConfig_; - std::mutex asyncConfigLock_; + static ActivityLoggerFactory& loggerFactory(); + private: std::unique_ptr profiler_; - std::unique_ptr logger_; std::vector> loggerCollectors_; - std::thread* profilerThreads_[ThreadType::THREAD_MAX_COUNT] = {nullptr}; - std::atomic_bool stopRunloop_{false}; std::atomic_bool syncTraceActive_{false}; - std::atomic iterationCount_{-1}; ConfigLoader& configLoader_; + + std::unique_ptr syncHandler_; + std::unique_ptr asyncHandler_; }; } // namespace KINETO_NAMESPACE diff --git a/libkineto/src/AsyncActivityProfilerHandler.cpp b/libkineto/src/AsyncActivityProfilerHandler.cpp new file mode 100644 index 000000000..7f6f2eb41 --- /dev/null +++ b/libkineto/src/AsyncActivityProfilerHandler.cpp @@ -0,0 +1,274 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#include "AsyncActivityProfilerHandler.h" + +#include + +#include "ActivityProfilerController.h" +#include "Config.h" +#include "GenericActivityProfiler.h" +#include "Logger.h" +#include "ThreadUtil.h" +#include "libkineto.h" +#include "output_membuf.h" + +using namespace std::chrono; + +namespace KINETO_NAMESPACE { + +AsyncActivityProfilerHandler::AsyncActivityProfilerHandler( + GenericActivityProfiler& profiler, + std::atomic_bool& syncTraceActive) + : profiler_(profiler), syncTraceActive_(syncTraceActive) {} + +AsyncActivityProfilerHandler::~AsyncActivityProfilerHandler() { + for (auto profilerThread : profilerThreads_) { + if (profilerThread) { + // signaling termination of the profiler loop + stopRunloop_ = true; + profilerThread->join(); + delete profilerThread; + profilerThread = nullptr; + } + } +} + +bool AsyncActivityProfilerHandler::canAcceptConfig() { + return !profiler_.isActive(); +} + +void AsyncActivityProfilerHandler::acceptConfig(const Config& config) { + VLOG(1) << "acceptConfig"; + if (config.activityProfilerEnabled()) { + scheduleTrace(config); + } +} + +void AsyncActivityProfilerHandler::scheduleTrace(const Config& config) { + VLOG(1) << "scheduleTrace"; + if (profiler_.isActive()) { + LOG(WARNING) << "Ignored request - profiler busy"; + return; + } + + int64_t currentIter = iterationCount_; + std::unique_ptr configToSchedule; + + if (config.hasProfileStartIteration() && currentIter < 0) { + // Special case: daemon config with activitiesDuration set + if (config.activitiesDuration().count() > 0) { + LOG(INFO) << "Config with duration-based profiling, " + << "ignoring iteration count requirement"; + // Continue with modified config - clone and set profileStartIteration to + // -1 + configToSchedule = config.clone(); + configToSchedule->setProfileStartIteration(-1); + } else { + LOG(WARNING) << "Ignored profile iteration count based request as " + << "application is not updating iteration count"; + return; + } + } else { + 
configToSchedule = config.clone(); + } + + // Common scheduling logic + bool newConfigScheduled = false; + if (!asyncRequestConfig_) { + std::lock_guard lock(asyncConfigLock_); + if (!asyncRequestConfig_) { + asyncRequestConfig_ = std::move(configToSchedule); + newConfigScheduled = true; + } + } + if (!newConfigScheduled) { + LOG(WARNING) << "Ignored request - another profile request is pending."; + return; + } + + // start a profilerLoop() thread to handle request + + if (config.memoryProfilerEnabled()) { + auto thread_type = ThreadType::MEMORY_SNAPSHOT; + if (!profilerThreads_[thread_type]) { + profilerThreads_[thread_type] = new std::thread( + &AsyncActivityProfilerHandler::memoryProfilerLoop, this); + } + } else { + auto thread_type = ThreadType::KINETO; + if (!profilerThreads_[thread_type]) { + profilerThreads_[thread_type] = + new std::thread(&AsyncActivityProfilerHandler::profilerLoop, this); + } + } +} + +void AsyncActivityProfilerHandler::step() { + // Do not remove this copy to currentIter. Otherwise count is not + // guaranteed. + int64_t currentIter = ++iterationCount_; + VLOG(0) << "Step called , iteration = " << currentIter; + + // Perform Double-checked locking to reduce overhead of taking lock. 
+ if (asyncRequestConfig_ && !profiler_.isActive()) { + std::lock_guard lock(asyncConfigLock_); + auto now = system_clock::now(); + if (asyncRequestConfig_ && !profiler_.isActive() && + shouldActivateIterationConfig(currentIter)) { + activateConfig(now); + } + } + if (profiler_.isActive() && !profiler_.isCollectingMemorySnapshot()) { + auto now = system_clock::now(); + auto next_wakeup_time = now + Config::kControllerIntervalMsecs; + profiler_.performRunLoopStep(now, next_wakeup_time, currentIter); + } +} + +bool AsyncActivityProfilerHandler::shouldActivateTimestampConfig( + const std::chrono::time_point& now) { + if (asyncRequestConfig_->hasProfileStartIteration()) { + return false; + } + if (asyncRequestConfig_->memoryProfilerEnabled()) { + return false; + } + // Note on now + Config::kControllerIntervalMsecs: + // Profiler interval does not align perfectly up to startTime - warmup. + // Waiting until the next tick won't allow sufficient time for the + // profiler to warm up. So check if we are very close to the warmup time + // and trigger warmup. + if (now + Config::kControllerIntervalMsecs >= + (asyncRequestConfig_->requestTimestamp() - + asyncRequestConfig_->activitiesWarmupDuration())) { + LOG(INFO) + << "Received on-demand activity trace request by " + << " profile timestamp = " + << asyncRequestConfig_->requestTimestamp().time_since_epoch().count(); + return true; + } + return false; +} + +bool AsyncActivityProfilerHandler::shouldActivateIterationConfig( + int64_t currentIter) { + if (!asyncRequestConfig_->hasProfileStartIteration()) { + return false; + } + if (asyncRequestConfig_->memoryProfilerEnabled()) { + return false; + } + auto rootIter = asyncRequestConfig_->startIterationIncludingWarmup(); + // Keep waiting, it is not time to start yet. 
+ if (currentIter < rootIter) { + return false; + } + + LOG(INFO) << "Received on-demand activity trace request by " + " profile start iteration = " + << asyncRequestConfig_->profileStartIteration() + << ", current iteration = " << currentIter; + // Re-calculate the start iter if requested iteration is in the past. + if (currentIter > rootIter) { + auto newProfileStart = + currentIter + asyncRequestConfig_->activitiesWarmupIterations(); + // Use Start Iteration Round Up if it is present. + if (asyncRequestConfig_->profileStartIterationRoundUp() > 0) { + // round up to nearest multiple + auto divisor = asyncRequestConfig_->profileStartIterationRoundUp(); + auto rem = newProfileStart % divisor; + newProfileStart += ((rem == 0) ? 0 : divisor - rem); + LOG(INFO) << "Rounding up profiler start iteration to : " + << newProfileStart; + asyncRequestConfig_->setProfileStartIteration(newProfileStart); + if (currentIter != asyncRequestConfig_->startIterationIncludingWarmup()) { + // Ex. Current 9, start 8, warmup 5, roundup 100. Resolves new start + // to 100, with warmup starting at 95. So don't start now. + return false; + } + } else { + LOG(INFO) << "Start iteration updated to : " << newProfileStart; + asyncRequestConfig_->setProfileStartIteration(newProfileStart); + } + } + return true; +} + +void AsyncActivityProfilerHandler::profilerLoop() { + setThreadName("Kineto Activity Profiler"); + VLOG(0) << "Entering activity profiler loop"; + + auto now = system_clock::now(); + auto next_wakeup_time = now + Config::kControllerIntervalMsecs; + + while (!stopRunloop_) { + now = system_clock::now(); + + while (now < next_wakeup_time) { + /* sleep override */ + std::this_thread::sleep_for(next_wakeup_time - now); + now = system_clock::now(); + } + + // Perform Double-checked locking to reduce overhead of taking lock. 
+ if (asyncRequestConfig_ && !profiler_.isActive()) { + std::lock_guard lock(asyncConfigLock_); + if (asyncRequestConfig_ && !profiler_.isActive() && + shouldActivateTimestampConfig(now)) { + activateConfig(now); + } + } + + while (next_wakeup_time < now) { + next_wakeup_time += Config::kControllerIntervalMsecs; + } + + // Use syncTraceActive_ so we don't step into the loop while sync trace is + // running + if (profiler_.isActive() && !profiler_.isCollectingMemorySnapshot() && + !syncTraceActive_) { + next_wakeup_time = profiler_.performRunLoopStep(now, next_wakeup_time); + VLOG(1) << "Profiler loop: " + << duration_cast(system_clock::now() - now).count() + << "ms"; + } + } + + VLOG(0) << "Exited activity profiling loop"; +} + +void AsyncActivityProfilerHandler::memoryProfilerLoop() { + while (!stopRunloop_) { + // Perform Double-checked locking to reduce overhead of taking lock. + if (asyncRequestConfig_ && !profiler_.isActive()) { + std::lock_guard lock(asyncConfigLock_); + if (asyncRequestConfig_ && !profiler_.isActive() && + asyncRequestConfig_->memoryProfilerEnabled()) { + logger_ = ActivityProfilerController::makeLogger(*asyncRequestConfig_); + auto path = asyncRequestConfig_->activitiesLogFile(); + auto profile_time = asyncRequestConfig_->profileMemoryDuration(); + auto config = asyncRequestConfig_->clone(); + asyncRequestConfig_ = nullptr; + profiler_.performMemoryLoop(path, profile_time, logger_.get(), *config); + } + } + } +} + +// This function should only be called when holding the configLock_. 
+void AsyncActivityProfilerHandler::activateConfig( + std::chrono::time_point now) { + logger_ = ActivityProfilerController::makeLogger(*asyncRequestConfig_); + profiler_.setLogger(logger_.get()); + LOGGER_OBSERVER_SET_TRIGGER_ON_DEMAND(); + profiler_.configure(*asyncRequestConfig_, now); + asyncRequestConfig_ = nullptr; +} + +} // namespace KINETO_NAMESPACE diff --git a/libkineto/src/AsyncActivityProfilerHandler.h b/libkineto/src/AsyncActivityProfilerHandler.h new file mode 100644 index 000000000..e83b9f1e3 --- /dev/null +++ b/libkineto/src/AsyncActivityProfilerHandler.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "ActivityLoggerFactory.h" +#include "GenericActivityProfiler.h" + +namespace KINETO_NAMESPACE { + +enum ThreadType { + KINETO = 0, + MEMORY_SNAPSHOT, + THREAD_MAX_COUNT // Number of enum entries (used for array sizing) +}; + +class Config; + +class AsyncActivityProfilerHandler { + public: + explicit AsyncActivityProfilerHandler(GenericActivityProfiler& profiler, std::atomic_bool& syncTraceActive); + + AsyncActivityProfilerHandler(const AsyncActivityProfilerHandler&) = delete; + AsyncActivityProfilerHandler& operator=(const AsyncActivityProfilerHandler&) = delete; + + ~AsyncActivityProfilerHandler(); + + bool canAcceptConfig(); + void acceptConfig(const Config& config); + void scheduleTrace(const Config& config); + void step(); + + private: + bool shouldActivateIterationConfig(int64_t currentIter); + bool shouldActivateTimestampConfig(const std::chrono::time_point& now); + void profilerLoop(); + void memoryProfilerLoop(); + void activateConfig(std::chrono::time_point now); + + std::unique_ptr asyncRequestConfig_; + std::mutex asyncConfigLock_; + std::thread* 
profilerThreads_[ThreadType::THREAD_MAX_COUNT] = {nullptr}; + std::atomic_bool stopRunloop_{false}; + std::atomic iterationCount_{-1}; + + GenericActivityProfiler& profiler_; + std::atomic_bool& syncTraceActive_; + std::unique_ptr logger_; +}; +} // namespace KINETO_NAMESPACE diff --git a/libkineto/src/SyncActivityProfilerHandler.cpp b/libkineto/src/SyncActivityProfilerHandler.cpp new file mode 100644 index 000000000..81dc19d94 --- /dev/null +++ b/libkineto/src/SyncActivityProfilerHandler.cpp @@ -0,0 +1,82 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "SyncActivityProfilerHandler.h" + +#include + +#include "ActivityProfilerController.h" +#include "ActivityTrace.h" +#include "Config.h" +#include "GenericActivityProfiler.h" +#include "Logger.h" +#include "libkineto.h" +#include "output_membuf.h" + +using namespace std::chrono; + +namespace KINETO_NAMESPACE { + +SyncActivityProfilerHandler::SyncActivityProfilerHandler( + GenericActivityProfiler& profiler, + std::atomic_bool& syncTraceActive) + : profiler_(profiler), syncTraceActive_(syncTraceActive) {} + +void SyncActivityProfilerHandler::prepareTrace(const Config& config) { + // Requests from ActivityProfilerApi have higher priority than + // requests from other sources (signal, daemon). + // Cancel any ongoing request and refuse new ones. 
+ auto now = system_clock::now(); + syncTraceActive_ = true; + if (profiler_.isActive()) { + LOG(WARNING) << "Cancelling current trace request in order to start " + << "higher priority synchronous request"; + if (libkineto::api().client()) { + libkineto::api().client()->stop(); + } + + profiler_.stopTrace(now); + profiler_.reset(); + } + + profiler_.configure(config, now); +} + +void SyncActivityProfilerHandler::startTrace() { + UST_LOGGER_MARK_COMPLETED(kWarmUpStage); + profiler_.startTrace(std::chrono::system_clock::now()); +} + +std::unique_ptr SyncActivityProfilerHandler:: + stopTrace() { + profiler_.stopTrace(std::chrono::system_clock::now()); + UST_LOGGER_MARK_COMPLETED(kCollectionStage); + auto logger = std::make_unique(profiler_.config()); + profiler_.processTrace(*logger); + // Will follow up with another patch for logging URLs when ActivityTrace + // is moved. + UST_LOGGER_MARK_COMPLETED(kPostProcessingStage); + + // Logger Metadata contains a map of LOGs collected in Kineto + // logger_level -> List of log lines + // This will be added into the trace as metadata. + std::unordered_map> loggerMD = + profiler_.getLoggerMetadata(); + logger->setLoggerMetadata(std::move(loggerMD)); + + profiler_.reset(); + syncTraceActive_ = false; + return std::make_unique( + std::move(logger), ActivityProfilerController::loggerFactory()); +} + +void SyncActivityProfilerHandler::toggleCollectionDynamic(const bool enable) { + profiler_.toggleCollectionDynamic(enable); +} + +} // namespace KINETO_NAMESPACE diff --git a/libkineto/src/SyncActivityProfilerHandler.h b/libkineto/src/SyncActivityProfilerHandler.h new file mode 100644 index 000000000..969187613 --- /dev/null +++ b/libkineto/src/SyncActivityProfilerHandler.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#pragma once + +#include +#include + +#include "ActivityTraceInterface.h" +#include "GenericActivityProfiler.h" + +namespace KINETO_NAMESPACE { + +class Config; + +class SyncActivityProfilerHandler { + public: + explicit SyncActivityProfilerHandler(GenericActivityProfiler& profiler, std::atomic_bool& syncTraceActive); + + SyncActivityProfilerHandler(const SyncActivityProfilerHandler&) = delete; + SyncActivityProfilerHandler& operator=(const SyncActivityProfilerHandler&) = delete; + + ~SyncActivityProfilerHandler() = default; + + void prepareTrace(const Config& config); + void toggleCollectionDynamic(const bool enable); + void startTrace(); + std::unique_ptr stopTrace(); + + private: + GenericActivityProfiler& profiler_; + std::atomic_bool& syncTraceActive_; +}; +} // namespace KINETO_NAMESPACE From 27c0a1e0a0cfb9dd70bafe0c6897faaa14dec80a Mon Sep 17 00:00:00 2001 From: Ryan Zhang Date: Tue, 7 Apr 2026 15:33:01 -0700 Subject: [PATCH 2/2] Move state machine from profiler into async handler Summary: Part 2 of a refactor to improve safety and readability around sync/async profiling. Sync profiling currently only uses the profiler state machine to check whether `isActive()` is true. However, that doesn't tell us if sync or async is the one that actually owns the profiler at that point, which can lead to edge cases when we make decisions about whether or not to proceed based on the profiler state. See the example below. Since the state machine behavior is only relevant for async tracing, we move the related functions from the `GenericActivityProfiler` into the Async handler. The `GenericActivityProfiler` just becomes a set of functions used to drive profiling, without needing to worry about transitions. Each handler owns its own `isActive()` state, and `ActivityProfilerController` makes decisions about how to handle scheduling between sync and async. 
In particular, we add a `cancel()` function to both handlers so the controller can make sure both handlers are in a good state before preempting. For the async trace, `cancel()` will also clear the async config and move the state back to `WaitForRequest`. This change makes ownership explicit, which should hopefully make reasoning about preemptions from the controller's perspective easier. Differential Revision: D93475197 --- libkineto/src/ActivityProfilerController.cpp | 18 +- .../src/AsyncActivityProfilerHandler.cpp | 239 +++++++++- libkineto/src/AsyncActivityProfilerHandler.h | 44 +- libkineto/src/CuptiActivityProfiler.cpp | 4 +- libkineto/src/CuptiActivityProfiler.h | 4 +- libkineto/src/GenericActivityProfiler.cpp | 258 ++--------- libkineto/src/GenericActivityProfiler.h | 102 +++-- libkineto/src/RocmActivityProfiler.cpp | 4 +- libkineto/src/RocmActivityProfiler.h | 4 +- libkineto/src/SyncActivityProfilerHandler.cpp | 33 +- libkineto/src/SyncActivityProfilerHandler.h | 6 + .../test/AsyncActivityProfilerHandlerTest.cpp | 422 ++++++++++++++++++ libkineto/test/CuptiActivityProfilerTest.cpp | 352 --------------- libkineto/test/RocmActivityProfilerTest.cpp | 3 - 14 files changed, 813 insertions(+), 680 deletions(-) create mode 100644 libkineto/test/AsyncActivityProfilerHandlerTest.cpp diff --git a/libkineto/src/ActivityProfilerController.cpp b/libkineto/src/ActivityProfilerController.cpp index 2cc8059fe..a3aa51267 100644 --- a/libkineto/src/ActivityProfilerController.cpp +++ b/libkineto/src/ActivityProfilerController.cpp @@ -146,7 +146,7 @@ void ActivityProfilerController::setInvariantViolationsLoggerFactory( } bool ActivityProfilerController::isActive() { - return profiler_->isActive(); + return syncHandler_->isSyncActive() || asyncHandler_->isAsyncActive(); } void ActivityProfilerController::transferCpuTrace( @@ -198,12 +198,20 @@ void ActivityProfilerController::logInvariantViolation( // Async-only functions bool ActivityProfilerController::canAcceptConfig() { - return
asyncHandler_->canAcceptConfig(); + return !isActive(); } void ActivityProfilerController::acceptConfig(const Config& config) { + if (isActive()) { + LOG(WARNING) << "Ignored request - profiler busy"; + return; + } asyncHandler_->acceptConfig(config); } void ActivityProfilerController::scheduleTrace(const Config& config) { + if (isActive()) { + LOG(WARNING) << "Ignored request - profiler busy"; + return; + } asyncHandler_->scheduleTrace(config); } void ActivityProfilerController::step() { @@ -212,6 +220,12 @@ void ActivityProfilerController::step() { // Sync-only functions void ActivityProfilerController::prepareTrace(const Config& config) { + // Sync-trace requests preempt any active trace. + asyncHandler_->cancel(); + if (syncHandler_->isSyncActive()) { + syncHandler_->cancel(); + } + syncHandler_->prepareTrace(config); } void ActivityProfilerController::toggleCollectionDynamic(const bool enable) { diff --git a/libkineto/src/AsyncActivityProfilerHandler.cpp b/libkineto/src/AsyncActivityProfilerHandler.cpp index 7f6f2eb41..5067a29c8 100644 --- a/libkineto/src/AsyncActivityProfilerHandler.cpp +++ b/libkineto/src/AsyncActivityProfilerHandler.cpp @@ -28,6 +28,7 @@ AsyncActivityProfilerHandler::AsyncActivityProfilerHandler( : profiler_(profiler), syncTraceActive_(syncTraceActive) {} AsyncActivityProfilerHandler::~AsyncActivityProfilerHandler() { + ensureCollectTraceDone(); for (auto profilerThread : profilerThreads_) { if (profilerThread) { // signaling termination of the profiler loop @@ -39,10 +40,6 @@ AsyncActivityProfilerHandler::~AsyncActivityProfilerHandler() { } } -bool AsyncActivityProfilerHandler::canAcceptConfig() { - return !profiler_.isActive(); -} - void AsyncActivityProfilerHandler::acceptConfig(const Config& config) { VLOG(1) << "acceptConfig"; if (config.activityProfilerEnabled()) { @@ -52,10 +49,6 @@ void AsyncActivityProfilerHandler::acceptConfig(const Config& config) { void AsyncActivityProfilerHandler::scheduleTrace(const Config& config) { VLOG(1) 
<< "scheduleTrace"; - if (profiler_.isActive()) { - LOG(WARNING) << "Ignored request - profiler busy"; - return; - } int64_t currentIter = iterationCount_; std::unique_ptr configToSchedule; @@ -116,18 +109,18 @@ void AsyncActivityProfilerHandler::step() { VLOG(0) << "Step called , iteration = " << currentIter; // Perform Double-checked locking to reduce overhead of taking lock. - if (asyncRequestConfig_ && !profiler_.isActive()) { + if (asyncRequestConfig_ && !isAsyncActive()) { std::lock_guard lock(asyncConfigLock_); auto now = system_clock::now(); - if (asyncRequestConfig_ && !profiler_.isActive() && + if (asyncRequestConfig_ && !isAsyncActive() && shouldActivateIterationConfig(currentIter)) { activateConfig(now); } } - if (profiler_.isActive() && !profiler_.isCollectingMemorySnapshot()) { + if (isAsyncActive() && !isCollectingMemorySnapshot()) { auto now = system_clock::now(); auto next_wakeup_time = now + Config::kControllerIntervalMsecs; - profiler_.performRunLoopStep(now, next_wakeup_time, currentIter); + performRunLoopStep(now, next_wakeup_time, currentIter); } } @@ -217,9 +210,9 @@ void AsyncActivityProfilerHandler::profilerLoop() { } // Perform Double-checked locking to reduce overhead of taking lock. 
- if (asyncRequestConfig_ && !profiler_.isActive()) { + if (asyncRequestConfig_ && !isAsyncActive()) { std::lock_guard lock(asyncConfigLock_); - if (asyncRequestConfig_ && !profiler_.isActive() && + if (asyncRequestConfig_ && !isAsyncActive() && shouldActivateTimestampConfig(now)) { activateConfig(now); } @@ -231,9 +224,8 @@ void AsyncActivityProfilerHandler::profilerLoop() { // Use syncTraceActive_ so we don't step into the loop while sync trace is // running - if (profiler_.isActive() && !profiler_.isCollectingMemorySnapshot() && - !syncTraceActive_) { - next_wakeup_time = profiler_.performRunLoopStep(now, next_wakeup_time); + if (isAsyncActive() && !isCollectingMemorySnapshot() && !syncTraceActive_) { + next_wakeup_time = performRunLoopStep(now, next_wakeup_time); VLOG(1) << "Profiler loop: " << duration_cast(system_clock::now() - now).count() << "ms"; @@ -246,29 +238,228 @@ void AsyncActivityProfilerHandler::profilerLoop() { void AsyncActivityProfilerHandler::memoryProfilerLoop() { while (!stopRunloop_) { // Perform Double-checked locking to reduce overhead of taking lock. - if (asyncRequestConfig_ && !profiler_.isActive()) { + if (asyncRequestConfig_ && !isAsyncActive()) { std::lock_guard lock(asyncConfigLock_); - if (asyncRequestConfig_ && !profiler_.isActive() && + if (asyncRequestConfig_ && !isAsyncActive() && asyncRequestConfig_->memoryProfilerEnabled()) { logger_ = ActivityProfilerController::makeLogger(*asyncRequestConfig_); auto path = asyncRequestConfig_->activitiesLogFile(); auto profile_time = asyncRequestConfig_->profileMemoryDuration(); auto config = asyncRequestConfig_->clone(); asyncRequestConfig_ = nullptr; - profiler_.performMemoryLoop(path, profile_time, logger_.get(), *config); + performMemoryLoop(path, profile_time, logger_.get(), *config); } } } } -// This function should only be called when holding the configLock_. 
-void AsyncActivityProfilerHandler::activateConfig( +void AsyncActivityProfilerHandler::configure( + const Config& config, std::chrono::time_point now) { - logger_ = ActivityProfilerController::makeLogger(*asyncRequestConfig_); + if (!profiler_.canStart(config, now)) { + return; + } + logger_ = ActivityProfilerController::makeLogger(config); profiler_.setLogger(logger_.get()); LOGGER_OBSERVER_SET_TRIGGER_ON_DEMAND(); - profiler_.configure(*asyncRequestConfig_, now); + profiler_.configure(config, now); + currentRunloopState_ = RunloopState::Warmup; +} + +// This function should only be called when holding the configLock_. +void AsyncActivityProfilerHandler::activateConfig( + std::chrono::time_point now) { + configure(*asyncRequestConfig_, now); asyncRequestConfig_ = nullptr; } +time_point AsyncActivityProfilerHandler::performRunLoopStep( + const time_point& now, + const time_point& nextWakeupTime, + int64_t currentIter) { + auto new_wakeup_time = nextWakeupTime; + + VLOG_IF(1, currentIter >= 0) + << "Run loop on application step(), iteration = " << currentIter; + + switch (currentRunloopState_) { + case RunloopState::CollectMemorySnapshot: + LOG(WARNING) + << "Entered CollectMemorySnapshot in Kineto Loop Step, skipping loop"; + break; + case RunloopState::WaitForRequest: + VLOG(1) << "State: WaitForRequest"; + // Nothing to do + break; + + case RunloopState::Warmup: { + VLOG(1) << "State: Warmup"; + profiler_.flushWarmupBuffers(currentIter, nextWakeupTime); + + if (profiler_.isGpuCollectionStopped()) { + profiler_.stopTraceAndReset(now); + LOG(ERROR) + << "State: Warmup stopped by GPU profiler. 
(Buffer size configured is " + << profiler_.activitiesMaxGpuBufferSizeMB() << "MB)"; + UST_LOGGER_MARK_COMPLETED(kWarmUpStage); + VLOG(0) << "Warmup -> WaitForRequest"; + currentRunloopState_ = RunloopState::WaitForRequest; + break; + } + + if (profiler_.isWarmupDone(now, currentIter)) { + UST_LOGGER_MARK_COMPLETED(kWarmUpStage); + if (!profiler_.isProfilingByIteration() && + (now > profiler_.profileStartTime() + milliseconds(10))) { + LOG(INFO) << "Tracing started " + << duration_cast( + now - profiler_.profileStartTime()) + .count() + << "ms late!"; + } else { + LOG(INFO) << "Tracing started"; + } + profiler_.startTrace(now); + currentRunloopState_ = RunloopState::CollectTrace; + if (libkineto::api().client()) { + libkineto::api().client()->start(); + } + if (nextWakeupTime > profiler_.profileEndTime()) { + new_wakeup_time = profiler_.profileEndTime(); + } + } else if (nextWakeupTime > profiler_.profileStartTime()) { + new_wakeup_time = profiler_.profileStartTime(); + } + break; + } + + case RunloopState::CollectTrace: { + VLOG(1) << "State: CollectTrace"; + bool collection_done = profiler_.isCollectionDone(now, currentIter); + + if (collection_done || profiler_.isGpuCollectionStopped()) { + LOG(INFO) << "Tracing complete."; + VLOG_IF(1, currentIter >= 0) + << "This state change was invoked by application's step() call"; + // currentIter >= 0 means this is called from the step() api of + // the profiler in pytorch main thread, it should be executed in + // another thread in case pytorch main thread is blocked + if (currentIter >= 0) { + // if collectTraceThread_ is already running, there's no need to + // execute collectTrace twice. + // Do not call collectTrace when profilerThread_ is collecting + // Trace. 
Otherwise, libkineto::api().client()->stop will be called + // twice, which leads to an unrecoverable ::c10:Error at + // disableProfiler + if (!collectTraceThread_ && !getCollectTraceState()) { + std::lock_guard guard( + collectTraceStateMutex_); + collectTraceThread_ = std::make_unique( + &AsyncActivityProfilerHandler::collectTrace, + this, + collection_done, + now); + } + break; + } + // this is executed in profilerThread_ + { + std::lock_guard guard(collectTraceStateMutex_); + isCollectingTrace_ = true; + } + collectTrace(collection_done, now); + { + std::lock_guard guard(collectTraceStateMutex_); + isCollectingTrace_ = false; + } + } else if (profiler_.isProfilingByIteration()) { + // nothing to do here + } else if ( + now < profiler_.profileEndTime() && + profiler_.profileEndTime() < nextWakeupTime) { + new_wakeup_time = profiler_.profileEndTime(); + } + break; + } + + case RunloopState::ProcessTrace: { + VLOG(1) << "State: ProcessTrace"; + // skip this state transition if it called from the step() api + // of the profiler. + // else it could lead to a race between the profiler thread and an + // application thread calling step() + if (currentIter >= 0) { + return new_wakeup_time; + } + + // Before processing, we should wait for collectTrace thread to be done. 
+ ensureCollectTraceDone(); + + // FIXME: Probably want to allow interruption here + // for quickly handling trace request via synchronous API + profiler_.processTraceAndReset(*logger_); + UST_LOGGER_MARK_COMPLETED(kPostProcessingStage); + currentRunloopState_ = RunloopState::WaitForRequest; + VLOG(0) << "ProcessTrace -> WaitForRequest"; + break; + } + } + + return new_wakeup_time; +} + +void AsyncActivityProfilerHandler::collectTrace( + bool collection_done, + const std::chrono::time_point& now) { + profiler_.collectTrace(collection_done, now); + currentRunloopState_ = RunloopState::ProcessTrace; +} + +void AsyncActivityProfilerHandler::performMemoryLoop( + const std::string& path, + uint32_t profile_time, + ActivityLogger* logger, + Config& config) { + currentRunloopState_ = RunloopState::CollectMemorySnapshot; + if (libkineto::api().client()) { + libkineto::api().client()->start_memory_profile(); + LOG(INFO) << "Running memory profiling for " << profile_time << " ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(profile_time)); + LOG(INFO) << "Exporting memory profiling results to " << path; + libkineto::api().client()->export_memory_profile(path); + libkineto::api().client()->stop_memory_profile(); + LOG(INFO) << "Finalizing trace"; + logger->finalizeMemoryTrace(path, config); + } + currentRunloopState_ = RunloopState::WaitForRequest; +} + +bool AsyncActivityProfilerHandler::getCollectTraceState() { + std::lock_guard guard(collectTraceStateMutex_); + return isCollectingTrace_; +} + +void AsyncActivityProfilerHandler::ensureCollectTraceDone() { + if (collectTraceThread_ && collectTraceThread_->joinable()) { + collectTraceThread_->join(); + collectTraceThread_.reset(nullptr); + } +} + +void AsyncActivityProfilerHandler::cancel() { + { + std::lock_guard lock(asyncConfigLock_); + asyncRequestConfig_ = nullptr; + } + if (!isAsyncActive()) { + return; + } + ensureCollectTraceDone(); + if (libkineto::api().client()) { + libkineto::api().client()->stop(); + 
} + profiler_.stopTraceAndReset(std::chrono::system_clock::now()); + currentRunloopState_ = RunloopState::WaitForRequest; +} + } // namespace KINETO_NAMESPACE diff --git a/libkineto/src/AsyncActivityProfilerHandler.h b/libkineto/src/AsyncActivityProfilerHandler.h index e83b9f1e3..26f78a99d 100644 --- a/libkineto/src/AsyncActivityProfilerHandler.h +++ b/libkineto/src/AsyncActivityProfilerHandler.h @@ -36,11 +36,34 @@ class AsyncActivityProfilerHandler { ~AsyncActivityProfilerHandler(); - bool canAcceptConfig(); void acceptConfig(const Config& config); void scheduleTrace(const Config& config); void step(); + [[nodiscard]] bool isAsyncActive() const { + return currentRunloopState_ != RunloopState::WaitForRequest; + } + + [[nodiscard]] bool isCollectingMemorySnapshot() const { + return currentRunloopState_ == RunloopState::CollectMemorySnapshot; + } + + void cancel(); + + void configure(const Config& config, std::chrono::time_point now); + + // Invoke at a regular interval to perform profiling activities. + // When not active, an interval of 1-5 seconds is probably fine, + // depending on required warm-up time and delayed start time. + // When active, it's a good idea to invoke more frequently to stay below + // memory usage limit (ACTIVITIES_MAX_GPU_BUFFER_SIZE_MB) during warmup. 
+ std::chrono::time_point performRunLoopStep( + const std::chrono::time_point& now, + const std::chrono::time_point& nextWakeupTime, + int64_t currentIter = -1); + + void ensureCollectTraceDone(); + private: bool shouldActivateIterationConfig(int64_t currentIter); bool shouldActivateTimestampConfig(const std::chrono::time_point& now); @@ -57,5 +80,24 @@ class AsyncActivityProfilerHandler { GenericActivityProfiler& profiler_; std::atomic_bool& syncTraceActive_; std::unique_ptr logger_; + + enum class RunloopState { + WaitForRequest, + Warmup, + CollectTrace, + ProcessTrace, + CollectMemorySnapshot, + }; + + void performMemoryLoop(const std::string& path, uint32_t profile_time, ActivityLogger* logger, Config& config); + + void collectTrace(bool collection_done, const std::chrono::time_point& now); + + bool getCollectTraceState(); + + std::atomic currentRunloopState_{RunloopState::WaitForRequest}; + std::unique_ptr collectTraceThread_{nullptr}; + std::recursive_mutex collectTraceStateMutex_; + bool isCollectingTrace_{false}; }; } // namespace KINETO_NAMESPACE diff --git a/libkineto/src/CuptiActivityProfiler.cpp b/libkineto/src/CuptiActivityProfiler.cpp index e9ffed026..9a2d1f0a3 100644 --- a/libkineto/src/CuptiActivityProfiler.cpp +++ b/libkineto/src/CuptiActivityProfiler.cpp @@ -134,11 +134,11 @@ void CuptiActivityProfiler::disableGpuTracing() { cupti_.disableCuptiActivities(derivedConfig_->profileActivityTypes()); } -void CuptiActivityProfiler::clearGpuActivities() { +void CuptiActivityProfiler::clearGpuActivitiesImpl() { cupti_.clearActivities(); } -bool CuptiActivityProfiler::isGpuCollectionStopped() const { +bool CuptiActivityProfiler::isGpuCollectionStoppedImpl() const { return cupti_.stopCollection; } diff --git a/libkineto/src/CuptiActivityProfiler.h b/libkineto/src/CuptiActivityProfiler.h index 344e62b6b..49d3e233b 100644 --- a/libkineto/src/CuptiActivityProfiler.h +++ b/libkineto/src/CuptiActivityProfiler.h @@ -27,8 +27,8 @@ class CuptiActivityProfiler : 
public GenericActivityProfiler { void setMaxGpuBufferSize(int64_t size) override; void enableGpuTracing() override; void disableGpuTracing() override; - void clearGpuActivities() override; - bool isGpuCollectionStopped() const override; + void clearGpuActivitiesImpl() override; + bool isGpuCollectionStoppedImpl() const override; void processGpuActivities(ActivityLogger& logger) override; void synchronizeGpuDevice() override; void pushCorrelationIdImpl(uint64_t id, CorrelationFlowType type) override; diff --git a/libkineto/src/GenericActivityProfiler.cpp b/libkineto/src/GenericActivityProfiler.cpp index 9e7f3458c..0e4d52bd2 100644 --- a/libkineto/src/GenericActivityProfiler.cpp +++ b/libkineto/src/GenericActivityProfiler.cpp @@ -102,23 +102,14 @@ std::ostream& operator<<( } GenericActivityProfiler::GenericActivityProfiler(bool cpuOnly) - : flushOverhead_{0, 0}, - setupOverhead_{0, 0}, - cpuOnly_{cpuOnly}, - currentRunloopState_{RunloopState::WaitForRequest} {} - -GenericActivityProfiler::~GenericActivityProfiler() { - if (collectTraceThread_ && collectTraceThread_->joinable()) { - collectTraceThread_->join(); - } -} + : flushOverhead_{0, 0}, setupOverhead_{0, 0}, cpuOnly_{cpuOnly} {} +GenericActivityProfiler::~GenericActivityProfiler() {} void GenericActivityProfiler::transferCpuTrace( std::unique_ptr cpuTrace) { std::lock_guard guard(mutex_); const string& trace_name = cpuTrace->span.name; - if (currentRunloopState_ != RunloopState::CollectTrace && - currentRunloopState_ != RunloopState::ProcessTrace) { + if (!acceptCpuTraces_) { VLOG(0) << "Trace collection not in progress - discarding span " << trace_name; return; @@ -470,10 +461,6 @@ void GenericActivityProfiler::configure( const Config& config, const time_point& now) { std::lock_guard guard(mutex_); - if (isActive()) { - LOG(WARNING) << "GenericActivityProfiler already busy, terminating"; - return; - } ApproximateClockToUnixTimeConverter clockConverter; get_time_converter() = clockConverter.makeConverter(); @@ 
-491,11 +478,6 @@ void GenericActivityProfiler::configure( derivedConfig_.reset(); derivedConfig_ = std::make_unique(*config_); - // Check if now is a valid time to start. - if (!derivedConfig_->canStart(now)) { - return; - } - if (LOG_IS_ON(INFO)) { config_->printActivityProfilerConfig(LIBKINETO_DBG_STREAM); } @@ -573,18 +555,33 @@ void GenericActivityProfiler::configure( traceBuffers_ = std::make_unique(); captureWindowStartTime_ = captureWindowEndTime_ = 0; - currentRunloopState_ = RunloopState::Warmup; + acceptCpuTraces_ = false; } -bool GenericActivityProfiler::getCollectTraceState() { - std::lock_guard guard(collectTraceStateMutex_); - return isCollectingTrace; -} +void GenericActivityProfiler::flushWarmupBuffers( + int64_t currentIter, + const time_point& nextWakeupTime) { + if (cpuOnly_) { + return; + } + // Flushing GPU activities can take a while so avoid doing it close to + // the start time. Clear during warmup in the following cases: + // 1. Iteration-based flow: called from application step() API + // (currentIter >= 0) with iteration profiling enabled + // 2. Timestamp-based flow: called from periodic runloop + // (currentIter < 0) when not close to profile start time + // 3. 
Iteration config with periodic runloop: always safe to clear + const bool isIterationBasedFlow = + derivedConfig_->isProfilingByIteration() && currentIter >= 0; + const bool isTimestampBasedFlowSafeToFlush = + !derivedConfig_->isProfilingByIteration() && currentIter < 0 && + nextWakeupTime < derivedConfig_->profileStartTime(); + const bool isIterationConfigWithPeriodicRunloop = + derivedConfig_->isProfilingByIteration() && currentIter < 0; -void GenericActivityProfiler::ensureCollectTraceDone() { - if (collectTraceThread_ && collectTraceThread_->joinable()) { - collectTraceThread_->join(); - collectTraceThread_.reset(nullptr); + if (isIterationBasedFlow || isTimestampBasedFlowSafeToFlush || + isIterationConfigWithPeriodicRunloop) { + clearGpuActivitiesImpl(); } } @@ -604,12 +601,12 @@ void GenericActivityProfiler::toggleCollectionDynamic(const bool enable) { void GenericActivityProfiler::startTraceInternal( const time_point& now) { captureWindowStartTime_ = libkineto::timeSinceEpoch(now); + acceptCpuTraces_ = true; VLOG(0) << "Warmup -> CollectTrace"; for (auto& session : sessions_) { LOG(INFO) << "Starting child profiler session"; session->start(); } - currentRunloopState_ = RunloopState::CollectTrace; } void GenericActivityProfiler::stopTraceInternal( @@ -629,23 +626,15 @@ void GenericActivityProfiler::stopTraceInternal( } } - if (currentRunloopState_ == RunloopState::CollectTrace) { - VLOG(0) << "CollectTrace -> ProcessTrace"; - } else { - LOG(WARNING) << "Called stopTrace with state == " - << static_cast>( - currentRunloopState_.load()); - } for (auto& session : sessions_) { LOG(INFO) << "Stopping child profiler session"; session->stop(); } - currentRunloopState_ = RunloopState::ProcessTrace; } void GenericActivityProfiler::resetInternal() { + acceptCpuTraces_ = false; resetTraceData(); - currentRunloopState_ = RunloopState::WaitForRequest; } void GenericActivityProfiler::finalizeTrace( @@ -787,7 +776,7 @@ void GenericActivityProfiler::popUserCorrelationId() { 
void GenericActivityProfiler::resetTraceData() { if (!cpuOnly_) { - clearGpuActivities(); + clearGpuActivitiesImpl(); onResetTraceData(); } activityMap_.clear(); @@ -808,191 +797,6 @@ void GenericActivityProfiler::resetTraceData() { #endif // !USE_GOOGLE_LOG } -// On-demand only code follows. -// -// TODO: Decide if we should refactor this into either the controller -// or its own class. -time_point GenericActivityProfiler::performRunLoopStep( - const time_point& now, - const time_point& nextWakeupTime, - int64_t currentIter) { - auto new_wakeup_time = nextWakeupTime; - bool warmup_done = false; - bool collection_done = false; - - VLOG_IF(1, currentIter >= 0) - << "Run loop on application step(), iteration = " << currentIter; - - switch (currentRunloopState_) { - case RunloopState::CollectMemorySnapshot: - LOG(WARNING) - << "Entered CollectMemorySnapshot in Kineto Loop Step, skipping loop"; - break; - case RunloopState::WaitForRequest: - VLOG(1) << "State: WaitForRequest"; - // Nothing to do - break; - - case RunloopState::Warmup: - VLOG(1) << "State: Warmup"; - warmup_done = derivedConfig_->isWarmupDone(now, currentIter); - { - // Flushing GPU activities can take a while so avoid doing it close to - // the start time. Clear during warmup in the following cases: - // 1. Iteration-based flow: called from application step() API - // (currentIter >= 0) with iteration profiling enabled - // 2. Timestamp-based flow: called from periodic runloop - // (currentIter < 0) when not close to profile start time - // 3. 
Iteration config with periodic runloop: always safe to clear - const bool isIterationBasedFlow = - derivedConfig_->isProfilingByIteration() && currentIter >= 0; - const bool isTimestampBasedFlowSafeToFlush = - !derivedConfig_->isProfilingByIteration() && currentIter < 0 && - nextWakeupTime < derivedConfig_->profileStartTime(); - const bool isIterationConfigWithPeriodicRunloop = - derivedConfig_->isProfilingByIteration() && currentIter < 0; - - if (!cpuOnly_ && - (isIterationBasedFlow || isTimestampBasedFlowSafeToFlush || - isIterationConfigWithPeriodicRunloop)) { - clearGpuActivities(); - } - } - - if (isGpuCollectionStopped()) { - // Go to process trace to clear any outstanding buffers etc - std::lock_guard guard(mutex_); - stopTraceInternal(now); - resetInternal(); - LOG(ERROR) - << "State: Warmup stopped by GPU profiler. (Buffer size configured is " - << config_->activitiesMaxGpuBufferSize() / 1024 / 1024 << "MB)"; - UST_LOGGER_MARK_COMPLETED(kWarmUpStage); - VLOG(0) << "Warmup -> WaitForRequest"; - break; - } - - if (warmup_done) { - UST_LOGGER_MARK_COMPLETED(kWarmUpStage); - if (!derivedConfig_->isProfilingByIteration() && - (now > derivedConfig_->profileStartTime() + milliseconds(10))) { - LOG(INFO) << "Tracing started " - << duration_cast( - now - derivedConfig_->profileStartTime()) - .count() - << "ms late!"; - } else { - LOG(INFO) << "Tracing started"; - } - startTrace(now); - if (libkineto::api().client()) { - libkineto::api().client()->start(); - } - if (nextWakeupTime > derivedConfig_->profileEndTime()) { - new_wakeup_time = derivedConfig_->profileEndTime(); - } - } else if (nextWakeupTime > derivedConfig_->profileStartTime()) { - new_wakeup_time = derivedConfig_->profileStartTime(); - } - - break; - - case RunloopState::CollectTrace: - VLOG(1) << "State: CollectTrace"; - collection_done = derivedConfig_->isCollectionDone(now, currentIter); - - if (collection_done || isGpuCollectionStopped()) { - // Update runloop state first to prevent further updates 
to shared - // state - LOG(INFO) << "Tracing complete."; - VLOG_IF(1, currentIter >= 0) - << "This state change was invoked by application's step() call"; - - // currentIter >= 0 means this is called from the step() api of - // the profile in pytorch main thread, it should be executed in - // another thread in case pytorch main thread is blocked - if (currentIter >= 0) { - // if collectTraceThread_ is already running, there's no need to - // execute collectTrace twice. - // Do not call collectTrace when profilerThread_ is collecting - // Trace. Otherwise, libkineto::api().client()->stop will be called - // twice, which leads to an unrecoverable ::c10:Error at - // disableProfiler - if (!collectTraceThread_ && !getCollectTraceState()) { - std::lock_guard guard(mutex_); - collectTraceThread_ = std::make_unique( - &GenericActivityProfiler::collectTrace, - this, - collection_done, - now); - } - break; - } - // this is executed in profilerThread_ - { - std::lock_guard guard(collectTraceStateMutex_); - isCollectingTrace = true; - } - collectTrace(collection_done, now); - { - std::lock_guard guard(collectTraceStateMutex_); - isCollectingTrace = false; - } - } else if (derivedConfig_->isProfilingByIteration()) { - // nothing to do here - } else if ( - now < derivedConfig_->profileEndTime() && - derivedConfig_->profileEndTime() < nextWakeupTime) { - new_wakeup_time = derivedConfig_->profileEndTime(); - } - - break; - - case RunloopState::ProcessTrace: - VLOG(1) << "State: ProcessTrace"; - // skip this state transition if it called from the step() api - // of the profiler. - // else it could lead to a race between the profiler thread and an - // application thread calling step() - if (currentIter >= 0) { - return new_wakeup_time; - } - - // Before processing, we should wait for collectTrace thread to be done. 
- ensureCollectTraceDone(); - - // FIXME: Probably want to allow interruption here - // for quickly handling trace request via synchronous API - std::lock_guard guard(mutex_); - processTraceInternal(*logger_); - UST_LOGGER_MARK_COMPLETED(kPostProcessingStage); - resetInternal(); - VLOG(0) << "ProcessTrace -> WaitForRequest"; - break; - } - - return new_wakeup_time; -} - -void GenericActivityProfiler::performMemoryLoop( - const string& path, - uint32_t profile_time, - ActivityLogger* logger, - Config& config) { - currentRunloopState_ = RunloopState::CollectMemorySnapshot; - if (libkineto::api().client()) { - libkineto::api().client()->start_memory_profile(); - LOG(INFO) << "Running memory profiling for " << profile_time << " ms"; - std::this_thread::sleep_for(std::chrono::milliseconds(profile_time)); - LOG(INFO) << "Exporting memory profiling results to " << path; - libkineto::api().client()->export_memory_profile(path); - libkineto::api().client()->stop_memory_profile(); - LOG(INFO) << "Finalizing trace"; - logger->finalizeMemoryTrace(path, config); - } - currentRunloopState_ = RunloopState::WaitForRequest; -} - void GenericActivityProfiler::collectTrace( bool collection_done, const std::chrono::time_point& now) { @@ -1000,7 +804,7 @@ void GenericActivityProfiler::collectTrace( libkineto::api().client()->stop(); } - if (isGpuCollectionStopped()) { + if (isGpuCollectionStoppedImpl()) { ecs_.gpu_stopped_early = true; LOG(ERROR) << "State: CollectTrace stopped by GPU profiler. 
(Buffer size configured is " diff --git a/libkineto/src/GenericActivityProfiler.h b/libkineto/src/GenericActivityProfiler.h index 7fa4d5c2e..5ca8df73c 100644 --- a/libkineto/src/GenericActivityProfiler.h +++ b/libkineto/src/GenericActivityProfiler.h @@ -113,25 +113,6 @@ class GenericActivityProfiler { GenericActivityProfiler& operator=(const GenericActivityProfiler&) = delete; virtual ~GenericActivityProfiler(); - bool isActive() const { - return currentRunloopState_ != RunloopState::WaitForRequest; - } - bool isCollectingMemorySnapshot() const { - return currentRunloopState_ == RunloopState::CollectMemorySnapshot; - } - - // Invoke at a regular interval to perform profiling activities. - // When not active, an interval of 1-5 seconds is probably fine, - // depending on required warm-up time and delayed start time. - // When active, it's a good idea to invoke more frequently to stay below - // memory usage limit (ACTIVITIES_MAX_GPU_BUFFER_SIZE_MB) during warmup. - std::chrono::time_point performRunLoopStep( - const std::chrono::time_point& now, - const std::chrono::time_point& nextWakeupTime, - int64_t currentIter = -1); - - void performMemoryLoop(const std::string& path, uint32_t profile_time, ActivityLogger* logger, Config& config); - // Collect CPU and GPU traces void collectTrace(bool collection_done, const std::chrono::time_point& now); @@ -156,7 +137,37 @@ class GenericActivityProfiler { return cpuActivityPresent_ || gpuActivityPresent_; } - // Synchronous control API + void flushWarmupBuffers(int64_t currentIter, + const std::chrono::time_point& nextWakeupTime); + + bool isWarmupDone(const std::chrono::time_point& now, int64_t currentIter) const { + return derivedConfig_->isWarmupDone(now, currentIter); + } + + bool isCollectionDone(const std::chrono::time_point& now, int64_t currentIter) const { + return derivedConfig_->isCollectionDone(now, currentIter); + } + + bool isGpuCollectionStopped() const { + return isGpuCollectionStoppedImpl(); + } + + bool 
isProfilingByIteration() const { + return derivedConfig_->isProfilingByIteration(); + } + + std::chrono::time_point profileStartTime() const { + return derivedConfig_->profileStartTime(); + } + + std::chrono::time_point profileEndTime() const { + return derivedConfig_->profileEndTime(); + } + + int activitiesMaxGpuBufferSizeMB() const { + return config_ ? config_->activitiesMaxGpuBufferSize() / 1024 / 1024 : 0; + } + void startTrace(const std::chrono::time_point& now) { std::lock_guard guard(mutex_); startTraceInternal(now); @@ -167,20 +178,33 @@ class GenericActivityProfiler { stopTraceInternal(now); } - // Ensure collectTrace is done - void ensureCollectTraceDone(); - // Process CPU and GPU traces + void stopTraceAndReset(const std::chrono::time_point& now) { + std::lock_guard guard(mutex_); + stopTraceInternal(now); + resetInternal(); + } + void processTrace(ActivityLogger& logger) { std::lock_guard guard(mutex_); processTraceInternal(logger); } + void processTraceAndReset(ActivityLogger& logger) { + std::lock_guard guard(mutex_); + processTraceInternal(logger); + resetInternal(); + } + void reset() { std::lock_guard guard(mutex_); resetInternal(); } - // Set up profiler as specified in config. 
+ bool canStart(const Config& config, const std::chrono::time_point& now) const { + ConfigDerivedState derived(config); + return derived.canStart(now); + } + void configure(const Config& config, const std::chrono::time_point& now); // Toggle GPU tracing during a profile instance @@ -246,8 +270,8 @@ class GenericActivityProfiler { virtual void setMaxGpuBufferSize([[maybe_unused]] int64_t size) {} virtual void enableGpuTracing() {} virtual void disableGpuTracing() {} - virtual void clearGpuActivities() {} - virtual bool isGpuCollectionStopped() const { + virtual void clearGpuActivitiesImpl() {} + virtual bool isGpuCollectionStoppedImpl() const { return false; } virtual void processGpuActivities([[maybe_unused]] ActivityLogger& logger) {} @@ -379,8 +403,6 @@ class GenericActivityProfiler { void checkTimestampOrder(const ITraceActivity* act1); - bool getCollectTraceState(); - // On-demand Request Config (should not be modified) // TODO: remove this config_, dependency needs to be removed from // finalizeTrace. @@ -392,14 +414,6 @@ class GenericActivityProfiler { // Logger used during trace processing ActivityLogger* logger_; - enum class RunloopState { - WaitForRequest, - Warmup, - CollectTrace, - ProcessTrace, - CollectMemorySnapshot, - }; - // All recorded trace spans, both CPU and GPU // Trace Id -> list of iterations. // Using map of lists for the iterator semantics, since we are recording @@ -426,6 +440,10 @@ class GenericActivityProfiler { bool gpuOnly_{false}; bool cpuActivityPresent_{false}; bool gpuActivityPresent_{false}; + + // Gate for CPU trace ingestion. True only between startTraceInternal() + // and resetInternal() — spans arriving outside this window are discarded. 
+ bool acceptCpuTraces_{false}; bool rangeProfilingActive_{false}; std::atomic toggleState_{true}; @@ -438,18 +456,6 @@ class GenericActivityProfiler { // Mutex to protect non-atomic access to below state std::recursive_mutex mutex_; - // Add a thread to collect both cpu and gpu traces in case torch main thread - // is blocked when profiling by iterations is enabled. Issue #953 shows - // details. - std::unique_ptr collectTraceThread_{nullptr}; - - // Add a mutex to protect state for CollectTrace - std::recursive_mutex collectTraceStateMutex_; - bool isCollectingTrace{false}; - - // Runloop phase - std::atomic currentRunloopState_{RunloopState::WaitForRequest}; - // Keep track of the start time and end time for the trace collected. // External threads using startTrace need to manually stopTrace. Part of the // mock tests. All CUDA events before this time will be removed diff --git a/libkineto/src/RocmActivityProfiler.cpp b/libkineto/src/RocmActivityProfiler.cpp index 120d5b380..1d199c295 100644 --- a/libkineto/src/RocmActivityProfiler.cpp +++ b/libkineto/src/RocmActivityProfiler.cpp @@ -113,11 +113,11 @@ void RocmActivityProfiler::disableGpuTracing() { roc_.disableActivities(derivedConfig_->profileActivityTypes()); } -void RocmActivityProfiler::clearGpuActivities() { +void RocmActivityProfiler::clearGpuActivitiesImpl() { roc_.clearActivities(); } -bool RocmActivityProfiler::isGpuCollectionStopped() const { +bool RocmActivityProfiler::isGpuCollectionStoppedImpl() const { return roc_.stopCollection; } diff --git a/libkineto/src/RocmActivityProfiler.h b/libkineto/src/RocmActivityProfiler.h index 04a65f8cf..f2425307d 100644 --- a/libkineto/src/RocmActivityProfiler.h +++ b/libkineto/src/RocmActivityProfiler.h @@ -42,8 +42,8 @@ class RocmActivityProfiler : public GenericActivityProfiler { void setMaxGpuBufferSize(int64_t size) override; void enableGpuTracing() override; void disableGpuTracing() override; - void clearGpuActivities() override; - bool 
isGpuCollectionStopped() const override; + void clearGpuActivitiesImpl() override; + bool isGpuCollectionStoppedImpl() const override; void processGpuActivities(ActivityLogger& logger) override; void synchronizeGpuDevice() override; void pushCorrelationIdImpl(uint64_t id, CorrelationFlowType type) override; diff --git a/libkineto/src/SyncActivityProfilerHandler.cpp b/libkineto/src/SyncActivityProfilerHandler.cpp index 81dc19d94..2a0a2f794 100644 --- a/libkineto/src/SyncActivityProfilerHandler.cpp +++ b/libkineto/src/SyncActivityProfilerHandler.cpp @@ -28,23 +28,13 @@ SyncActivityProfilerHandler::SyncActivityProfilerHandler( : profiler_(profiler), syncTraceActive_(syncTraceActive) {} void SyncActivityProfilerHandler::prepareTrace(const Config& config) { - // Requests from ActivityProfilerApi have higher priority than - // requests from other sources (signal, daemon). - // Cancel any ongoing request and refuse new ones. - auto now = system_clock::now(); - syncTraceActive_ = true; - if (profiler_.isActive()) { - LOG(WARNING) << "Cancelling current trace request in order to start " - << "higher priority synchronous request"; - if (libkineto::api().client()) { - libkineto::api().client()->stop(); - } - - profiler_.stopTrace(now); - profiler_.reset(); + auto now = std::chrono::system_clock::now(); + if (!profiler_.canStart(config, now)) { + return; } - profiler_.configure(config, now); + syncTraceActive_ = true; + active_ = true; } void SyncActivityProfilerHandler::startTrace() { @@ -71,10 +61,23 @@ std::unique_ptr SyncActivityProfilerHandler:: profiler_.reset(); syncTraceActive_ = false; + active_ = false; return std::make_unique( std::move(logger), ActivityProfilerController::loggerFactory()); } +void SyncActivityProfilerHandler::cancel() { + if (!active_) { + return; + } + if (libkineto::api().client()) { + libkineto::api().client()->stop(); + } + profiler_.stopTraceAndReset(std::chrono::system_clock::now()); + syncTraceActive_ = false; + active_ = false; +} + void 
SyncActivityProfilerHandler::toggleCollectionDynamic(const bool enable) { profiler_.toggleCollectionDynamic(enable); } diff --git a/libkineto/src/SyncActivityProfilerHandler.h b/libkineto/src/SyncActivityProfilerHandler.h index 969187613..b78c51295 100644 --- a/libkineto/src/SyncActivityProfilerHandler.h +++ b/libkineto/src/SyncActivityProfilerHandler.h @@ -31,9 +31,15 @@ class SyncActivityProfilerHandler { void toggleCollectionDynamic(const bool enable); void startTrace(); std::unique_ptr stopTrace(); + void cancel(); + + bool isSyncActive() const { + return active_; + } private: GenericActivityProfiler& profiler_; std::atomic_bool& syncTraceActive_; + std::atomic active_{false}; }; } // namespace KINETO_NAMESPACE diff --git a/libkineto/test/AsyncActivityProfilerHandlerTest.cpp b/libkineto/test/AsyncActivityProfilerHandlerTest.cpp new file mode 100644 index 000000000..5beb2e2d3 --- /dev/null +++ b/libkineto/test/AsyncActivityProfilerHandlerTest.cpp @@ -0,0 +1,422 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include + +#include + +#ifdef __linux__ +#include +#include +#include +#endif + +#include "include/Config.h" +#include "src/AsyncActivityProfilerHandler.h" +#include "src/GenericActivityProfiler.h" +#include "src/SyncActivityProfilerHandler.h" + +#include "src/Logger.h" + +using namespace std::chrono; +using namespace KINETO_NAMESPACE; + +// Subclass that lets us control isGpuCollectionStopped() for testing +// the buffer-overflow-during-warmup path without needing real CUPTI. 
+class MockGpuProfiler : public GenericActivityProfiler { + public: + MockGpuProfiler() : GenericActivityProfiler(/*cpuOnly=*/false) {} + + void setGpuCollectionStopped(bool stopped) { + gpuStopped_ = stopped; + } + + protected: + bool isGpuCollectionStoppedImpl() const override { + return gpuStopped_; + } + + private: + bool gpuStopped_{false}; +}; + +static std::string logUrlToPath(const std::string& url) { + const std::string prefix = "file://"; + if (url.substr(0, prefix.size()) == prefix) { + return url.substr(prefix.size()); + } + return url; +} + +static void checkTracefile(const char* filename) { +#ifdef __linux__ + int fd = open(filename, O_RDONLY); + if (!fd) { + perror(filename); + } + EXPECT_TRUE(fd); + struct stat buf{}; + fstat(fd, &buf); + EXPECT_GT(buf.st_size, 100); + close(fd); +#endif +} + +TEST(AsyncActivityProfilerHandler, AsyncTrace) { + std::vector log_modules( + {"CuptiActivityProfiler.cpp", "output_json.cpp"}); + SET_LOG_VERBOSITY_LEVEL(1, log_modules); + + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + mkstemps(filename, 5); + + Config cfg; + + int iter = 0; + int warmup = 5; + auto now = system_clock::now(); + auto startTime = now + seconds(10); + + bool success = cfg.parse( + fmt::format( + R"CFG( + ACTIVITIES_WARMUP_PERIOD_SECS = {} + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + PROFILE_START_TIME = {} + )CFG", + warmup, + filename, + duration_cast(startTime.time_since_epoch()).count())); + + EXPECT_TRUE(success); + EXPECT_FALSE(handler.isAsyncActive()); + + handler.configure(cfg, now); + + EXPECT_TRUE(handler.isAsyncActive()); + + // fast forward in time and we have reached the startTime + now = startTime; + + // Warmup + handler.performRunLoopStep(now, now); + + auto next = now + milliseconds(1000); + + // Iteration-based steps should have no effect 
on timestamp-based config + while (++iter < 20) { + handler.performRunLoopStep(now, now, iter); + } + + // Terminate collection + handler.performRunLoopStep(next, next); + + EXPECT_TRUE(handler.isAsyncActive()); + + auto nextnext = next + milliseconds(1000); + + while (++iter < 40) { + handler.performRunLoopStep(next, next, iter); + } + + EXPECT_TRUE(handler.isAsyncActive()); + + handler.performRunLoopStep(nextnext, nextnext); + handler.performRunLoopStep(nextnext, nextnext); + + EXPECT_FALSE(handler.isAsyncActive()); + + auto logFile = logUrlToPath(cfg.activitiesLogUrl()); + checkTracefile(logFile.c_str()); +} + +TEST(AsyncActivityProfilerHandler, AsyncTraceUsingIter) { + std::vector log_modules( + {"CuptiActivityProfiler.cpp", "output_json.cpp"}); + SET_LOG_VERBOSITY_LEVEL(1, log_modules); + + auto runIterTest = [&](int start_iter, int warmup_iters, int trace_iters) { + LOG(INFO) << "Async Trace Test: start_iteration = " << start_iter + << " warmup iterations = " << warmup_iters + << " trace iterations = " << trace_iters; + + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + mkstemps(filename, 5); + + Config cfg; + + int iter = 0; + auto now = system_clock::now(); + + bool success = cfg.parse( + fmt::format( + R"CFG( + PROFILE_START_ITERATION = {} + ACTIVITIES_WARMUP_ITERATIONS={} + ACTIVITIES_ITERATIONS={} + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + )CFG", + start_iter, + warmup_iters, + trace_iters, + filename)); + + EXPECT_TRUE(success); + EXPECT_FALSE(handler.isAsyncActive()); + + while (iter < (start_iter - warmup_iters)) { + iter++; + } + + handler.configure(cfg, now); + EXPECT_TRUE(handler.isAsyncActive()); + + now += seconds(10); + auto next = now + milliseconds(1000); + + handler.performRunLoopStep(now, next); + EXPECT_TRUE(handler.isAsyncActive()); + + while (iter < 
start_iter) { + handler.performRunLoopStep(now, next, iter++); + } + + while (iter < (start_iter + trace_iters)) { + handler.performRunLoopStep(now, next, iter++); + } + + if (iter >= (start_iter + trace_iters)) { + handler.performRunLoopStep(now, next, iter++); + } + EXPECT_TRUE(handler.isAsyncActive()); + + auto nextnext = next + milliseconds(1000); + handler.performRunLoopStep(nextnext, nextnext); + handler.ensureCollectTraceDone(); + handler.performRunLoopStep(nextnext, nextnext); + + EXPECT_FALSE(handler.isAsyncActive()); + + auto logFile = logUrlToPath(cfg.activitiesLogUrl()); + checkTracefile(logFile.c_str()); + }; + + runIterTest(50, 5, 10); + runIterTest(0, 0, 2); + runIterTest(0, 5, 5); +} + +TEST(AsyncActivityProfilerHandler, MetadataJsonFormatingTest) { + std::vector log_modules( + {"CuptiActivityProfiler.cpp", "output_json.cpp"}); + SET_LOG_VERBOSITY_LEVEL(1, log_modules); + + setenv("PT_PROFILER_JOB_NAME", "test_training_job", 1); + setenv("PT_PROFILER_JOB_VERSION", "2", 1); + setenv("PT_PROFILER_JOB_ATTEMPT_INDEX", "5", 1); + + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + mkstemps(filename, 5); + + Config cfg; + + auto now = system_clock::now(); + auto startTime = now + seconds(2); + + bool success = cfg.parse( + fmt::format( + R"CFG( + ACTIVITIES_WARMUP_PERIOD_SECS = 1 + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + PROFILE_START_TIME = {} + )CFG", + filename, + duration_cast(startTime.time_since_epoch()).count())); + + EXPECT_TRUE(success); + EXPECT_FALSE(handler.isAsyncActive()); + + handler.configure(cfg, now); + + EXPECT_TRUE(handler.isAsyncActive()); + + std::string keyPrefix = "TEST_METADATA_"; + profiler.addMetadata(keyPrefix + "NORMAL", "\"metadata value\""); + profiler.addMetadata(keyPrefix + "NEWLINE", "\"metadata \nvalue\""); + 
profiler.addMetadata(keyPrefix + "BACKSLASH", R"("/test/metadata\path")"); + + auto next = startTime + milliseconds(1000); + auto after = next + milliseconds(1000); + + handler.performRunLoopStep(startTime, startTime); + EXPECT_TRUE(handler.isAsyncActive()); + + handler.performRunLoopStep(next, next); + EXPECT_TRUE(handler.isAsyncActive()); + + handler.performRunLoopStep(after, after); + EXPECT_FALSE(handler.isAsyncActive()); + +#ifdef __linux__ + auto logFile = logUrlToPath(cfg.activitiesLogUrl()); + std::ifstream file(logFile); + if (!file.is_open()) { + throw std::runtime_error("Failed to open the trace JSON file."); + } + std::string jsonStr( + (std::istreambuf_iterator(file)), std::istreambuf_iterator()); + folly::dynamic jsonData = folly::parseJson(jsonStr); + + auto countSubstrings = [](const std::string& source, + const std::string& substring) { + size_t count = 0; + size_t pos = source.find(substring); + while (pos != std::string::npos) { + ++count; + pos = source.find(substring, pos + substring.length()); + } + return count; + }; + + EXPECT_EQ(3, countSubstrings(jsonStr, keyPrefix)); + EXPECT_EQ(2, countSubstrings(jsonStr, "metadata value")); + EXPECT_EQ(1, countSubstrings(jsonStr, "/test/metadata/path")); + + EXPECT_EQ(jsonData["PT_PROFILER_JOB_NAME"].asString(), "test_training_job"); + EXPECT_EQ(jsonData["PT_PROFILER_JOB_VERSION"].asString(), "2"); + EXPECT_EQ(jsonData["PT_PROFILER_JOB_ATTEMPT_INDEX"].asString(), "5"); +#endif + + unsetenv("PT_PROFILER_JOB_NAME"); + unsetenv("PT_PROFILER_JOB_VERSION"); + unsetenv("PT_PROFILER_JOB_ATTEMPT_INDEX"); +} + +TEST(AsyncActivityProfilerHandler, Cancel) { + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + // Cancel when inactive is a no-op + EXPECT_FALSE(handler.isAsyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isAsyncActive()); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + 
mkstemps(filename, 5); + + Config cfg; + auto now = system_clock::now(); + auto startTime = now + seconds(10); + + bool success = cfg.parse( + fmt::format( + R"CFG( + ACTIVITIES_WARMUP_PERIOD_SECS = 5 + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + PROFILE_START_TIME = {} + )CFG", + filename, + duration_cast(startTime.time_since_epoch()).count())); + EXPECT_TRUE(success); + + // Cancel during Warmup + handler.configure(cfg, now); + EXPECT_TRUE(handler.isAsyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isAsyncActive()); + + // Stays inactive on subsequent steps + handler.performRunLoopStep(startTime, startTime); + EXPECT_FALSE(handler.isAsyncActive()); + + // Cancel during CollectTrace + handler.configure(cfg, now); + EXPECT_TRUE(handler.isAsyncActive()); + now = startTime; + handler.performRunLoopStep(now, now); + EXPECT_TRUE(handler.isAsyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isAsyncActive()); +} + +TEST(AsyncActivityProfilerHandler, BufferSizeLimitDuringWarmup) { + MockGpuProfiler profiler; + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + mkstemps(filename, 5); + + Config cfg; + auto now = system_clock::now(); + auto startTime = now + seconds(10); + + bool success = cfg.parse( + fmt::format( + R"CFG( + ACTIVITIES_WARMUP_PERIOD_SECS = 5 + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + PROFILE_START_TIME = {} + ACTIVITIES_MAX_GPU_BUFFER_SIZE_MB = 3 + )CFG", + filename, + duration_cast(startTime.time_since_epoch()).count())); + EXPECT_TRUE(success); + + handler.configure(cfg, now); + EXPECT_TRUE(handler.isAsyncActive()); + + // Simulate GPU buffer overflow + profiler.setGpuCollectionStopped(true); + + // During warmup, the handler should detect GPU collection stopped + // and transition back to WaitForRequest + now = startTime; + handler.performRunLoopStep(now, now); + 
EXPECT_FALSE(handler.isAsyncActive()); +} + +TEST(SyncActivityProfilerHandler, Cancel) { + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + SyncActivityProfilerHandler handler(profiler, syncTraceActive); + + // Cancel when inactive is a no-op + EXPECT_FALSE(handler.isSyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isSyncActive()); + + // Cancel after prepareTrace + Config cfg; + cfg.validate(system_clock::now()); + handler.prepareTrace(cfg); + EXPECT_TRUE(handler.isSyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isSyncActive()); +} diff --git a/libkineto/test/CuptiActivityProfilerTest.cpp b/libkineto/test/CuptiActivityProfilerTest.cpp index de46ea585..952b83cae 100644 --- a/libkineto/test/CuptiActivityProfilerTest.cpp +++ b/libkineto/test/CuptiActivityProfilerTest.cpp @@ -302,205 +302,6 @@ class CuptiActivityProfilerTest : public ::testing::Test { ActivityLoggerFactory loggerFactory; }; -void checkTracefile(const char* filename) { -#ifdef __linux__ - // Check that the expected file was written and that it has some content - int fd = open(filename, O_RDONLY); - if (!fd) { - perror(filename); - } - EXPECT_TRUE(fd); - // Should expect at least 100 bytes - struct stat buf{}; - fstat(fd, &buf); - EXPECT_GT(buf.st_size, 100); - close(fd); -#endif -} - -TEST(CuptiActivityProfiler, AsyncTrace) { - std::vector log_modules( - {"CuptiActivityProfiler.cpp", "output_json.cpp"}); - SET_LOG_VERBOSITY_LEVEL(1, log_modules); - - MockCuptiActivities activities; - CuptiActivityProfiler profiler(activities, /*cpu only*/ true); - - char filename[] = "/tmp/libkineto_testXXXXXX.json"; - mkstemps(filename, 5); - - Config cfg; - - int iter = 0; - int warmup = 5; - auto now = system_clock::now(); - auto startTime = now + seconds(10); - - bool success = cfg.parse( - fmt::format( - R"CFG( - ACTIVITIES_WARMUP_PERIOD_SECS = {} - ACTIVITIES_DURATION_SECS = 1 - ACTIVITIES_LOG_FILE = {} - PROFILE_START_TIME = {} - )CFG", - 
warmup, - filename, - duration_cast(startTime.time_since_epoch()).count())); - - EXPECT_TRUE(success); - EXPECT_FALSE(profiler.isActive()); - - auto logger = std::make_unique(cfg.activitiesLogFile()); - - // Usually configuration is done when now is startTime - warmup to kick off - // warmup but start right away in the test - profiler.configure(cfg, now); - profiler.setLogger(logger.get()); - - EXPECT_TRUE(profiler.isActive()); - - // fast forward in time and we have reached the startTime - now = startTime; - - // Run the profiler - // Warmup - // performRunLoopStep is usually called by the controller loop and takes - // the current time and the controller's next wakeup time. - profiler.performRunLoopStep( - /* Current time */ now, /* Next wakeup time */ now); - - auto next = now + milliseconds(1000); - - // performRunLoopStep can also be called by an application thread to update - // iteration count since this config does not use iteration this should have - // no effect on the state - while (++iter < 20) { - profiler.performRunLoopStep(now, now, iter); - } - - // Runloop should now be in collect state, so start workload - // Perform another runloop step, passing in the end profile time as current. - // This should terminate collection - profiler.performRunLoopStep( - /* Current time */ next, /* Next wakeup time */ next); - // One step needed for each of the Process and Finalize phases - // Doesn't really matter what times we pass in here. 
- - EXPECT_TRUE(profiler.isActive()); - - auto nextnext = next + milliseconds(1000); - - while (++iter < 40) { - profiler.performRunLoopStep(next, next, iter); - } - - EXPECT_TRUE(profiler.isActive()); - - profiler.performRunLoopStep(nextnext, nextnext); - profiler.performRunLoopStep(nextnext, nextnext); - - // Assert that tracing has completed - EXPECT_FALSE(profiler.isActive()); - - checkTracefile(filename); -} - -TEST(CuptiActivityProfiler, AsyncTraceUsingIter) { - std::vector log_modules( - {"CuptiActivityProfiler.cpp", "output_json.cpp"}); - SET_LOG_VERBOSITY_LEVEL(1, log_modules); - - auto runIterTest = [&](int start_iter, int warmup_iters, int trace_iters) { - LOG(INFO) << "Async Trace Test: start_iteration = " << start_iter - << " warmup iterations = " << warmup_iters - << " trace iterations = " << trace_iters; - - MockCuptiActivities activities; - CuptiActivityProfiler profiler(activities, /*cpu only*/ true); - - char filename[] = "/tmp/libkineto_testXXXXXX.json"; - mkstemps(filename, 5); - - Config cfg; - - int iter = 0; - auto now = system_clock::now(); - - bool success = cfg.parse( - fmt::format( - R"CFG( - PROFILE_START_ITERATION = {} - ACTIVITIES_WARMUP_ITERATIONS={} - ACTIVITIES_ITERATIONS={} - ACTIVITIES_DURATION_SECS = 1 - ACTIVITIES_LOG_FILE = {} - )CFG", - start_iter, - warmup_iters, - trace_iters, - filename)); - - EXPECT_TRUE(success); - EXPECT_FALSE(profiler.isActive()); - - auto logger = std::make_unique(cfg.activitiesLogFile()); - - // Usually configuration is done when now is startIter - warmup iter to kick - // off warmup but start right away in the test - while (iter < (start_iter - warmup_iters)) { - profiler.performRunLoopStep(now, now, iter++); - } - - profiler.configure(cfg, now); - profiler.setLogger(logger.get()); - - EXPECT_TRUE(profiler.isActive()); - - // fast forward in time, mimicking what will happen in reality - now += seconds(10); - auto next = now + milliseconds(1000); - - // this call to runloop step should not be 
effecting the state - profiler.performRunLoopStep(now, next); - EXPECT_TRUE(profiler.isActive()); - - // start trace collection - while (iter < start_iter) { - profiler.performRunLoopStep(now, next, iter++); - } - - // Runloop should now be in collect state, so start workload - - while (iter < (start_iter + trace_iters)) { - profiler.performRunLoopStep(now, next, iter++); - } - - // One step is required for each of the Process and Finalize phases - // Doesn't really matter what times we pass in here. - if (iter >= (start_iter + trace_iters)) { - profiler.performRunLoopStep(now, next, iter++); - } - EXPECT_TRUE(profiler.isActive()); - - auto nextnext = next + milliseconds(1000); - profiler.performRunLoopStep(nextnext, nextnext); - profiler.ensureCollectTraceDone(); - profiler.performRunLoopStep(nextnext, nextnext); - - // Assert that tracing has completed - EXPECT_FALSE(profiler.isActive()); - - checkTracefile(filename); - }; - - // start iter = 50, warmup iters = 5, trace iters = 10 - runIterTest(50, 5, 10); - // should be able to start at 0 iteration - runIterTest(0, 0, 2); - runIterTest(0, 5, 5); -} - TEST_F(CuptiActivityProfilerTest, SyncTrace) { // Verbose logging is useful for debugging std::vector log_modules({"CuptiActivityProfiler.cpp"}); @@ -1037,10 +838,7 @@ TEST_F(CuptiActivityProfilerTest, SubActivityProfilers) { profiler.configure(*cfg_, start_time); profiler.startTrace(start_time); - EXPECT_TRUE(profiler.isActive()); - profiler.stopTrace(start_time + nanoseconds(duration_ns)); - EXPECT_TRUE(profiler.isActive()); char filename[] = "/tmp/libkineto_testXXXXXX.json"; mkstemps(filename, 5); @@ -1074,156 +872,6 @@ TEST_F(CuptiActivityProfilerTest, SubActivityProfilers) { EXPECT_GT(buf.st_size, 100); } -TEST_F(CuptiActivityProfilerTest, BufferSizeLimitTestWarmup) { - CuptiActivityProfiler profiler(cuptiActivities_, /*cpu only*/ false); - - auto now = system_clock::now(); - auto startTime = now + seconds(10); - - int maxBufferSizeMB = 3; - - auto 
startTimeEpoch = std::to_string( - duration_cast(startTime.time_since_epoch()).count()); - std::string maxBufferSizeMBStr = std::to_string(maxBufferSizeMB); - cfg_->handleOption("ACTIVITIES_MAX_GPU_BUFFER_SIZE_MB", maxBufferSizeMBStr); - cfg_->handleOption("PROFILE_START_TIME", startTimeEpoch); - - EXPECT_FALSE(profiler.isActive()); - profiler.configure(*cfg_, now); - EXPECT_TRUE(profiler.isActive()); - - for (int i = 0; i < maxBufferSizeMB; i++) { - uint8_t* buf; - size_t gpuBufferSize; - size_t maxNumRecords; - cuptiActivities_.bufferRequestedOverride( - &buf, &gpuBufferSize, &maxNumRecords); - } - - // fast forward to startTime and profiler is now running - now = startTime; - - profiler.performRunLoopStep(now, now); - - auto next = now + milliseconds(1000); - profiler.performRunLoopStep(next, next); - profiler.performRunLoopStep(next, next); - profiler.performRunLoopStep(next, next); - - EXPECT_FALSE(profiler.isActive()); -} - -TEST(CuptiActivityProfiler, MetadataJsonFormatingTest) { - // Check for Json string sanitation and env var injection - // based on AsyncTrace test - std::vector log_modules( - {"CuptiActivityProfiler.cpp", "output_json.cpp"}); - SET_LOG_VERBOSITY_LEVEL(1, log_modules); - - // Set environment variables for testing - setenv("PT_PROFILER_JOB_NAME", "test_training_job", 1); - setenv("PT_PROFILER_JOB_VERSION", "2", 1); - setenv("PT_PROFILER_JOB_ATTEMPT_INDEX", "5", 1); - - MockCuptiActivities activities; - CuptiActivityProfiler profiler(activities, /*cpu only*/ true); - - char filename[] = "/tmp/libkineto_testXXXXXX.json"; - mkstemps(filename, 5); - - Config cfg; - - auto now = system_clock::now(); - auto startTime = now + seconds(2); - - bool success = cfg.parse( - fmt::format( - R"CFG( - ACTIVITIES_WARMUP_PERIOD_SECS = 1 - ACTIVITIES_DURATION_SECS = 1 - ACTIVITIES_LOG_FILE = {} - PROFILE_START_TIME = {} - )CFG", - filename, - duration_cast(startTime.time_since_epoch()).count())); - - EXPECT_TRUE(success); - EXPECT_FALSE(profiler.isActive()); 
- - auto logger = std::make_unique(cfg.activitiesLogFile()); - - // Usually configuration is done when now is startTime - warmup to kick off - // warmup but start right away in the test - profiler.configure(cfg, now); - profiler.setLogger(logger.get()); - - EXPECT_TRUE(profiler.isActive()); - - // Add test metadata - std::string keyPrefix = "TEST_METADATA_"; - profiler.addMetadata(keyPrefix + "NORMAL", "\"metadata value\""); - profiler.addMetadata(keyPrefix + "NEWLINE", "\"metadata \nvalue\""); - profiler.addMetadata(keyPrefix + "BACKSLASH", R"("/test/metadata\path")"); - - // Profiling activity at start up/during active/after duration - auto next = startTime + milliseconds(1000); - auto after = next + milliseconds(1000); - - profiler.performRunLoopStep(startTime, startTime); - EXPECT_TRUE(profiler.isActive()); - - profiler.performRunLoopStep(next, next); - EXPECT_TRUE(profiler.isActive()); - - profiler.performRunLoopStep(after, after); - EXPECT_FALSE(profiler.isActive()); - -#ifdef __linux__ - // Check that the saved JSON file can be loaded and deserialized - std::ifstream file(filename); - if (!file.is_open()) { - throw std::runtime_error("Failed to open the trace JSON file."); - } - std::string jsonStr( - (std::istreambuf_iterator(file)), std::istreambuf_iterator()); - nlohmann::json jsonData = nlohmann::json::parse(jsonStr); - - auto countSubstrings = [](const std::string& source, - const std::string& substring) { - size_t count = 0; - size_t pos = source.find(substring); - while (pos != std::string::npos) { - ++count; - pos = source.find(substring, pos + substring.length()); - } - return count; - }; - - // Check if metadata has been correctly sanitized - EXPECT_EQ(3, countSubstrings(jsonStr, keyPrefix)); - EXPECT_EQ(2, countSubstrings(jsonStr, "metadata value")); - EXPECT_EQ(1, countSubstrings(jsonStr, "/test/metadata/path")); - - // Verify injected env vars are in trace metadata with correct values - EXPECT_EQ( - jsonData["PT_PROFILER_JOB_NAME"].get(), 
"test_training_job"); - EXPECT_EQ(jsonData["PT_PROFILER_JOB_VERSION"].get(), "2"); - EXPECT_EQ(jsonData["PT_PROFILER_JOB_ATTEMPT_INDEX"].get(), "5"); - - // Verify hostname is non-empty when present (gethostname may not be - // available in all environments, but when it succeeds the value must - // not be empty). - if (jsonData.contains("host_name")) { - EXPECT_FALSE(jsonData["host_name"].get().empty()); - } -#endif - - // Clean up environment variables - unsetenv("PT_PROFILER_JOB_NAME"); - unsetenv("PT_PROFILER_JOB_VERSION"); - unsetenv("PT_PROFILER_JOB_ATTEMPT_INDEX"); -} - TEST_F(CuptiActivityProfilerTest, JsonGPUIDSortTest) { // Set logging level for debugging purpose std::vector log_modules( diff --git a/libkineto/test/RocmActivityProfilerTest.cpp b/libkineto/test/RocmActivityProfilerTest.cpp index a5b0669be..c14477d05 100644 --- a/libkineto/test/RocmActivityProfilerTest.cpp +++ b/libkineto/test/RocmActivityProfilerTest.cpp @@ -734,10 +734,7 @@ TEST_F(RocmActivityProfilerTest, SubActivityProfilers) { profiler.configure(*cfg_, start_time); profiler.startTrace(start_time); - EXPECT_TRUE(profiler.isActive()); - profiler.stopTrace(start_time + nanoseconds(duration_ns)); - EXPECT_TRUE(profiler.isActive()); char filename[] = "/tmp/libkineto_testXXXXXX.json"; mkstemps(filename, 5);