diff --git a/libkineto/libkineto_defs.bzl b/libkineto/libkineto_defs.bzl index 4102776e8..c96240af2 100644 --- a/libkineto/libkineto_defs.bzl +++ b/libkineto/libkineto_defs.bzl @@ -69,6 +69,8 @@ def get_libkineto_cpu_only_srcs(with_api = True): "src/GenericActivityProfiler.cpp", "src/ActivityProfilerController.cpp", "src/ActivityProfilerProxy.cpp", + "src/AsyncActivityProfilerHandler.cpp", + "src/SyncActivityProfilerHandler.cpp", "src/ActivityType.cpp", "src/Config.cpp", "src/ConfigLoader.cpp", diff --git a/libkineto/src/ActivityProfilerController.cpp b/libkineto/src/ActivityProfilerController.cpp index d31213a59..a3aa51267 100644 --- a/libkineto/src/ActivityProfilerController.cpp +++ b/libkineto/src/ActivityProfilerController.cpp @@ -8,14 +8,10 @@ #include "ActivityProfilerController.h" -#include #include -#include #include #include "ActivityLoggerFactory.h" -#include "ActivityTrace.h" - // TODO DEVICE_AGNOSTIC: Move the device decision out of C++ files to be // determined entirely by the build process. For the // controller, we'll need some registration mechanism. @@ -34,7 +30,6 @@ #endif -#include "ThreadUtil.h" #include "output_json.h" #include "output_membuf.h" @@ -94,21 +89,16 @@ ActivityProfilerController::ActivityProfilerController( // directly by the GenericActivityProfiler. profiler_ = std::make_unique(cpuOnly); #endif + + syncHandler_ = std::make_unique( + *profiler_, syncTraceActive_); + asyncHandler_ = std::make_unique( + *profiler_, syncTraceActive_); configLoader_.addHandler(ConfigLoader::ConfigKind::ActivityProfiler, this); } ActivityProfilerController::~ActivityProfilerController() { configLoader_.removeHandler(ConfigLoader::ConfigKind::ActivityProfiler, this); - for (auto profilerThread : profilerThreads_) { - if (profilerThread) { - // signaling termination of the profiler loop - stopRunloop_ = true; - profilerThread->join(); - delete profilerThread; - profilerThread = nullptr; - } - } - #if !USE_GOOGLE_LOG for (auto& collector : loggerCollectors_) { Logger::removeLoggerObserver(collector.get()); @@ -124,7 +114,7 @@ static ActivityLoggerFactory initLoggerFactory() { return factory; } -static ActivityLoggerFactory& loggerFactory() { +ActivityLoggerFactory& ActivityProfilerController::loggerFactory() { static ActivityLoggerFactory factory = initLoggerFactory(); return factory; } @@ -135,7 +125,8 @@ void ActivityProfilerController::addLoggerFactory( loggerFactory().addProtocol(protocol, std::move(factory)); } -static std::unique_ptr makeLogger(const Config& config) { +std::unique_ptr ActivityProfilerController::makeLogger( + const Config& config) { if (config.activitiesLogToMemory()) { return std::make_unique(config); } @@ -154,268 +145,8 @@ void ActivityProfilerController::setInvariantViolationsLoggerFactory( invariantViolationsLoggerFactory() = factory(); } -bool ActivityProfilerController::canAcceptConfig() { - return !profiler_->isActive(); -} - -void ActivityProfilerController::acceptConfig(const Config& config) { - VLOG(1) << "acceptConfig"; - if (config.activityProfilerEnabled()) { - scheduleTrace(config); - } -} - -bool ActivityProfilerController::shouldActivateTimestampConfig( - const std::chrono::time_point& now) { - if (asyncRequestConfig_->hasProfileStartIteration()) { - return false; - } - if (asyncRequestConfig_->memoryProfilerEnabled()) { - return false; - } - // Note on now + Config::kControllerIntervalMsecs: - // Profiler interval does not align perfectly up to startTime - warmup. - // Waiting until the next tick won't allow sufficient time for the - // profiler to warm up. So check if we are very close to the warmup time - // and trigger warmup. - if (now + Config::kControllerIntervalMsecs >= - (asyncRequestConfig_->requestTimestamp() - - asyncRequestConfig_->activitiesWarmupDuration())) { - LOG(INFO) - << "Received on-demand activity trace request by " - << " profile timestamp = " - << asyncRequestConfig_->requestTimestamp().time_since_epoch().count(); - return true; - } - return false; -} - -bool ActivityProfilerController::shouldActivateIterationConfig( - int64_t currentIter) { - if (!asyncRequestConfig_->hasProfileStartIteration()) { - return false; - } - if (asyncRequestConfig_->memoryProfilerEnabled()) { - return false; - } - auto rootIter = asyncRequestConfig_->startIterationIncludingWarmup(); - // Keep waiting, it is not time to start yet. - if (currentIter < rootIter) { - return false; - } - - LOG(INFO) << "Received on-demand activity trace request by " - " profile start iteration = " - << asyncRequestConfig_->profileStartIteration() - << ", current iteration = " << currentIter; - // Re-calculate the start iter if requested iteration is in the past. - if (currentIter > rootIter) { - auto newProfileStart = - currentIter + asyncRequestConfig_->activitiesWarmupIterations(); - // Use Start Iteration Round Up if it is present. - if (asyncRequestConfig_->profileStartIterationRoundUp() > 0) { - // round up to nearest multiple - auto divisor = asyncRequestConfig_->profileStartIterationRoundUp(); - auto rem = newProfileStart % divisor; - newProfileStart += ((rem == 0) ? 0 : divisor - rem); - LOG(INFO) << "Rounding up profiler start iteration to : " - << newProfileStart; - asyncRequestConfig_->setProfileStartIteration(newProfileStart); - if (currentIter != asyncRequestConfig_->startIterationIncludingWarmup()) { - // Ex. Current 9, start 8, warmup 5, roundup 100. Resolves new start - // to 100, with warmup starting at 95. So don't start now. - return false; - } - } else { - LOG(INFO) << "Start iteration updated to : " << newProfileStart; - asyncRequestConfig_->setProfileStartIteration(newProfileStart); - } - } - return true; -} - -void ActivityProfilerController::profilerLoop() { - setThreadName("Kineto Activity Profiler"); - VLOG(0) << "Entering activity profiler loop"; - - auto now = system_clock::now(); - auto next_wakeup_time = now + Config::kControllerIntervalMsecs; - - while (!stopRunloop_) { - now = system_clock::now(); - - while (now < next_wakeup_time) { - /* sleep override */ - std::this_thread::sleep_for(next_wakeup_time - now); - now = system_clock::now(); - } - - // Perform Double-checked locking to reduce overhead of taking lock. - if (asyncRequestConfig_ && !profiler_->isActive()) { - std::lock_guard lock(asyncConfigLock_); - if (asyncRequestConfig_ && !profiler_->isActive() && - shouldActivateTimestampConfig(now)) { - activateConfig(now); - } - } - - while (next_wakeup_time < now) { - next_wakeup_time += Config::kControllerIntervalMsecs; - } - - // Use syncTraceActive_ so we don't step into the loop while sync trace is - // running - if (profiler_->isActive() && !profiler_->isCollectingMemorySnapshot() && - !syncTraceActive_) { - next_wakeup_time = profiler_->performRunLoopStep(now, next_wakeup_time); - VLOG(1) << "Profiler loop: " - << duration_cast(system_clock::now() - now).count() - << "ms"; - } - } - - VLOG(0) << "Exited activity profiling loop"; -} - -void ActivityProfilerController::memoryProfilerLoop() { - while (!stopRunloop_) { - // Perform Double-checked locking to reduce overhead of taking lock. - if (asyncRequestConfig_ && !profiler_->isActive()) { - std::lock_guard lock(asyncConfigLock_); - if (asyncRequestConfig_ && !profiler_->isActive() && - asyncRequestConfig_->memoryProfilerEnabled()) { - logger_ = makeLogger(*asyncRequestConfig_); - auto path = asyncRequestConfig_->activitiesLogFile(); - auto profile_time = asyncRequestConfig_->profileMemoryDuration(); - auto config = asyncRequestConfig_->clone(); - asyncRequestConfig_ = nullptr; - profiler_->performMemoryLoop( - path, profile_time, logger_.get(), *config); - } - } - } -} - -void ActivityProfilerController::step() { - // Do not remove this copy to currentIter. Otherwise count is not - // guaranteed. - int64_t currentIter = ++iterationCount_; - VLOG(0) << "Step called , iteration = " << currentIter; - - // Perform Double-checked locking to reduce overhead of taking lock. - if (asyncRequestConfig_ && !profiler_->isActive()) { - std::lock_guard lock(asyncConfigLock_); - auto now = system_clock::now(); - if (asyncRequestConfig_ && !profiler_->isActive() && - shouldActivateIterationConfig(currentIter)) { - activateConfig(now); - } - } - if (profiler_->isActive() && !profiler_->isCollectingMemorySnapshot()) { - auto now = system_clock::now(); - auto next_wakeup_time = now + Config::kControllerIntervalMsecs; - profiler_->performRunLoopStep(now, next_wakeup_time, currentIter); - } -} - -// This function should only be called when holding the configLock_. -void ActivityProfilerController::activateConfig( - std::chrono::time_point now) { - logger_ = makeLogger(*asyncRequestConfig_); - profiler_->setLogger(logger_.get()); - LOGGER_OBSERVER_SET_TRIGGER_ON_DEMAND(); - profiler_->configure(*asyncRequestConfig_, now); - asyncRequestConfig_ = nullptr; -} - -void ActivityProfilerController::scheduleTrace(const Config& config) { - VLOG(1) << "scheduleTrace"; - if (profiler_->isActive()) { - LOG(WARNING) << "Ignored request - profiler busy"; - return; - } - - int64_t currentIter = iterationCount_; - std::unique_ptr configToSchedule; - - if (config.hasProfileStartIteration() && currentIter < 0) { - // Special case: daemon config with activitiesDuration set - if (config.activitiesDuration().count() > 0) { - LOG(INFO) << "Config with duration-based profiling, " - << "ignoring iteration count requirement"; - // Continue with modified config - clone and set profileStartIteration to - // -1 - configToSchedule = config.clone(); - configToSchedule->setProfileStartIteration(-1); - } else { - LOG(WARNING) << "Ignored profile iteration count based request as " - << "application is not updating iteration count"; - return; - } - } else { - configToSchedule = config.clone(); - } - - // Common scheduling logic - bool newConfigScheduled = false; - if (!asyncRequestConfig_) { - std::lock_guard lock(asyncConfigLock_); - if (!asyncRequestConfig_) { - asyncRequestConfig_ = std::move(configToSchedule); - newConfigScheduled = true; - } - } - if (!newConfigScheduled) { - LOG(WARNING) << "Ignored request - another profile request is pending."; - return; - } - - // start a profilerLoop() thread to handle request - - if (config.memoryProfilerEnabled()) { - auto thread_type = ThreadType::MEMORY_SNAPSHOT; - if (!profilerThreads_[thread_type]) { - profilerThreads_[thread_type] = new std::thread( - &ActivityProfilerController::memoryProfilerLoop, this); - } - } else { - auto thread_type = ThreadType::KINETO; - if (!profilerThreads_[thread_type]) { - profilerThreads_[thread_type] = - new std::thread(&ActivityProfilerController::profilerLoop, this); - } - } -} - -void ActivityProfilerController::prepareTrace(const Config& config) { - // Requests from ActivityProfilerApi have higher priority than - // requests from other sources (signal, daemon). - // Cancel any ongoing request and refuse new ones. - auto now = system_clock::now(); - syncTraceActive_ = true; - if (profiler_->isActive()) { - LOG(WARNING) << "Cancelling current trace request in order to start " - << "higher priority synchronous request"; - if (libkineto::api().client()) { - libkineto::api().client()->stop(); - } - profiler_->stopTrace(now); - profiler_->reset(); - } - - profiler_->configure(config, now); -} - -void ActivityProfilerController::toggleCollectionDynamic(const bool enable) { - profiler_->toggleCollectionDynamic(enable); -} - -void ActivityProfilerController::startTrace() { - UST_LOGGER_MARK_COMPLETED(kWarmUpStage); - profiler_->startTrace(std::chrono::system_clock::now()); -} bool ActivityProfilerController::isActive() { - return profiler_->isActive(); + return syncHandler_->isSyncActive() || asyncHandler_->isAsyncActive(); } void ActivityProfilerController::transferCpuTrace( @@ -448,28 +179,6 @@ void ActivityProfilerController::popUserCorrelationId() { profiler_->popUserCorrelationId(); } -std::unique_ptr ActivityProfilerController:: - stopTrace() { - profiler_->stopTrace(std::chrono::system_clock::now()); - UST_LOGGER_MARK_COMPLETED(kCollectionStage); - auto logger = std::make_unique(profiler_->config()); - profiler_->processTrace(*logger); - // Will follow up with another patch for logging URLs when ActivityTrace - // is moved. - UST_LOGGER_MARK_COMPLETED(kPostProcessingStage); - - // Logger Metadata contains a map of LOGs collected in Kineto - // logger_level -> List of log lines - // This will be added into the trace as metadata. - std::unordered_map> loggerMD = - profiler_->getLoggerMetadata(); - logger->setLoggerMetadata(std::move(loggerMD)); - - profiler_->reset(); - syncTraceActive_ = false; - return std::make_unique(std::move(logger), loggerFactory()); -} - void ActivityProfilerController::addMetadata( const std::string& key, const std::string& value) { @@ -487,4 +196,47 @@ void ActivityProfilerController::logInvariantViolation( } } +// Async-only functions +bool ActivityProfilerController::canAcceptConfig() { + return !isActive(); +} +void ActivityProfilerController::acceptConfig(const Config& config) { + if (isActive()) { + LOG(WARNING) << "Ignored request - profiler busy"; + return; + } + asyncHandler_->acceptConfig(config); +} +void ActivityProfilerController::scheduleTrace(const Config& config) { + if (isActive()) { + LOG(WARNING) << "Ignored request - profiler busy"; + return; + } + asyncHandler_->scheduleTrace(config); +} +void ActivityProfilerController::step() { + asyncHandler_->step(); +} + +// Sync-only functions +void ActivityProfilerController::prepareTrace(const Config& config) { + // Sync-trace requests preempt any active trace. + asyncHandler_->cancel(); + if (syncHandler_->isSyncActive()) { + syncHandler_->cancel(); + } + + syncHandler_->prepareTrace(config); +} +void ActivityProfilerController::toggleCollectionDynamic(const bool enable) { + syncHandler_->toggleCollectionDynamic(enable); +} +void ActivityProfilerController::startTrace() { + syncHandler_->startTrace(); +} +std::unique_ptr ActivityProfilerController:: + stopTrace() { + return syncHandler_->stopTrace(); +} + } // namespace KINETO_NAMESPACE diff --git a/libkineto/src/ActivityProfilerController.h b/libkineto/src/ActivityProfilerController.h index c31c1cfcd..0de897737 100644 --- a/libkineto/src/ActivityProfilerController.h +++ b/libkineto/src/ActivityProfilerController.h @@ -12,8 +12,6 @@ #include #include #include -#include -#include #include // TODO(T90238193) @@ -21,17 +19,14 @@ #include "ActivityLoggerFactory.h" #include "ActivityProfilerInterface.h" #include "ActivityTraceInterface.h" +#include "AsyncActivityProfilerHandler.h" #include "ConfigLoader.h" #include "GenericActivityProfiler.h" #include "InvariantViolations.h" #include "LoggerCollector.h" +#include "SyncActivityProfilerHandler.h" namespace KINETO_NAMESPACE { -enum ThreadType { - KINETO = 0, - MEMORY_SNAPSHOT, - THREAD_MAX_COUNT // Number of enum entries (used for array sizing) -}; class Config; @@ -88,24 +83,18 @@ class ActivityProfilerController : public ConfigLoader::ConfigHandler { void popUserCorrelationId(); - private: - bool shouldActivateIterationConfig(int64_t currentIter); - bool shouldActivateTimestampConfig(const std::chrono::time_point& now); - void profilerLoop(); - void memoryProfilerLoop(); - void activateConfig(std::chrono::time_point now); + static std::unique_ptr makeLogger(const Config& config); - std::unique_ptr asyncRequestConfig_; - std::mutex asyncConfigLock_; + static ActivityLoggerFactory& loggerFactory(); + private: std::unique_ptr profiler_; - std::unique_ptr logger_; std::vector> loggerCollectors_; - std::thread* profilerThreads_[ThreadType::THREAD_MAX_COUNT] = {nullptr}; - std::atomic_bool stopRunloop_{false}; std::atomic_bool syncTraceActive_{false}; - std::atomic iterationCount_{-1}; ConfigLoader& configLoader_; + + std::unique_ptr syncHandler_; + std::unique_ptr asyncHandler_; }; } // namespace KINETO_NAMESPACE diff --git a/libkineto/src/AsyncActivityProfilerHandler.cpp b/libkineto/src/AsyncActivityProfilerHandler.cpp new file mode 100644 index 000000000..5067a29c8 --- /dev/null +++ b/libkineto/src/AsyncActivityProfilerHandler.cpp @@ -0,0 +1,465 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "AsyncActivityProfilerHandler.h" + +#include + +#include "ActivityProfilerController.h" +#include "Config.h" +#include "GenericActivityProfiler.h" +#include "Logger.h" +#include "ThreadUtil.h" +#include "libkineto.h" +#include "output_membuf.h" + +using namespace std::chrono; + +namespace KINETO_NAMESPACE { + +AsyncActivityProfilerHandler::AsyncActivityProfilerHandler( + GenericActivityProfiler& profiler, + std::atomic_bool& syncTraceActive) + : profiler_(profiler), syncTraceActive_(syncTraceActive) {} + +AsyncActivityProfilerHandler::~AsyncActivityProfilerHandler() { + ensureCollectTraceDone(); + for (auto profilerThread : profilerThreads_) { + if (profilerThread) { + // signaling termination of the profiler loop + stopRunloop_ = true; + profilerThread->join(); + delete profilerThread; + profilerThread = nullptr; + } + } +} + +void AsyncActivityProfilerHandler::acceptConfig(const Config& config) { + VLOG(1) << "acceptConfig"; + if (config.activityProfilerEnabled()) { + scheduleTrace(config); + } +} + +void AsyncActivityProfilerHandler::scheduleTrace(const Config& config) { + VLOG(1) << "scheduleTrace"; + + int64_t currentIter = iterationCount_; + std::unique_ptr configToSchedule; + + if (config.hasProfileStartIteration() && currentIter < 0) { + // Special case: daemon config with activitiesDuration set + if (config.activitiesDuration().count() > 0) { + LOG(INFO) << "Config with duration-based profiling, " + << "ignoring iteration count requirement"; + // Continue with modified config - clone and set profileStartIteration to + // -1 + configToSchedule = config.clone(); + configToSchedule->setProfileStartIteration(-1); + } else { + LOG(WARNING) << "Ignored profile iteration count based request as " + << "application is not updating iteration count"; + return; + } + } else { + configToSchedule = config.clone(); + } + + // Common scheduling logic + bool newConfigScheduled = false; + if (!asyncRequestConfig_) { + std::lock_guard lock(asyncConfigLock_); + if (!asyncRequestConfig_) { + asyncRequestConfig_ = std::move(configToSchedule); + newConfigScheduled = true; + } + } + if (!newConfigScheduled) { + LOG(WARNING) << "Ignored request - another profile request is pending."; + return; + } + + // start a profilerLoop() thread to handle request + + if (config.memoryProfilerEnabled()) { + auto thread_type = ThreadType::MEMORY_SNAPSHOT; + if (!profilerThreads_[thread_type]) { + profilerThreads_[thread_type] = new std::thread( + &AsyncActivityProfilerHandler::memoryProfilerLoop, this); + } + } else { + auto thread_type = ThreadType::KINETO; + if (!profilerThreads_[thread_type]) { + profilerThreads_[thread_type] = + new std::thread(&AsyncActivityProfilerHandler::profilerLoop, this); + } + } +} + +void AsyncActivityProfilerHandler::step() { + // Do not remove this copy to currentIter. Otherwise count is not + // guaranteed. + int64_t currentIter = ++iterationCount_; + VLOG(0) << "Step called , iteration = " << currentIter; + + // Perform Double-checked locking to reduce overhead of taking lock. + if (asyncRequestConfig_ && !isAsyncActive()) { + std::lock_guard lock(asyncConfigLock_); + auto now = system_clock::now(); + if (asyncRequestConfig_ && !isAsyncActive() && + shouldActivateIterationConfig(currentIter)) { + activateConfig(now); + } + } + if (isAsyncActive() && !isCollectingMemorySnapshot()) { + auto now = system_clock::now(); + auto next_wakeup_time = now + Config::kControllerIntervalMsecs; + performRunLoopStep(now, next_wakeup_time, currentIter); + } +} + +bool AsyncActivityProfilerHandler::shouldActivateTimestampConfig( + const std::chrono::time_point& now) { + if (asyncRequestConfig_->hasProfileStartIteration()) { + return false; + } + if (asyncRequestConfig_->memoryProfilerEnabled()) { + return false; + } + // Note on now + Config::kControllerIntervalMsecs: + // Profiler interval does not align perfectly up to startTime - warmup. + // Waiting until the next tick won't allow sufficient time for the + // profiler to warm up. So check if we are very close to the warmup time + // and trigger warmup. + if (now + Config::kControllerIntervalMsecs >= + (asyncRequestConfig_->requestTimestamp() - + asyncRequestConfig_->activitiesWarmupDuration())) { + LOG(INFO) + << "Received on-demand activity trace request by " + << " profile timestamp = " + << asyncRequestConfig_->requestTimestamp().time_since_epoch().count(); + return true; + } + return false; +} + +bool AsyncActivityProfilerHandler::shouldActivateIterationConfig( + int64_t currentIter) { + if (!asyncRequestConfig_->hasProfileStartIteration()) { + return false; + } + if (asyncRequestConfig_->memoryProfilerEnabled()) { + return false; + } + auto rootIter = asyncRequestConfig_->startIterationIncludingWarmup(); + // Keep waiting, it is not time to start yet. + if (currentIter < rootIter) { + return false; + } + + LOG(INFO) << "Received on-demand activity trace request by " + " profile start iteration = " + << asyncRequestConfig_->profileStartIteration() + << ", current iteration = " << currentIter; + // Re-calculate the start iter if requested iteration is in the past. + if (currentIter > rootIter) { + auto newProfileStart = + currentIter + asyncRequestConfig_->activitiesWarmupIterations(); + // Use Start Iteration Round Up if it is present. + if (asyncRequestConfig_->profileStartIterationRoundUp() > 0) { + // round up to nearest multiple + auto divisor = asyncRequestConfig_->profileStartIterationRoundUp(); + auto rem = newProfileStart % divisor; + newProfileStart += ((rem == 0) ? 0 : divisor - rem); + LOG(INFO) << "Rounding up profiler start iteration to : " + << newProfileStart; + asyncRequestConfig_->setProfileStartIteration(newProfileStart); + if (currentIter != asyncRequestConfig_->startIterationIncludingWarmup()) { + // Ex. Current 9, start 8, warmup 5, roundup 100. Resolves new start + // to 100, with warmup starting at 95. So don't start now. + return false; + } + } else { + LOG(INFO) << "Start iteration updated to : " << newProfileStart; + asyncRequestConfig_->setProfileStartIteration(newProfileStart); + } + } + return true; +} + +void AsyncActivityProfilerHandler::profilerLoop() { + setThreadName("Kineto Activity Profiler"); + VLOG(0) << "Entering activity profiler loop"; + + auto now = system_clock::now(); + auto next_wakeup_time = now + Config::kControllerIntervalMsecs; + + while (!stopRunloop_) { + now = system_clock::now(); + + while (now < next_wakeup_time) { + /* sleep override */ + std::this_thread::sleep_for(next_wakeup_time - now); + now = system_clock::now(); + } + + // Perform Double-checked locking to reduce overhead of taking lock. + if (asyncRequestConfig_ && !isAsyncActive()) { + std::lock_guard lock(asyncConfigLock_); + if (asyncRequestConfig_ && !isAsyncActive() && + shouldActivateTimestampConfig(now)) { + activateConfig(now); + } + } + + while (next_wakeup_time < now) { + next_wakeup_time += Config::kControllerIntervalMsecs; + } + + // Use syncTraceActive_ so we don't step into the loop while sync trace is + // running + if (isAsyncActive() && !isCollectingMemorySnapshot() && !syncTraceActive_) { + next_wakeup_time = performRunLoopStep(now, next_wakeup_time); + VLOG(1) << "Profiler loop: " + << duration_cast(system_clock::now() - now).count() + << "ms"; + } + } + + VLOG(0) << "Exited activity profiling loop"; +} + +void AsyncActivityProfilerHandler::memoryProfilerLoop() { + while (!stopRunloop_) { + // Perform Double-checked locking to reduce overhead of taking lock. + if (asyncRequestConfig_ && !isAsyncActive()) { + std::lock_guard lock(asyncConfigLock_); + if (asyncRequestConfig_ && !isAsyncActive() && + asyncRequestConfig_->memoryProfilerEnabled()) { + logger_ = ActivityProfilerController::makeLogger(*asyncRequestConfig_); + auto path = asyncRequestConfig_->activitiesLogFile(); + auto profile_time = asyncRequestConfig_->profileMemoryDuration(); + auto config = asyncRequestConfig_->clone(); + asyncRequestConfig_ = nullptr; + performMemoryLoop(path, profile_time, logger_.get(), *config); + } + } + } +} + +void AsyncActivityProfilerHandler::configure( + const Config& config, + std::chrono::time_point now) { + if (!profiler_.canStart(config, now)) { + return; + } + logger_ = ActivityProfilerController::makeLogger(config); + profiler_.setLogger(logger_.get()); + LOGGER_OBSERVER_SET_TRIGGER_ON_DEMAND(); + profiler_.configure(config, now); + currentRunloopState_ = RunloopState::Warmup; +} + +// This function should only be called when holding the configLock_. +void AsyncActivityProfilerHandler::activateConfig( + std::chrono::time_point now) { + configure(*asyncRequestConfig_, now); + asyncRequestConfig_ = nullptr; +} + +time_point AsyncActivityProfilerHandler::performRunLoopStep( + const time_point& now, + const time_point& nextWakeupTime, + int64_t currentIter) { + auto new_wakeup_time = nextWakeupTime; + + VLOG_IF(1, currentIter >= 0) + << "Run loop on application step(), iteration = " << currentIter; + + switch (currentRunloopState_) { + case RunloopState::CollectMemorySnapshot: + LOG(WARNING) + << "Entered CollectMemorySnapshot in Kineto Loop Step, skipping loop"; + break; + case RunloopState::WaitForRequest: + VLOG(1) << "State: WaitForRequest"; + // Nothing to do + break; + + case RunloopState::Warmup: { + VLOG(1) << "State: Warmup"; + profiler_.flushWarmupBuffers(currentIter, nextWakeupTime); + + if (profiler_.isGpuCollectionStopped()) { + profiler_.stopTraceAndReset(now); + LOG(ERROR) + << "State: Warmup stopped by GPU profiler. (Buffer size configured is " + << profiler_.activitiesMaxGpuBufferSizeMB() << "MB)"; + UST_LOGGER_MARK_COMPLETED(kWarmUpStage); + VLOG(0) << "Warmup -> WaitForRequest"; + currentRunloopState_ = RunloopState::WaitForRequest; + break; + } + + if (profiler_.isWarmupDone(now, currentIter)) { + UST_LOGGER_MARK_COMPLETED(kWarmUpStage); + if (!profiler_.isProfilingByIteration() && + (now > profiler_.profileStartTime() + milliseconds(10))) { + LOG(INFO) << "Tracing started " + << duration_cast( + now - profiler_.profileStartTime()) + .count() + << "ms late!"; + } else { + LOG(INFO) << "Tracing started"; + } + profiler_.startTrace(now); + currentRunloopState_ = RunloopState::CollectTrace; + if (libkineto::api().client()) { + libkineto::api().client()->start(); + } + if (nextWakeupTime > profiler_.profileEndTime()) { + new_wakeup_time = profiler_.profileEndTime(); + } + } else if (nextWakeupTime > profiler_.profileStartTime()) { + new_wakeup_time = profiler_.profileStartTime(); + } + break; + } + + case RunloopState::CollectTrace: { + VLOG(1) << "State: CollectTrace"; + bool collection_done = profiler_.isCollectionDone(now, currentIter); + + if (collection_done || profiler_.isGpuCollectionStopped()) { + LOG(INFO) << "Tracing complete."; + VLOG_IF(1, currentIter >= 0) + << "This state change was invoked by application's step() call"; + // currentIter >= 0 means this is called from the step() api of + // the profiler in pytorch main thread, it should be executed in + // another thread in case pytorch main thread is blocked + if (currentIter >= 0) { + // if collectTraceThread_ is already running, there's no need to + // execute collectTrace twice. + // Do not call collectTrace when profilerThread_ is collecting + // Trace. Otherwise, libkineto::api().client()->stop will be called + // twice, which leads to an unrecoverable ::c10:Error at + // disableProfiler + if (!collectTraceThread_ && !getCollectTraceState()) { + std::lock_guard guard( + collectTraceStateMutex_); + collectTraceThread_ = std::make_unique( + &AsyncActivityProfilerHandler::collectTrace, + this, + collection_done, + now); + } + break; + } + // this is executed in profilerThread_ + { + std::lock_guard guard(collectTraceStateMutex_); + isCollectingTrace_ = true; + } + collectTrace(collection_done, now); + { + std::lock_guard guard(collectTraceStateMutex_); + isCollectingTrace_ = false; + } + } else if (profiler_.isProfilingByIteration()) { + // nothing to do here + } else if ( + now < profiler_.profileEndTime() && + profiler_.profileEndTime() < nextWakeupTime) { + new_wakeup_time = profiler_.profileEndTime(); + } + break; + } + + case RunloopState::ProcessTrace: { + VLOG(1) << "State: ProcessTrace"; + // skip this state transition if it called from the step() api + // of the profiler. + // else it could lead to a race between the profiler thread and an + // application thread calling step() + if (currentIter >= 0) { + return new_wakeup_time; + } + + // Before processing, we should wait for collectTrace thread to be done. + ensureCollectTraceDone(); + + // FIXME: Probably want to allow interruption here + // for quickly handling trace request via synchronous API + profiler_.processTraceAndReset(*logger_); + UST_LOGGER_MARK_COMPLETED(kPostProcessingStage); + currentRunloopState_ = RunloopState::WaitForRequest; + VLOG(0) << "ProcessTrace -> WaitForRequest"; + break; + } + } + + return new_wakeup_time; +} + +void AsyncActivityProfilerHandler::collectTrace( + bool collection_done, + const std::chrono::time_point& now) { + profiler_.collectTrace(collection_done, now); + currentRunloopState_ = RunloopState::ProcessTrace; +} + +void AsyncActivityProfilerHandler::performMemoryLoop( + const std::string& path, + uint32_t profile_time, + ActivityLogger* logger, + Config& config) { + currentRunloopState_ = RunloopState::CollectMemorySnapshot; + if (libkineto::api().client()) { + libkineto::api().client()->start_memory_profile(); + LOG(INFO) << "Running memory profiling for " << profile_time << " ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(profile_time)); + LOG(INFO) << "Exporting memory profiling results to " << path; + libkineto::api().client()->export_memory_profile(path); + libkineto::api().client()->stop_memory_profile(); + LOG(INFO) << "Finalizing trace"; + logger->finalizeMemoryTrace(path, config); + } + currentRunloopState_ = RunloopState::WaitForRequest; +} + +bool AsyncActivityProfilerHandler::getCollectTraceState() { + std::lock_guard guard(collectTraceStateMutex_); + return isCollectingTrace_; +} + +void AsyncActivityProfilerHandler::ensureCollectTraceDone() { + if (collectTraceThread_ && collectTraceThread_->joinable()) { + collectTraceThread_->join(); + collectTraceThread_.reset(nullptr); + } +} + +void AsyncActivityProfilerHandler::cancel() { + { + std::lock_guard lock(asyncConfigLock_); + asyncRequestConfig_ = nullptr; + } + if (!isAsyncActive()) { + return; + } + ensureCollectTraceDone(); + if (libkineto::api().client()) { + libkineto::api().client()->stop(); + } + profiler_.stopTraceAndReset(std::chrono::system_clock::now()); + currentRunloopState_ = RunloopState::WaitForRequest; +} + +} // namespace KINETO_NAMESPACE diff --git a/libkineto/src/AsyncActivityProfilerHandler.h b/libkineto/src/AsyncActivityProfilerHandler.h new file mode 100644 index 000000000..26f78a99d --- /dev/null +++ b/libkineto/src/AsyncActivityProfilerHandler.h @@ -0,0 +1,103 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "ActivityLoggerFactory.h" +#include "GenericActivityProfiler.h" + +namespace KINETO_NAMESPACE { + +enum ThreadType { + KINETO = 0, + MEMORY_SNAPSHOT, + THREAD_MAX_COUNT // Number of enum entries (used for array sizing) +}; + +class Config; + +class AsyncActivityProfilerHandler { + public: + explicit AsyncActivityProfilerHandler(GenericActivityProfiler& profiler, std::atomic_bool& syncTraceActive); + + AsyncActivityProfilerHandler(const AsyncActivityProfilerHandler&) = delete; + AsyncActivityProfilerHandler& operator=(const AsyncActivityProfilerHandler&) = delete; + + ~AsyncActivityProfilerHandler(); + + void acceptConfig(const Config& config); + void scheduleTrace(const Config& config); + void step(); + + [[nodiscard]] bool isAsyncActive() const { + return currentRunloopState_ != RunloopState::WaitForRequest; + } + + [[nodiscard]] bool isCollectingMemorySnapshot() const { + return currentRunloopState_ == RunloopState::CollectMemorySnapshot; + } + + void cancel(); + + void configure(const Config& config, std::chrono::time_point now); + + // Invoke at a regular interval to perform profiling activities. + // When not active, an interval of 1-5 seconds is probably fine, + // depending on required warm-up time and delayed start time. + // When active, it's a good idea to invoke more frequently to stay below + // memory usage limit (ACTIVITIES_MAX_GPU_BUFFER_SIZE_MB) during warmup. + std::chrono::time_point performRunLoopStep( + const std::chrono::time_point& now, + const std::chrono::time_point& nextWakeupTime, + int64_t currentIter = -1); + + void ensureCollectTraceDone(); + + private: + bool shouldActivateIterationConfig(int64_t currentIter); + bool shouldActivateTimestampConfig(const std::chrono::time_point& now); + void profilerLoop(); + void memoryProfilerLoop(); + void activateConfig(std::chrono::time_point now); + + std::unique_ptr asyncRequestConfig_; + std::mutex asyncConfigLock_; + std::thread* profilerThreads_[ThreadType::THREAD_MAX_COUNT] = {nullptr}; + std::atomic_bool stopRunloop_{false}; + std::atomic iterationCount_{-1}; + + GenericActivityProfiler& profiler_; + std::atomic_bool& syncTraceActive_; + std::unique_ptr logger_; + + enum class RunloopState { + WaitForRequest, + Warmup, + CollectTrace, + ProcessTrace, + CollectMemorySnapshot, + }; + + void performMemoryLoop(const std::string& path, uint32_t profile_time, ActivityLogger* logger, Config& config); + + void collectTrace(bool collection_done, const std::chrono::time_point& now); + + bool getCollectTraceState(); + + std::atomic currentRunloopState_{RunloopState::WaitForRequest}; + std::unique_ptr collectTraceThread_{nullptr}; + std::recursive_mutex collectTraceStateMutex_; + bool isCollectingTrace_{false}; +}; +} // namespace KINETO_NAMESPACE diff --git a/libkineto/src/CuptiActivityProfiler.cpp b/libkineto/src/CuptiActivityProfiler.cpp index e9ffed026..9a2d1f0a3 100644 --- a/libkineto/src/CuptiActivityProfiler.cpp +++ b/libkineto/src/CuptiActivityProfiler.cpp @@ -134,11 +134,11 @@ void CuptiActivityProfiler::disableGpuTracing() { cupti_.disableCuptiActivities(derivedConfig_->profileActivityTypes()); } -void CuptiActivityProfiler::clearGpuActivities() { +void CuptiActivityProfiler::clearGpuActivitiesImpl() { cupti_.clearActivities(); } -bool CuptiActivityProfiler::isGpuCollectionStopped() const { +bool CuptiActivityProfiler::isGpuCollectionStoppedImpl() const { return cupti_.stopCollection; } diff --git a/libkineto/src/CuptiActivityProfiler.h b/libkineto/src/CuptiActivityProfiler.h index 344e62b6b..49d3e233b 100644 --- a/libkineto/src/CuptiActivityProfiler.h +++ b/libkineto/src/CuptiActivityProfiler.h @@ -27,8 +27,8 @@ class CuptiActivityProfiler : public GenericActivityProfiler { void setMaxGpuBufferSize(int64_t size) override; void enableGpuTracing() override; void disableGpuTracing() override; - void clearGpuActivities() override; - bool isGpuCollectionStopped() const override; + void clearGpuActivitiesImpl() override; + bool isGpuCollectionStoppedImpl() const override; void processGpuActivities(ActivityLogger& logger) override; void synchronizeGpuDevice() override; void pushCorrelationIdImpl(uint64_t id, CorrelationFlowType type) override; diff --git a/libkineto/src/GenericActivityProfiler.cpp b/libkineto/src/GenericActivityProfiler.cpp index 9e7f3458c..0e4d52bd2 100644 --- a/libkineto/src/GenericActivityProfiler.cpp +++ b/libkineto/src/GenericActivityProfiler.cpp @@ -102,23 +102,14 @@ std::ostream& operator<<( } GenericActivityProfiler::GenericActivityProfiler(bool cpuOnly) - : flushOverhead_{0, 0}, - setupOverhead_{0, 0}, - cpuOnly_{cpuOnly}, - currentRunloopState_{RunloopState::WaitForRequest} {} - -GenericActivityProfiler::~GenericActivityProfiler() { - if (collectTraceThread_ && collectTraceThread_->joinable()) { - collectTraceThread_->join(); - } -} + : flushOverhead_{0, 0}, setupOverhead_{0, 0}, cpuOnly_{cpuOnly} {} +GenericActivityProfiler::~GenericActivityProfiler() {} void GenericActivityProfiler::transferCpuTrace( std::unique_ptr cpuTrace) { std::lock_guard guard(mutex_); const string& trace_name = cpuTrace->span.name; - if (currentRunloopState_ != RunloopState::CollectTrace && - currentRunloopState_ != RunloopState::ProcessTrace) { + if (!acceptCpuTraces_) { VLOG(0) << "Trace collection not in progress - discarding span " << trace_name; return; @@ -470,10 +461,6 @@ void GenericActivityProfiler::configure( const Config& config, const time_point& now) { std::lock_guard guard(mutex_); - if (isActive()) { - LOG(WARNING) << "GenericActivityProfiler already busy, terminating"; - return; - } ApproximateClockToUnixTimeConverter clockConverter; get_time_converter() = clockConverter.makeConverter(); @@ -491,11 +478,6 @@ void GenericActivityProfiler::configure( derivedConfig_.reset(); derivedConfig_ = std::make_unique(*config_); - // Check if now is a valid time to start. - if (!derivedConfig_->canStart(now)) { - return; - } - if (LOG_IS_ON(INFO)) { config_->printActivityProfilerConfig(LIBKINETO_DBG_STREAM); } @@ -573,18 +555,33 @@ void GenericActivityProfiler::configure( traceBuffers_ = std::make_unique(); captureWindowStartTime_ = captureWindowEndTime_ = 0; - currentRunloopState_ = RunloopState::Warmup; + acceptCpuTraces_ = false; } -bool GenericActivityProfiler::getCollectTraceState() { - std::lock_guard guard(collectTraceStateMutex_); - return isCollectingTrace; -} +void GenericActivityProfiler::flushWarmupBuffers( + int64_t currentIter, + const time_point& nextWakeupTime) { + if (cpuOnly_) { + return; + } + // Flushing GPU activities can take a while so avoid doing it close to + // the start time. Clear during warmup in the following cases: + // 1. Iteration-based flow: called from application step() API + // (currentIter >= 0) with iteration profiling enabled + // 2. Timestamp-based flow: called from periodic runloop + // (currentIter < 0) when not close to profile start time + // 3. Iteration config with periodic runloop: always safe to clear + const bool isIterationBasedFlow = + derivedConfig_->isProfilingByIteration() && currentIter >= 0; + const bool isTimestampBasedFlowSafeToFlush = + !derivedConfig_->isProfilingByIteration() && currentIter < 0 && + nextWakeupTime < derivedConfig_->profileStartTime(); + const bool isIterationConfigWithPeriodicRunloop = + derivedConfig_->isProfilingByIteration() && currentIter < 0; -void GenericActivityProfiler::ensureCollectTraceDone() { - if (collectTraceThread_ && collectTraceThread_->joinable()) { - collectTraceThread_->join(); - collectTraceThread_.reset(nullptr); + if (isIterationBasedFlow || isTimestampBasedFlowSafeToFlush || + isIterationConfigWithPeriodicRunloop) { + clearGpuActivitiesImpl(); } } @@ -604,12 +601,12 @@ void GenericActivityProfiler::toggleCollectionDynamic(const bool enable) { void GenericActivityProfiler::startTraceInternal( const time_point& now) { captureWindowStartTime_ = libkineto::timeSinceEpoch(now); + acceptCpuTraces_ = true; VLOG(0) << "Warmup -> CollectTrace"; for (auto& session : sessions_) { LOG(INFO) << "Starting child profiler session"; session->start(); } - currentRunloopState_ = RunloopState::CollectTrace; } void GenericActivityProfiler::stopTraceInternal( @@ -629,23 +626,15 @@ void GenericActivityProfiler::stopTraceInternal( } } - if (currentRunloopState_ == RunloopState::CollectTrace) { - VLOG(0) << "CollectTrace -> ProcessTrace"; - } else { - LOG(WARNING) << "Called stopTrace with state == " - << static_cast>( - currentRunloopState_.load()); - } for (auto& session : sessions_) { LOG(INFO) << "Stopping child profiler session"; session->stop(); } - currentRunloopState_ = RunloopState::ProcessTrace; } void GenericActivityProfiler::resetInternal() { + acceptCpuTraces_ = false; resetTraceData(); - currentRunloopState_ = RunloopState::WaitForRequest; } void GenericActivityProfiler::finalizeTrace( @@ -787,7 +776,7 @@ void GenericActivityProfiler::popUserCorrelationId() { void GenericActivityProfiler::resetTraceData() { if (!cpuOnly_) { - clearGpuActivities(); + clearGpuActivitiesImpl(); onResetTraceData(); } activityMap_.clear(); @@ -808,191 +797,6 @@ void GenericActivityProfiler::resetTraceData() { #endif // !USE_GOOGLE_LOG } -// On-demand only code follows. -// -// TODO: Decide if we should refactor this into either the controller -// or its own class. -time_point GenericActivityProfiler::performRunLoopStep( - const time_point& now, - const time_point& nextWakeupTime, - int64_t currentIter) { - auto new_wakeup_time = nextWakeupTime; - bool warmup_done = false; - bool collection_done = false; - - VLOG_IF(1, currentIter >= 0) - << "Run loop on application step(), iteration = " << currentIter; - - switch (currentRunloopState_) { - case RunloopState::CollectMemorySnapshot: - LOG(WARNING) - << "Entered CollectMemorySnapshot in Kineto Loop Step, skipping loop"; - break; - case RunloopState::WaitForRequest: - VLOG(1) << "State: WaitForRequest"; - // Nothing to do - break; - - case RunloopState::Warmup: - VLOG(1) << "State: Warmup"; - warmup_done = derivedConfig_->isWarmupDone(now, currentIter); - { - // Flushing GPU activities can take a while so avoid doing it close to - // the start time. Clear during warmup in the following cases: - // 1. Iteration-based flow: called from application step() API - // (currentIter >= 0) with iteration profiling enabled - // 2. Timestamp-based flow: called from periodic runloop - // (currentIter < 0) when not close to profile start time - // 3. Iteration config with periodic runloop: always safe to clear - const bool isIterationBasedFlow = - derivedConfig_->isProfilingByIteration() && currentIter >= 0; - const bool isTimestampBasedFlowSafeToFlush = - !derivedConfig_->isProfilingByIteration() && currentIter < 0 && - nextWakeupTime < derivedConfig_->profileStartTime(); - const bool isIterationConfigWithPeriodicRunloop = - derivedConfig_->isProfilingByIteration() && currentIter < 0; - - if (!cpuOnly_ && - (isIterationBasedFlow || isTimestampBasedFlowSafeToFlush || - isIterationConfigWithPeriodicRunloop)) { - clearGpuActivities(); - } - } - - if (isGpuCollectionStopped()) { - // Go to process trace to clear any outstanding buffers etc - std::lock_guard guard(mutex_); - stopTraceInternal(now); - resetInternal(); - LOG(ERROR) - << "State: Warmup stopped by GPU profiler. (Buffer size configured is " - << config_->activitiesMaxGpuBufferSize() / 1024 / 1024 << "MB)"; - UST_LOGGER_MARK_COMPLETED(kWarmUpStage); - VLOG(0) << "Warmup -> WaitForRequest"; - break; - } - - if (warmup_done) { - UST_LOGGER_MARK_COMPLETED(kWarmUpStage); - if (!derivedConfig_->isProfilingByIteration() && - (now > derivedConfig_->profileStartTime() + milliseconds(10))) { - LOG(INFO) << "Tracing started " - << duration_cast( - now - derivedConfig_->profileStartTime()) - .count() - << "ms late!"; - } else { - LOG(INFO) << "Tracing started"; - } - startTrace(now); - if (libkineto::api().client()) { - libkineto::api().client()->start(); - } - if (nextWakeupTime > derivedConfig_->profileEndTime()) { - new_wakeup_time = derivedConfig_->profileEndTime(); - } - } else if (nextWakeupTime > derivedConfig_->profileStartTime()) { - new_wakeup_time = derivedConfig_->profileStartTime(); - } - - break; - - case RunloopState::CollectTrace: - VLOG(1) << "State: CollectTrace"; - collection_done = derivedConfig_->isCollectionDone(now, currentIter); - - if (collection_done || isGpuCollectionStopped()) { - // Update runloop state first to prevent further updates to shared - // state - LOG(INFO) << "Tracing complete."; - VLOG_IF(1, currentIter >= 0) - << "This state change was invoked by application's step() call"; - - // currentIter >= 0 means this is called from the step() api of - // the profile in pytorch main thread, it should be executed in - // another thread in case pytorch main thread is blocked - if (currentIter >= 0) { - // if collectTraceThread_ is already running, there's no need to - // execute collectTrace twice. - // Do not call collectTrace when profilerThread_ is collecting - // Trace. Otherwise, libkineto::api().client()->stop will be called - // twice, which leads to an unrecoverable ::c10:Error at - // disableProfiler - if (!collectTraceThread_ && !getCollectTraceState()) { - std::lock_guard guard(mutex_); - collectTraceThread_ = std::make_unique( - &GenericActivityProfiler::collectTrace, - this, - collection_done, - now); - } - break; - } - // this is executed in profilerThread_ - { - std::lock_guard guard(collectTraceStateMutex_); - isCollectingTrace = true; - } - collectTrace(collection_done, now); - { - std::lock_guard guard(collectTraceStateMutex_); - isCollectingTrace = false; - } - } else if (derivedConfig_->isProfilingByIteration()) { - // nothing to do here - } else if ( - now < derivedConfig_->profileEndTime() && - derivedConfig_->profileEndTime() < nextWakeupTime) { - new_wakeup_time = derivedConfig_->profileEndTime(); - } - - break; - - case RunloopState::ProcessTrace: - VLOG(1) << "State: ProcessTrace"; - // skip this state transition if it called from the step() api - // of the profiler. - // else it could lead to a race between the profiler thread and an - // application thread calling step() - if (currentIter >= 0) { - return new_wakeup_time; - } - - // Before processing, we should wait for collectTrace thread to be done. - ensureCollectTraceDone(); - - // FIXME: Probably want to allow interruption here - // for quickly handling trace request via synchronous API - std::lock_guard guard(mutex_); - processTraceInternal(*logger_); - UST_LOGGER_MARK_COMPLETED(kPostProcessingStage); - resetInternal(); - VLOG(0) << "ProcessTrace -> WaitForRequest"; - break; - } - - return new_wakeup_time; -} - -void GenericActivityProfiler::performMemoryLoop( - const string& path, - uint32_t profile_time, - ActivityLogger* logger, - Config& config) { - currentRunloopState_ = RunloopState::CollectMemorySnapshot; - if (libkineto::api().client()) { - libkineto::api().client()->start_memory_profile(); - LOG(INFO) << "Running memory profiling for " << profile_time << " ms"; - std::this_thread::sleep_for(std::chrono::milliseconds(profile_time)); - LOG(INFO) << "Exporting memory profiling results to " << path; - libkineto::api().client()->export_memory_profile(path); - libkineto::api().client()->stop_memory_profile(); - LOG(INFO) << "Finalizing trace"; - logger->finalizeMemoryTrace(path, config); - } - currentRunloopState_ = RunloopState::WaitForRequest; -} - void GenericActivityProfiler::collectTrace( bool collection_done, const std::chrono::time_point& now) { @@ -1000,7 +804,7 @@ void GenericActivityProfiler::collectTrace( libkineto::api().client()->stop(); } - if (isGpuCollectionStopped()) { + if (isGpuCollectionStoppedImpl()) { ecs_.gpu_stopped_early = true; LOG(ERROR) << "State: CollectTrace stopped by GPU profiler. (Buffer size configured is " diff --git a/libkineto/src/GenericActivityProfiler.h b/libkineto/src/GenericActivityProfiler.h index 7fa4d5c2e..5ca8df73c 100644 --- a/libkineto/src/GenericActivityProfiler.h +++ b/libkineto/src/GenericActivityProfiler.h @@ -113,25 +113,6 @@ class GenericActivityProfiler { GenericActivityProfiler& operator=(const GenericActivityProfiler&) = delete; virtual ~GenericActivityProfiler(); - bool isActive() const { - return currentRunloopState_ != RunloopState::WaitForRequest; - } - bool isCollectingMemorySnapshot() const { - return currentRunloopState_ == RunloopState::CollectMemorySnapshot; - } - - // Invoke at a regular interval to perform profiling activities. - // When not active, an interval of 1-5 seconds is probably fine, - // depending on required warm-up time and delayed start time. - // When active, it's a good idea to invoke more frequently to stay below - // memory usage limit (ACTIVITIES_MAX_GPU_BUFFER_SIZE_MB) during warmup. - std::chrono::time_point performRunLoopStep( - const std::chrono::time_point& now, - const std::chrono::time_point& nextWakeupTime, - int64_t currentIter = -1); - - void performMemoryLoop(const std::string& path, uint32_t profile_time, ActivityLogger* logger, Config& config); - // Collect CPU and GPU traces void collectTrace(bool collection_done, const std::chrono::time_point& now); @@ -156,7 +137,37 @@ class GenericActivityProfiler { return cpuActivityPresent_ || gpuActivityPresent_; } - // Synchronous control API + void flushWarmupBuffers(int64_t currentIter, + const std::chrono::time_point& nextWakeupTime); + + bool isWarmupDone(const std::chrono::time_point& now, int64_t currentIter) const { + return derivedConfig_->isWarmupDone(now, currentIter); + } + + bool isCollectionDone(const std::chrono::time_point& now, int64_t currentIter) const { + return derivedConfig_->isCollectionDone(now, currentIter); + } + + bool isGpuCollectionStopped() const { + return isGpuCollectionStoppedImpl(); + } + + bool isProfilingByIteration() const { + return derivedConfig_->isProfilingByIteration(); + } + + std::chrono::time_point profileStartTime() const { + return derivedConfig_->profileStartTime(); + } + + std::chrono::time_point profileEndTime() const { + return derivedConfig_->profileEndTime(); + } + + int activitiesMaxGpuBufferSizeMB() const { + return config_ ? config_->activitiesMaxGpuBufferSize() / 1024 / 1024 : 0; + } + void startTrace(const std::chrono::time_point& now) { std::lock_guard guard(mutex_); startTraceInternal(now); @@ -167,20 +178,33 @@ class GenericActivityProfiler { stopTraceInternal(now); } - // Ensure collectTrace is done - void ensureCollectTraceDone(); - // Process CPU and GPU traces + void stopTraceAndReset(const std::chrono::time_point& now) { + std::lock_guard guard(mutex_); + stopTraceInternal(now); + resetInternal(); + } + void processTrace(ActivityLogger& logger) { std::lock_guard guard(mutex_); processTraceInternal(logger); } + void processTraceAndReset(ActivityLogger& logger) { + std::lock_guard guard(mutex_); + processTraceInternal(logger); + resetInternal(); + } + void reset() { std::lock_guard guard(mutex_); resetInternal(); } - // Set up profiler as specified in config. + bool canStart(const Config& config, const std::chrono::time_point& now) const { + ConfigDerivedState derived(config); + return derived.canStart(now); + } + void configure(const Config& config, const std::chrono::time_point& now); // Toggle GPU tracing during a profile instance @@ -246,8 +270,8 @@ class GenericActivityProfiler { virtual void setMaxGpuBufferSize([[maybe_unused]] int64_t size) {} virtual void enableGpuTracing() {} virtual void disableGpuTracing() {} - virtual void clearGpuActivities() {} - virtual bool isGpuCollectionStopped() const { + virtual void clearGpuActivitiesImpl() {} + virtual bool isGpuCollectionStoppedImpl() const { return false; } virtual void processGpuActivities([[maybe_unused]] ActivityLogger& logger) {} @@ -379,8 +403,6 @@ class GenericActivityProfiler { void checkTimestampOrder(const ITraceActivity* act1); - bool getCollectTraceState(); - // On-demand Request Config (should not be modified) // TODO: remove this config_, dependency needs to be removed from // finalizeTrace. @@ -392,14 +414,6 @@ class GenericActivityProfiler { // Logger used during trace processing ActivityLogger* logger_; - enum class RunloopState { - WaitForRequest, - Warmup, - CollectTrace, - ProcessTrace, - CollectMemorySnapshot, - }; - // All recorded trace spans, both CPU and GPU // Trace Id -> list of iterations. // Using map of lists for the iterator semantics, since we are recording @@ -426,6 +440,10 @@ class GenericActivityProfiler { bool gpuOnly_{false}; bool cpuActivityPresent_{false}; bool gpuActivityPresent_{false}; + + // Gate for CPU trace ingestion. True only between startTraceInternal() + // and resetInternal() — spans arriving outside this window are discarded. + bool acceptCpuTraces_{false}; bool rangeProfilingActive_{false}; std::atomic toggleState_{true}; @@ -438,18 +456,6 @@ class GenericActivityProfiler { // Mutex to protect non-atomic access to below state std::recursive_mutex mutex_; - // Add a thread to collect both cpu and gpu traces in case torch main thread - // is blocked when profiling by iterations is enabled. Issue #953 shows - // details. - std::unique_ptr collectTraceThread_{nullptr}; - - // Add a mutex to protect state for CollectTrace - std::recursive_mutex collectTraceStateMutex_; - bool isCollectingTrace{false}; - - // Runloop phase - std::atomic currentRunloopState_{RunloopState::WaitForRequest}; - // Keep track of the start time and end time for the trace collected. // External threads using startTrace need to manually stopTrace. Part of the // mock tests. All CUDA events before this time will be removed diff --git a/libkineto/src/RocmActivityProfiler.cpp b/libkineto/src/RocmActivityProfiler.cpp index 120d5b380..1d199c295 100644 --- a/libkineto/src/RocmActivityProfiler.cpp +++ b/libkineto/src/RocmActivityProfiler.cpp @@ -113,11 +113,11 @@ void RocmActivityProfiler::disableGpuTracing() { roc_.disableActivities(derivedConfig_->profileActivityTypes()); } -void RocmActivityProfiler::clearGpuActivities() { +void RocmActivityProfiler::clearGpuActivitiesImpl() { roc_.clearActivities(); } -bool RocmActivityProfiler::isGpuCollectionStopped() const { +bool RocmActivityProfiler::isGpuCollectionStoppedImpl() const { return roc_.stopCollection; } diff --git a/libkineto/src/RocmActivityProfiler.h b/libkineto/src/RocmActivityProfiler.h index 04a65f8cf..f2425307d 100644 --- a/libkineto/src/RocmActivityProfiler.h +++ b/libkineto/src/RocmActivityProfiler.h @@ -42,8 +42,8 @@ class RocmActivityProfiler : public GenericActivityProfiler { void setMaxGpuBufferSize(int64_t size) override; void enableGpuTracing() override; void disableGpuTracing() override; - void clearGpuActivities() override; - bool isGpuCollectionStopped() const override; + void clearGpuActivitiesImpl() override; + bool isGpuCollectionStoppedImpl() const override; void processGpuActivities(ActivityLogger& logger) override; void synchronizeGpuDevice() override; void pushCorrelationIdImpl(uint64_t id, CorrelationFlowType type) override; diff --git a/libkineto/src/SyncActivityProfilerHandler.cpp b/libkineto/src/SyncActivityProfilerHandler.cpp new file mode 100644 index 000000000..2a0a2f794 --- /dev/null +++ b/libkineto/src/SyncActivityProfilerHandler.cpp @@ -0,0 +1,85 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "SyncActivityProfilerHandler.h" + +#include + +#include "ActivityProfilerController.h" +#include "ActivityTrace.h" +#include "Config.h" +#include "GenericActivityProfiler.h" +#include "Logger.h" +#include "libkineto.h" +#include "output_membuf.h" + +using namespace std::chrono; + +namespace KINETO_NAMESPACE { + +SyncActivityProfilerHandler::SyncActivityProfilerHandler( + GenericActivityProfiler& profiler, + std::atomic_bool& syncTraceActive) + : profiler_(profiler), syncTraceActive_(syncTraceActive) {} + +void SyncActivityProfilerHandler::prepareTrace(const Config& config) { + auto now = std::chrono::system_clock::now(); + if (!profiler_.canStart(config, now)) { + return; + } + profiler_.configure(config, now); + syncTraceActive_ = true; + active_ = true; +} + +void SyncActivityProfilerHandler::startTrace() { + UST_LOGGER_MARK_COMPLETED(kWarmUpStage); + profiler_.startTrace(std::chrono::system_clock::now()); +} + +std::unique_ptr SyncActivityProfilerHandler:: + stopTrace() { + profiler_.stopTrace(std::chrono::system_clock::now()); + UST_LOGGER_MARK_COMPLETED(kCollectionStage); + auto logger = std::make_unique(profiler_.config()); + profiler_.processTrace(*logger); + // Will follow up with another patch for logging URLs when ActivityTrace + // is moved. + UST_LOGGER_MARK_COMPLETED(kPostProcessingStage); + + // Logger Metadata contains a map of LOGs collected in Kineto + // logger_level -> List of log lines + // This will be added into the trace as metadata. + std::unordered_map> loggerMD = + profiler_.getLoggerMetadata(); + logger->setLoggerMetadata(std::move(loggerMD)); + + profiler_.reset(); + syncTraceActive_ = false; + active_ = false; + return std::make_unique( + std::move(logger), ActivityProfilerController::loggerFactory()); +} + +void SyncActivityProfilerHandler::cancel() { + if (!active_) { + return; + } + if (libkineto::api().client()) { + libkineto::api().client()->stop(); + } + profiler_.stopTraceAndReset(std::chrono::system_clock::now()); + syncTraceActive_ = false; + active_ = false; +} + +void SyncActivityProfilerHandler::toggleCollectionDynamic(const bool enable) { + profiler_.toggleCollectionDynamic(enable); +} + +} // namespace KINETO_NAMESPACE diff --git a/libkineto/src/SyncActivityProfilerHandler.h b/libkineto/src/SyncActivityProfilerHandler.h new file mode 100644 index 000000000..b78c51295 --- /dev/null +++ b/libkineto/src/SyncActivityProfilerHandler.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include + +#include "ActivityTraceInterface.h" +#include "GenericActivityProfiler.h" + +namespace KINETO_NAMESPACE { + +class Config; + +class SyncActivityProfilerHandler { + public: + explicit SyncActivityProfilerHandler(GenericActivityProfiler& profiler, std::atomic_bool& syncTraceActive); + + SyncActivityProfilerHandler(const SyncActivityProfilerHandler&) = delete; + SyncActivityProfilerHandler& operator=(const SyncActivityProfilerHandler&) = delete; + + ~SyncActivityProfilerHandler() = default; + + void prepareTrace(const Config& config); + void toggleCollectionDynamic(const bool enable); + void startTrace(); + std::unique_ptr stopTrace(); + void cancel(); + + bool isSyncActive() const { + return active_; + } + + private: + GenericActivityProfiler& profiler_; + std::atomic_bool& syncTraceActive_; + std::atomic active_{false}; +}; +} // namespace KINETO_NAMESPACE diff --git a/libkineto/test/AsyncActivityProfilerHandlerTest.cpp b/libkineto/test/AsyncActivityProfilerHandlerTest.cpp new file mode 100644 index 000000000..5beb2e2d3 --- /dev/null +++ b/libkineto/test/AsyncActivityProfilerHandlerTest.cpp @@ -0,0 +1,422 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include + +#include + +#ifdef __linux__ +#include +#include +#include +#endif + +#include "include/Config.h" +#include "src/AsyncActivityProfilerHandler.h" +#include "src/GenericActivityProfiler.h" +#include "src/SyncActivityProfilerHandler.h" + +#include "src/Logger.h" + +using namespace std::chrono; +using namespace KINETO_NAMESPACE; + +// Subclass that lets us control isGpuCollectionStopped() for testing +// the buffer-overflow-during-warmup path without needing real CUPTI. +class MockGpuProfiler : public GenericActivityProfiler { + public: + MockGpuProfiler() : GenericActivityProfiler(/*cpuOnly=*/false) {} + + void setGpuCollectionStopped(bool stopped) { + gpuStopped_ = stopped; + } + + protected: + bool isGpuCollectionStoppedImpl() const override { + return gpuStopped_; + } + + private: + bool gpuStopped_{false}; +}; + +static std::string logUrlToPath(const std::string& url) { + const std::string prefix = "file://"; + if (url.substr(0, prefix.size()) == prefix) { + return url.substr(prefix.size()); + } + return url; +} + +static void checkTracefile(const char* filename) { +#ifdef __linux__ + int fd = open(filename, O_RDONLY); + if (!fd) { + perror(filename); + } + EXPECT_TRUE(fd); + struct stat buf{}; + fstat(fd, &buf); + EXPECT_GT(buf.st_size, 100); + close(fd); +#endif +} + +TEST(AsyncActivityProfilerHandler, AsyncTrace) { + std::vector log_modules( + {"CuptiActivityProfiler.cpp", "output_json.cpp"}); + SET_LOG_VERBOSITY_LEVEL(1, log_modules); + + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + mkstemps(filename, 5); + + Config cfg; + + int iter = 0; + int warmup = 5; + auto now = system_clock::now(); + auto startTime = now + seconds(10); + + bool success = cfg.parse( + fmt::format( + R"CFG( + ACTIVITIES_WARMUP_PERIOD_SECS = {} + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + PROFILE_START_TIME = {} + )CFG", + warmup, + filename, + duration_cast(startTime.time_since_epoch()).count())); + + EXPECT_TRUE(success); + EXPECT_FALSE(handler.isAsyncActive()); + + handler.configure(cfg, now); + + EXPECT_TRUE(handler.isAsyncActive()); + + // fast forward in time and we have reached the startTime + now = startTime; + + // Warmup + handler.performRunLoopStep(now, now); + + auto next = now + milliseconds(1000); + + // Iteration-based steps should have no effect on timestamp-based config + while (++iter < 20) { + handler.performRunLoopStep(now, now, iter); + } + + // Terminate collection + handler.performRunLoopStep(next, next); + + EXPECT_TRUE(handler.isAsyncActive()); + + auto nextnext = next + milliseconds(1000); + + while (++iter < 40) { + handler.performRunLoopStep(next, next, iter); + } + + EXPECT_TRUE(handler.isAsyncActive()); + + handler.performRunLoopStep(nextnext, nextnext); + handler.performRunLoopStep(nextnext, nextnext); + + EXPECT_FALSE(handler.isAsyncActive()); + + auto logFile = logUrlToPath(cfg.activitiesLogUrl()); + checkTracefile(logFile.c_str()); +} + +TEST(AsyncActivityProfilerHandler, AsyncTraceUsingIter) { + std::vector log_modules( + {"CuptiActivityProfiler.cpp", "output_json.cpp"}); + SET_LOG_VERBOSITY_LEVEL(1, log_modules); + + auto runIterTest = [&](int start_iter, int warmup_iters, int trace_iters) { + LOG(INFO) << "Async Trace Test: start_iteration = " << start_iter + << " warmup iterations = " << warmup_iters + << " trace iterations = " << trace_iters; + + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + mkstemps(filename, 5); + + Config cfg; + + int iter = 0; + auto now = system_clock::now(); + + bool success = cfg.parse( + fmt::format( + R"CFG( + PROFILE_START_ITERATION = {} + ACTIVITIES_WARMUP_ITERATIONS={} + ACTIVITIES_ITERATIONS={} + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + )CFG", + start_iter, + warmup_iters, + trace_iters, + filename)); + + EXPECT_TRUE(success); + EXPECT_FALSE(handler.isAsyncActive()); + + while (iter < (start_iter - warmup_iters)) { + iter++; + } + + handler.configure(cfg, now); + EXPECT_TRUE(handler.isAsyncActive()); + + now += seconds(10); + auto next = now + milliseconds(1000); + + handler.performRunLoopStep(now, next); + EXPECT_TRUE(handler.isAsyncActive()); + + while (iter < start_iter) { + handler.performRunLoopStep(now, next, iter++); + } + + while (iter < (start_iter + trace_iters)) { + handler.performRunLoopStep(now, next, iter++); + } + + if (iter >= (start_iter + trace_iters)) { + handler.performRunLoopStep(now, next, iter++); + } + EXPECT_TRUE(handler.isAsyncActive()); + + auto nextnext = next + milliseconds(1000); + handler.performRunLoopStep(nextnext, nextnext); + handler.ensureCollectTraceDone(); + handler.performRunLoopStep(nextnext, nextnext); + + EXPECT_FALSE(handler.isAsyncActive()); + + auto logFile = logUrlToPath(cfg.activitiesLogUrl()); + checkTracefile(logFile.c_str()); + }; + + runIterTest(50, 5, 10); + runIterTest(0, 0, 2); + runIterTest(0, 5, 5); +} + +TEST(AsyncActivityProfilerHandler, MetadataJsonFormatingTest) { + std::vector log_modules( + {"CuptiActivityProfiler.cpp", "output_json.cpp"}); + SET_LOG_VERBOSITY_LEVEL(1, log_modules); + + setenv("PT_PROFILER_JOB_NAME", "test_training_job", 1); + setenv("PT_PROFILER_JOB_VERSION", "2", 1); + setenv("PT_PROFILER_JOB_ATTEMPT_INDEX", "5", 1); + + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + mkstemps(filename, 5); + + Config cfg; + + auto now = system_clock::now(); + auto startTime = now + seconds(2); + + bool success = cfg.parse( + fmt::format( + R"CFG( + ACTIVITIES_WARMUP_PERIOD_SECS = 1 + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + PROFILE_START_TIME = {} + )CFG", + filename, + duration_cast(startTime.time_since_epoch()).count())); + + EXPECT_TRUE(success); + EXPECT_FALSE(handler.isAsyncActive()); + + handler.configure(cfg, now); + + EXPECT_TRUE(handler.isAsyncActive()); + + std::string keyPrefix = "TEST_METADATA_"; + profiler.addMetadata(keyPrefix + "NORMAL", "\"metadata value\""); + profiler.addMetadata(keyPrefix + "NEWLINE", "\"metadata \nvalue\""); + profiler.addMetadata(keyPrefix + "BACKSLASH", R"("/test/metadata\path")"); + + auto next = startTime + milliseconds(1000); + auto after = next + milliseconds(1000); + + handler.performRunLoopStep(startTime, startTime); + EXPECT_TRUE(handler.isAsyncActive()); + + handler.performRunLoopStep(next, next); + EXPECT_TRUE(handler.isAsyncActive()); + + handler.performRunLoopStep(after, after); + EXPECT_FALSE(handler.isAsyncActive()); + +#ifdef __linux__ + auto logFile = logUrlToPath(cfg.activitiesLogUrl()); + std::ifstream file(logFile); + if (!file.is_open()) { + throw std::runtime_error("Failed to open the trace JSON file."); + } + std::string jsonStr( + (std::istreambuf_iterator(file)), std::istreambuf_iterator()); + folly::dynamic jsonData = folly::parseJson(jsonStr); + + auto countSubstrings = [](const std::string& source, + const std::string& substring) { + size_t count = 0; + size_t pos = source.find(substring); + while (pos != std::string::npos) { + ++count; + pos = source.find(substring, pos + substring.length()); + } + return count; + }; + + EXPECT_EQ(3, countSubstrings(jsonStr, keyPrefix)); + EXPECT_EQ(2, countSubstrings(jsonStr, "metadata value")); + EXPECT_EQ(1, countSubstrings(jsonStr, "/test/metadata/path")); + + EXPECT_EQ(jsonData["PT_PROFILER_JOB_NAME"].asString(), "test_training_job"); + EXPECT_EQ(jsonData["PT_PROFILER_JOB_VERSION"].asString(), "2"); + EXPECT_EQ(jsonData["PT_PROFILER_JOB_ATTEMPT_INDEX"].asString(), "5"); +#endif + + unsetenv("PT_PROFILER_JOB_NAME"); + unsetenv("PT_PROFILER_JOB_VERSION"); + unsetenv("PT_PROFILER_JOB_ATTEMPT_INDEX"); +} + +TEST(AsyncActivityProfilerHandler, Cancel) { + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + // Cancel when inactive is a no-op + EXPECT_FALSE(handler.isAsyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isAsyncActive()); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + mkstemps(filename, 5); + + Config cfg; + auto now = system_clock::now(); + auto startTime = now + seconds(10); + + bool success = cfg.parse( + fmt::format( + R"CFG( + ACTIVITIES_WARMUP_PERIOD_SECS = 5 + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + PROFILE_START_TIME = {} + )CFG", + filename, + duration_cast(startTime.time_since_epoch()).count())); + EXPECT_TRUE(success); + + // Cancel during Warmup + handler.configure(cfg, now); + EXPECT_TRUE(handler.isAsyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isAsyncActive()); + + // Stays inactive on subsequent steps + handler.performRunLoopStep(startTime, startTime); + EXPECT_FALSE(handler.isAsyncActive()); + + // Cancel during CollectTrace + handler.configure(cfg, now); + EXPECT_TRUE(handler.isAsyncActive()); + now = startTime; + handler.performRunLoopStep(now, now); + EXPECT_TRUE(handler.isAsyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isAsyncActive()); +} + +TEST(AsyncActivityProfilerHandler, BufferSizeLimitDuringWarmup) { + MockGpuProfiler profiler; + std::atomic_bool syncTraceActive{false}; + AsyncActivityProfilerHandler handler(profiler, syncTraceActive); + + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + mkstemps(filename, 5); + + Config cfg; + auto now = system_clock::now(); + auto startTime = now + seconds(10); + + bool success = cfg.parse( + fmt::format( + R"CFG( + ACTIVITIES_WARMUP_PERIOD_SECS = 5 + ACTIVITIES_DURATION_SECS = 1 + ACTIVITIES_LOG_FILE = {} + PROFILE_START_TIME = {} + ACTIVITIES_MAX_GPU_BUFFER_SIZE_MB = 3 + )CFG", + filename, + duration_cast(startTime.time_since_epoch()).count())); + EXPECT_TRUE(success); + + handler.configure(cfg, now); + EXPECT_TRUE(handler.isAsyncActive()); + + // Simulate GPU buffer overflow + profiler.setGpuCollectionStopped(true); + + // During warmup, the handler should detect GPU collection stopped + // and transition back to WaitForRequest + now = startTime; + handler.performRunLoopStep(now, now); + EXPECT_FALSE(handler.isAsyncActive()); +} + +TEST(SyncActivityProfilerHandler, Cancel) { + GenericActivityProfiler profiler(/*cpu only*/ true); + std::atomic_bool syncTraceActive{false}; + SyncActivityProfilerHandler handler(profiler, syncTraceActive); + + // Cancel when inactive is a no-op + EXPECT_FALSE(handler.isSyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isSyncActive()); + + // Cancel after prepareTrace + Config cfg; + cfg.validate(system_clock::now()); + handler.prepareTrace(cfg); + EXPECT_TRUE(handler.isSyncActive()); + handler.cancel(); + EXPECT_FALSE(handler.isSyncActive()); +} diff --git a/libkineto/test/CuptiActivityProfilerTest.cpp b/libkineto/test/CuptiActivityProfilerTest.cpp index de46ea585..952b83cae 100644 --- a/libkineto/test/CuptiActivityProfilerTest.cpp +++ b/libkineto/test/CuptiActivityProfilerTest.cpp @@ -302,205 +302,6 @@ class CuptiActivityProfilerTest : public ::testing::Test { ActivityLoggerFactory loggerFactory; }; -void checkTracefile(const char* filename) { -#ifdef __linux__ - // Check that the expected file was written and that it has some content - int fd = open(filename, O_RDONLY); - if (!fd) { - perror(filename); - } - EXPECT_TRUE(fd); - // Should expect at least 100 bytes - struct stat buf{}; - fstat(fd, &buf); - EXPECT_GT(buf.st_size, 100); - close(fd); -#endif -} - -TEST(CuptiActivityProfiler, AsyncTrace) { - std::vector log_modules( - {"CuptiActivityProfiler.cpp", "output_json.cpp"}); - SET_LOG_VERBOSITY_LEVEL(1, log_modules); - - MockCuptiActivities activities; - CuptiActivityProfiler profiler(activities, /*cpu only*/ true); - - char filename[] = "/tmp/libkineto_testXXXXXX.json"; - mkstemps(filename, 5); - - Config cfg; - - int iter = 0; - int warmup = 5; - auto now = system_clock::now(); - auto startTime = now + seconds(10); - - bool success = cfg.parse( - fmt::format( - R"CFG( - ACTIVITIES_WARMUP_PERIOD_SECS = {} - ACTIVITIES_DURATION_SECS = 1 - ACTIVITIES_LOG_FILE = {} - PROFILE_START_TIME = {} - )CFG", - warmup, - filename, - duration_cast(startTime.time_since_epoch()).count())); - - EXPECT_TRUE(success); - EXPECT_FALSE(profiler.isActive()); - - auto logger = std::make_unique(cfg.activitiesLogFile()); - - // Usually configuration is done when now is startTime - warmup to kick off - // warmup but start right away in the test - profiler.configure(cfg, now); - profiler.setLogger(logger.get()); - - EXPECT_TRUE(profiler.isActive()); - - // fast forward in time and we have reached the startTime - now = startTime; - - // Run the profiler - // Warmup - // performRunLoopStep is usually called by the controller loop and takes - // the current time and the controller's next wakeup time. - profiler.performRunLoopStep( - /* Current time */ now, /* Next wakeup time */ now); - - auto next = now + milliseconds(1000); - - // performRunLoopStep can also be called by an application thread to update - // iteration count since this config does not use iteration this should have - // no effect on the state - while (++iter < 20) { - profiler.performRunLoopStep(now, now, iter); - } - - // Runloop should now be in collect state, so start workload - // Perform another runloop step, passing in the end profile time as current. - // This should terminate collection - profiler.performRunLoopStep( - /* Current time */ next, /* Next wakeup time */ next); - // One step needed for each of the Process and Finalize phases - // Doesn't really matter what times we pass in here. - - EXPECT_TRUE(profiler.isActive()); - - auto nextnext = next + milliseconds(1000); - - while (++iter < 40) { - profiler.performRunLoopStep(next, next, iter); - } - - EXPECT_TRUE(profiler.isActive()); - - profiler.performRunLoopStep(nextnext, nextnext); - profiler.performRunLoopStep(nextnext, nextnext); - - // Assert that tracing has completed - EXPECT_FALSE(profiler.isActive()); - - checkTracefile(filename); -} - -TEST(CuptiActivityProfiler, AsyncTraceUsingIter) { - std::vector log_modules( - {"CuptiActivityProfiler.cpp", "output_json.cpp"}); - SET_LOG_VERBOSITY_LEVEL(1, log_modules); - - auto runIterTest = [&](int start_iter, int warmup_iters, int trace_iters) { - LOG(INFO) << "Async Trace Test: start_iteration = " << start_iter - << " warmup iterations = " << warmup_iters - << " trace iterations = " << trace_iters; - - MockCuptiActivities activities; - CuptiActivityProfiler profiler(activities, /*cpu only*/ true); - - char filename[] = "/tmp/libkineto_testXXXXXX.json"; - mkstemps(filename, 5); - - Config cfg; - - int iter = 0; - auto now = system_clock::now(); - - bool success = cfg.parse( - fmt::format( - R"CFG( - PROFILE_START_ITERATION = {} - ACTIVITIES_WARMUP_ITERATIONS={} - ACTIVITIES_ITERATIONS={} - ACTIVITIES_DURATION_SECS = 1 - ACTIVITIES_LOG_FILE = {} - )CFG", - start_iter, - warmup_iters, - trace_iters, - filename)); - - EXPECT_TRUE(success); - EXPECT_FALSE(profiler.isActive()); - - auto logger = std::make_unique(cfg.activitiesLogFile()); - - // Usually configuration is done when now is startIter - warmup iter to kick - // off warmup but start right away in the test - while (iter < (start_iter - warmup_iters)) { - profiler.performRunLoopStep(now, now, iter++); - } - - profiler.configure(cfg, now); - profiler.setLogger(logger.get()); - - EXPECT_TRUE(profiler.isActive()); - - // fast forward in time, mimicking what will happen in reality - now += seconds(10); - auto next = now + milliseconds(1000); - - // this call to runloop step should not be effecting the state - profiler.performRunLoopStep(now, next); - EXPECT_TRUE(profiler.isActive()); - - // start trace collection - while (iter < start_iter) { - profiler.performRunLoopStep(now, next, iter++); - } - - // Runloop should now be in collect state, so start workload - - while (iter < (start_iter + trace_iters)) { - profiler.performRunLoopStep(now, next, iter++); - } - - // One step is required for each of the Process and Finalize phases - // Doesn't really matter what times we pass in here. - if (iter >= (start_iter + trace_iters)) { - profiler.performRunLoopStep(now, next, iter++); - } - EXPECT_TRUE(profiler.isActive()); - - auto nextnext = next + milliseconds(1000); - profiler.performRunLoopStep(nextnext, nextnext); - profiler.ensureCollectTraceDone(); - profiler.performRunLoopStep(nextnext, nextnext); - - // Assert that tracing has completed - EXPECT_FALSE(profiler.isActive()); - - checkTracefile(filename); - }; - - // start iter = 50, warmup iters = 5, trace iters = 10 - runIterTest(50, 5, 10); - // should be able to start at 0 iteration - runIterTest(0, 0, 2); - runIterTest(0, 5, 5); -} - TEST_F(CuptiActivityProfilerTest, SyncTrace) { // Verbose logging is useful for debugging std::vector log_modules({"CuptiActivityProfiler.cpp"}); @@ -1037,10 +838,7 @@ TEST_F(CuptiActivityProfilerTest, SubActivityProfilers) { profiler.configure(*cfg_, start_time); profiler.startTrace(start_time); - EXPECT_TRUE(profiler.isActive()); - profiler.stopTrace(start_time + nanoseconds(duration_ns)); - EXPECT_TRUE(profiler.isActive()); char filename[] = "/tmp/libkineto_testXXXXXX.json"; mkstemps(filename, 5); @@ -1074,156 +872,6 @@ TEST_F(CuptiActivityProfilerTest, SubActivityProfilers) { EXPECT_GT(buf.st_size, 100); } -TEST_F(CuptiActivityProfilerTest, BufferSizeLimitTestWarmup) { - CuptiActivityProfiler profiler(cuptiActivities_, /*cpu only*/ false); - - auto now = system_clock::now(); - auto startTime = now + seconds(10); - - int maxBufferSizeMB = 3; - - auto startTimeEpoch = std::to_string( - duration_cast(startTime.time_since_epoch()).count()); - std::string maxBufferSizeMBStr = std::to_string(maxBufferSizeMB); - cfg_->handleOption("ACTIVITIES_MAX_GPU_BUFFER_SIZE_MB", maxBufferSizeMBStr); - cfg_->handleOption("PROFILE_START_TIME", startTimeEpoch); - - EXPECT_FALSE(profiler.isActive()); - profiler.configure(*cfg_, now); - EXPECT_TRUE(profiler.isActive()); - - for (int i = 0; i < maxBufferSizeMB; i++) { - uint8_t* buf; - size_t gpuBufferSize; - size_t maxNumRecords; - cuptiActivities_.bufferRequestedOverride( - &buf, &gpuBufferSize, &maxNumRecords); - } - - // fast forward to startTime and profiler is now running - now = startTime; - - profiler.performRunLoopStep(now, now); - - auto next = now + milliseconds(1000); - profiler.performRunLoopStep(next, next); - profiler.performRunLoopStep(next, next); - profiler.performRunLoopStep(next, next); - - EXPECT_FALSE(profiler.isActive()); -} - -TEST(CuptiActivityProfiler, MetadataJsonFormatingTest) { - // Check for Json string sanitation and env var injection - // based on AsyncTrace test - std::vector log_modules( - {"CuptiActivityProfiler.cpp", "output_json.cpp"}); - SET_LOG_VERBOSITY_LEVEL(1, log_modules); - - // Set environment variables for testing - setenv("PT_PROFILER_JOB_NAME", "test_training_job", 1); - setenv("PT_PROFILER_JOB_VERSION", "2", 1); - setenv("PT_PROFILER_JOB_ATTEMPT_INDEX", "5", 1); - - MockCuptiActivities activities; - CuptiActivityProfiler profiler(activities, /*cpu only*/ true); - - char filename[] = "/tmp/libkineto_testXXXXXX.json"; - mkstemps(filename, 5); - - Config cfg; - - auto now = system_clock::now(); - auto startTime = now + seconds(2); - - bool success = cfg.parse( - fmt::format( - R"CFG( - ACTIVITIES_WARMUP_PERIOD_SECS = 1 - ACTIVITIES_DURATION_SECS = 1 - ACTIVITIES_LOG_FILE = {} - PROFILE_START_TIME = {} - )CFG", - filename, - duration_cast(startTime.time_since_epoch()).count())); - - EXPECT_TRUE(success); - EXPECT_FALSE(profiler.isActive()); - - auto logger = std::make_unique(cfg.activitiesLogFile()); - - // Usually configuration is done when now is startTime - warmup to kick off - // warmup but start right away in the test - profiler.configure(cfg, now); - profiler.setLogger(logger.get()); - - EXPECT_TRUE(profiler.isActive()); - - // Add test metadata - std::string keyPrefix = "TEST_METADATA_"; - profiler.addMetadata(keyPrefix + "NORMAL", "\"metadata value\""); - profiler.addMetadata(keyPrefix + "NEWLINE", "\"metadata \nvalue\""); - profiler.addMetadata(keyPrefix + "BACKSLASH", R"("/test/metadata\path")"); - - // Profiling activity at start up/during active/after duration - auto next = startTime + milliseconds(1000); - auto after = next + milliseconds(1000); - - profiler.performRunLoopStep(startTime, startTime); - EXPECT_TRUE(profiler.isActive()); - - profiler.performRunLoopStep(next, next); - EXPECT_TRUE(profiler.isActive()); - - profiler.performRunLoopStep(after, after); - EXPECT_FALSE(profiler.isActive()); - -#ifdef __linux__ - // Check that the saved JSON file can be loaded and deserialized - std::ifstream file(filename); - if (!file.is_open()) { - throw std::runtime_error("Failed to open the trace JSON file."); - } - std::string jsonStr( - (std::istreambuf_iterator(file)), std::istreambuf_iterator()); - nlohmann::json jsonData = nlohmann::json::parse(jsonStr); - - auto countSubstrings = [](const std::string& source, - const std::string& substring) { - size_t count = 0; - size_t pos = source.find(substring); - while (pos != std::string::npos) { - ++count; - pos = source.find(substring, pos + substring.length()); - } - return count; - }; - - // Check if metadata has been correctly sanitized - EXPECT_EQ(3, countSubstrings(jsonStr, keyPrefix)); - EXPECT_EQ(2, countSubstrings(jsonStr, "metadata value")); - EXPECT_EQ(1, countSubstrings(jsonStr, "/test/metadata/path")); - - // Verify injected env vars are in trace metadata with correct values - EXPECT_EQ( - jsonData["PT_PROFILER_JOB_NAME"].get(), "test_training_job"); - EXPECT_EQ(jsonData["PT_PROFILER_JOB_VERSION"].get(), "2"); - EXPECT_EQ(jsonData["PT_PROFILER_JOB_ATTEMPT_INDEX"].get(), "5"); - - // Verify hostname is non-empty when present (gethostname may not be - // available in all environments, but when it succeeds the value must - // not be empty). - if (jsonData.contains("host_name")) { - EXPECT_FALSE(jsonData["host_name"].get().empty()); - } -#endif - - // Clean up environment variables - unsetenv("PT_PROFILER_JOB_NAME"); - unsetenv("PT_PROFILER_JOB_VERSION"); - unsetenv("PT_PROFILER_JOB_ATTEMPT_INDEX"); -} - TEST_F(CuptiActivityProfilerTest, JsonGPUIDSortTest) { // Set logging level for debugging purpose std::vector log_modules( diff --git a/libkineto/test/RocmActivityProfilerTest.cpp b/libkineto/test/RocmActivityProfilerTest.cpp index a5b0669be..c14477d05 100644 --- a/libkineto/test/RocmActivityProfilerTest.cpp +++ b/libkineto/test/RocmActivityProfilerTest.cpp @@ -734,10 +734,7 @@ TEST_F(RocmActivityProfilerTest, SubActivityProfilers) { profiler.configure(*cfg_, start_time); profiler.startTrace(start_time); - EXPECT_TRUE(profiler.isActive()); - profiler.stopTrace(start_time + nanoseconds(duration_ns)); - EXPECT_TRUE(profiler.isActive()); char filename[] = "/tmp/libkineto_testXXXXXX.json"; mkstemps(filename, 5);