diff --git a/libkineto/src/RocmActivityProfiler.h b/libkineto/src/RocmActivityProfiler.h index 1021db3ab..11c59b9a9 100644 --- a/libkineto/src/RocmActivityProfiler.h +++ b/libkineto/src/RocmActivityProfiler.h @@ -12,6 +12,8 @@ // need this guard in the header file. #ifdef HAS_ROCTRACER +#include + #include "GenericActivityProfiler.h" #include "RocLogger.h" #ifndef ROCTRACER_FALLBACK diff --git a/libkineto/src/RocmStreamQueue.h b/libkineto/src/RocmStreamQueue.h new file mode 100644 index 000000000..797b10c95 --- /dev/null +++ b/libkineto/src/RocmStreamQueue.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#ifdef HAS_ROCTRACER + +#include +#include +#include +#include + +#include "RocLogger.h" + +namespace KINETO_NAMESPACE { +namespace detail { + +inline uint64_t streamIdFromHipStream(hipStream_t stream) { + return static_cast(reinterpret_cast(stream)); +} + +inline uint64_t runtimeStreamId(const rocprofBase* item) { + if (item->type == ROCTRACER_ACTIVITY_KERNEL) { + return streamIdFromHipStream(reinterpret_cast(item)->stream); + } + if (item->type == ROCTRACER_ACTIVITY_COPY) { + return streamIdFromHipStream(reinterpret_cast(item)->stream); + } + return 0; +} + +struct StreamQueueMaps { + std::unordered_map runtimeStreamByCorrelation; + std::unordered_map asyncQueueByRuntimeStream; + // Prefer preserving stream 0 over guessing when one HIP stream maps to + // multiple ROCm async queues in the same trace. + std::unordered_set ambiguousRuntimeStreams; +}; + +inline void rememberCorrelationQueue(std::unordered_map& asyncQueueByCorrelation, + std::unordered_set& ambiguousCorrelations, + uint64_t correlationId, + uint64_t queue) { + if (ambiguousCorrelations.count(correlationId) > 0) { + return; + } + const auto [queueIt, inserted] = asyncQueueByCorrelation.emplace(correlationId, queue); + if (!inserted && queueIt->second != queue) { + asyncQueueByCorrelation.erase(queueIt); + ambiguousCorrelations.insert(correlationId); + } +} + +inline void rememberQueueForRuntimeStream(StreamQueueMaps& maps, uint64_t stream, uint64_t queue) { + if (maps.ambiguousRuntimeStreams.count(stream) > 0) { + return; + } + const auto [queueIt, inserted] = maps.asyncQueueByRuntimeStream.emplace(stream, queue); + if (!inserted && queueIt->second != queue) { + maps.asyncQueueByRuntimeStream.erase(queueIt); + maps.ambiguousRuntimeStreams.insert(stream); + } +} + +inline StreamQueueMaps buildStreamQueueMaps(const std::vector& rows) { + StreamQueueMaps maps; + std::unordered_map asyncQueueByCorrelation; + std::unordered_set ambiguousCorrelations; + + for (const auto* item : rows) { + const uint64_t streamId = runtimeStreamId(item); + if (streamId != 0) { + maps.runtimeStreamByCorrelation[item->id] = streamId; + } + + if (item->type == ROCTRACER_ACTIVITY_ASYNC) { + const auto* async = reinterpret_cast(item); + if (async->queue != 0) { + rememberCorrelationQueue(asyncQueueByCorrelation, ambiguousCorrelations, async->id, async->queue); + } + } + } + + for (const auto* item : rows) { + const uint64_t streamId = runtimeStreamId(item); + if (streamId == 0 || ambiguousCorrelations.count(item->id) > 0) { + continue; + } + const auto queue = asyncQueueByCorrelation.find(item->id); + if (queue != asyncQueueByCorrelation.end()) { + rememberQueueForRuntimeStream(maps, streamId, queue->second); + } + } + + return maps; +} + +template +void backfillAsyncCopyStreams(std::vector& rows, IsAsyncCopy isAsyncCopy) { + const auto maps = buildStreamQueueMaps(rows); + if (maps.runtimeStreamByCorrelation.empty() || maps.asyncQueueByRuntimeStream.empty()) { + return; + } + for (auto* item : rows) { + if (item->type != ROCTRACER_ACTIVITY_ASYNC) { + continue; + } + auto* async = reinterpret_cast(item); + if (!isAsyncCopy(*async) || async->queue != 0) { + continue; + } + const auto stream = maps.runtimeStreamByCorrelation.find(async->id); + if (stream == maps.runtimeStreamByCorrelation.end() || maps.ambiguousRuntimeStreams.count(stream->second) > 0) { + continue; + } + const auto queue = maps.asyncQueueByRuntimeStream.find(stream->second); + if (queue != maps.asyncQueueByRuntimeStream.end()) { + async->queue = queue->second; + } + } +} + +} // namespace detail +} // namespace KINETO_NAMESPACE + +#endif // HAS_ROCTRACER diff --git a/libkineto/src/RocprofActivityApi.cpp b/libkineto/src/RocprofActivityApi.cpp index 0ae529166..aa61d5ecc 100644 --- a/libkineto/src/RocprofActivityApi.cpp +++ b/libkineto/src/RocprofActivityApi.cpp @@ -10,11 +10,15 @@ #include #include +#include #include #include +#include +#include #include "ApproximateClock.h" #include "Demangle.h" #include "Logger.h" +#include "RocmStreamQueue.h" #include "ThreadUtil.h" #include "output_base.h" @@ -22,6 +26,12 @@ using namespace std::chrono; namespace KINETO_NAMESPACE { +namespace { +bool isAsyncCopy(const rocprofAsyncRow& async) { + return async.domain == ROCPROFILER_BUFFER_TRACING_MEMORY_COPY; +} +} // namespace + RocprofActivityApi& RocprofActivityApi::singleton() { static RocprofActivityApi instance; return instance; @@ -117,6 +127,10 @@ int RocprofActivityApi::processActivities( // much better job. auto toffset = getTimeOffset(); + if (isLogged(ActivityType::GPU_MEMCPY)) { + detail::backfillAsyncCopyStreams(d->rows_, isAsyncCopy); + } + // All Runtime API Calls for (auto& item : d->rows_) { bool filtered = false; diff --git a/libkineto/src/RoctracerActivityApi.cpp b/libkineto/src/RoctracerActivityApi.cpp index 709a89dd1..575f2694a 100644 --- a/libkineto/src/RoctracerActivityApi.cpp +++ b/libkineto/src/RoctracerActivityApi.cpp @@ -10,11 +10,15 @@ #include #include +#include #include #include +#include +#include #include "ApproximateClock.h" #include "Demangle.h" #include "Logger.h" +#include "RocmStreamQueue.h" #include "ThreadUtil.h" #include "output_base.h" @@ -22,6 +26,22 @@ using namespace std::chrono; namespace KINETO_NAMESPACE { +namespace { +bool isAsyncCopy(const rocprofAsyncRow& async) { + switch (async.kind) { + case HIP_OP_COPY_KIND_DEVICE_TO_HOST_: + case HIP_OP_COPY_KIND_HOST_TO_DEVICE_: + case HIP_OP_COPY_KIND_DEVICE_TO_DEVICE_: + case HIP_OP_COPY_KIND_DEVICE_TO_HOST_2D_: + case HIP_OP_COPY_KIND_HOST_TO_DEVICE_2D_: + case HIP_OP_COPY_KIND_DEVICE_TO_DEVICE_2D_: + return true; + default: + return false; + } +} +} // namespace + RoctracerActivityApi& RoctracerActivityApi::singleton() { static RoctracerActivityApi instance; return instance; @@ -115,6 +135,10 @@ int RoctracerActivityApi::processActivities( // much better job. auto toffset = getTimeOffset(); + if (isLogged(ActivityType::GPU_MEMCPY)) { + detail::backfillAsyncCopyStreams(d->rows_, isAsyncCopy); + } + // All Runtime API Calls for (auto& item : d->rows_) { bool filtered = false; diff --git a/libkineto/test/RocmActivityProfilerTest.cpp b/libkineto/test/RocmActivityProfilerTest.cpp index 06a3f87db..c8a3975dd 100644 --- a/libkineto/test/RocmActivityProfilerTest.cpp +++ b/libkineto/test/RocmActivityProfilerTest.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include @@ -30,6 +32,7 @@ #include "include/time_since_epoch.h" #include "src/ActivityTrace.h" #include "src/RocmActivityProfiler.h" +#include "src/RocmStreamQueue.h" #ifdef ROCTRACER_FALLBACK #include "src/RoctracerActivityApi.h" @@ -88,6 +91,24 @@ void createTempTraceFile(char* filename) { ASSERT_GE(fd, 0) << "mkstemps failed for " << filename; close(fd); } + +bool isAsyncCopy(const rocprofAsyncRow& async) { +#ifdef ROCTRACER_FALLBACK + switch (async.kind) { + case HIP_OP_COPY_KIND_DEVICE_TO_HOST_: + case HIP_OP_COPY_KIND_HOST_TO_DEVICE_: + case HIP_OP_COPY_KIND_DEVICE_TO_DEVICE_: + case HIP_OP_COPY_KIND_DEVICE_TO_HOST_2D_: + case HIP_OP_COPY_KIND_HOST_TO_DEVICE_2D_: + case HIP_OP_COPY_KIND_DEVICE_TO_DEVICE_2D_: + return true; + default: + return false; + } +#else + return async.domain == ROCPROFILER_BUFFER_TRACING_MEMORY_COPY; +#endif +} } // namespace // Provides ability to easily create a test CPU-side ops @@ -133,7 +154,8 @@ struct MockRocLogger { uint32_t cid, int64_t start_ns, int64_t end_ns, - int64_t correlation) { + int64_t correlation, + uint64_t stream = 0) { rocprofKernelRow* row = new rocprofKernelRow( correlation, RUNTIME_DOMAIN, @@ -151,7 +173,7 @@ struct MockRocLogger { 0, 0, 0, - 0); + reinterpret_cast(stream)); activities_.push_back(row); } @@ -177,7 +199,9 @@ struct MockRocLogger { uint32_t cid, int64_t start_ns, int64_t end_ns, - int64_t correlation) { + int64_t correlation, + hipMemcpyKind kind = hipMemcpyHostToHost, + uint64_t stream = 0) { rocprofCopyRow* row = new rocprofCopyRow( correlation, RUNTIME_DOMAIN, @@ -189,15 +213,16 @@ struct MockRocLogger { nullptr, nullptr, 1, - hipMemcpyHostToHost, - static_cast(0)); + kind, + reinterpret_cast(stream)); activities_.push_back(row); } void addKernelActivity( int64_t start_ns, int64_t end_ns, - int64_t correlation) { + int64_t correlation, + uint64_t queue = 1) { #ifdef ROCTRACER_FALLBACK rocprofAsyncRow* row = new rocprofAsyncRow( correlation, @@ -205,7 +230,7 @@ struct MockRocLogger { HIP_OP_DISPATCH_KIND_KERNEL_, 0, 0, - 1, + queue, start_ns, end_ns, std::string("kernel")); @@ -216,7 +241,7 @@ struct MockRocLogger { 0, 0, 0, - 1, + queue, start_ns, end_ns, std::string("kernel")); @@ -227,7 +252,8 @@ struct MockRocLogger { void addMemcpyH2DActivity( int64_t start_ns, int64_t end_ns, - int64_t correlation) { + int64_t correlation, + uint64_t queue = 2) { #ifdef ROCTRACER_FALLBACK rocprofAsyncRow* row = new rocprofAsyncRow( correlation, @@ -235,7 +261,7 @@ struct MockRocLogger { HIP_OP_COPY_KIND_HOST_TO_DEVICE_, 0, 0, - 2, + queue, start_ns, end_ns, std::string()); @@ -246,7 +272,7 @@ struct MockRocLogger { 0, ROCPROFILER_MEMORY_COPY_HOST_TO_DEVICE, 0, - 2, + queue, start_ns, end_ns, std::string()); @@ -321,6 +347,7 @@ class MockRocActivities : public RocprofActivityApi { } externalCorrelations.clear(); } + detail::backfillAsyncCopyStreams(activityLogger->activities_, isAsyncCopy); for (auto& item : activityLogger->activities_) { handler(item); ++count; @@ -467,6 +494,157 @@ TEST_F(RocmActivityProfilerTest, SyncTrace) { #endif } +TEST_F( + RocmActivityProfilerTest, + HtoDMemcpyUsesRuntimeStreamWhenAsyncQueueIsZero) { + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 300; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + + auto gpuOps = std::make_unique(); + gpuOps->addRuntimeKernelActivity( + HIP_LAUNCH_KERNEL, start_time_ns + 10, start_time_ns + 15, 2, 7); + gpuOps->addKernelActivity(start_time_ns + 16, start_time_ns + 19, 2, 23); + gpuOps->addRuntimeCopyActivity( + HIP_MEMCPY, + start_time_ns + 20, + start_time_ns + 30, + 1, + hipMemcpyHostToDevice, + 7); + gpuOps->addMemcpyH2DActivity(start_time_ns + 40, start_time_ns + 50, 1, 0); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + const ITraceActivity* memcpyActivity = nullptr; + for (const auto& activity : *trace.activities()) { + if (activity->name() == "Memcpy HtoD (Host -> Device)") { + memcpyActivity = activity; + break; + } + } + + ASSERT_NE(memcpyActivity, nullptr); + EXPECT_EQ(memcpyActivity->resourceId(), 23); + +#ifdef __linux__ + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + createTempTraceFile(filename); + trace.save(filename); + + std::ifstream file(filename); + ASSERT_TRUE(file.is_open()); + std::string jsonStr( + (std::istreambuf_iterator(file)), std::istreambuf_iterator()); + nlohmann::json jsonData = nlohmann::json::parse(jsonStr); + + bool foundMemcpy = false; + for (const auto& event : jsonData["traceEvents"]) { + if (event.value("name", "") == "Memcpy HtoD (Host -> Device)") { + foundMemcpy = true; + EXPECT_EQ(event["args"]["stream"].get(), 23); + } + } + EXPECT_TRUE(foundMemcpy); +#endif +} + +TEST_F(RocmActivityProfilerTest, HtoDMemcpyKeepsNonzeroAsyncQueue) { + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 300; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + + auto gpuOps = std::make_unique(); + gpuOps->addRuntimeKernelActivity( + HIP_LAUNCH_KERNEL, start_time_ns + 10, start_time_ns + 15, 2, 7); + gpuOps->addKernelActivity(start_time_ns + 16, start_time_ns + 19, 2, 23); + gpuOps->addRuntimeCopyActivity( + HIP_MEMCPY, + start_time_ns + 20, + start_time_ns + 30, + 1, + hipMemcpyHostToDevice, + 7); + gpuOps->addMemcpyH2DActivity(start_time_ns + 40, start_time_ns + 50, 1, 42); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + const ITraceActivity* memcpyActivity = nullptr; + for (const auto& activity : *trace.activities()) { + if (activity->name() == "Memcpy HtoD (Host -> Device)") { + memcpyActivity = activity; + break; + } + } + + ASSERT_NE(memcpyActivity, nullptr); + EXPECT_EQ(memcpyActivity->resourceId(), 42); +} + +TEST_F( + RocmActivityProfilerTest, + HtoDMemcpyStaysOnZeroWhenRuntimeStreamMapsToMultipleQueues) { + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 300; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + + auto gpuOps = std::make_unique(); + gpuOps->addRuntimeKernelActivity( + HIP_LAUNCH_KERNEL, start_time_ns + 10, start_time_ns + 15, 2, 7); + gpuOps->addKernelActivity(start_time_ns + 16, start_time_ns + 19, 2, 23); + gpuOps->addRuntimeKernelActivity( + HIP_LAUNCH_KERNEL, start_time_ns + 20, start_time_ns + 25, 3, 7); + gpuOps->addKernelActivity(start_time_ns + 26, start_time_ns + 29, 3, 24); + gpuOps->addRuntimeCopyActivity( + HIP_MEMCPY, + start_time_ns + 30, + start_time_ns + 40, + 1, + hipMemcpyHostToDevice, + 7); + gpuOps->addMemcpyH2DActivity(start_time_ns + 50, start_time_ns + 60, 1, 0); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + const ITraceActivity* memcpyActivity = nullptr; + for (const auto& activity : *trace.activities()) { + if (activity->name() == "Memcpy HtoD (Host -> Device)") { + memcpyActivity = activity; + break; + } + } + + ASSERT_NE(memcpyActivity, nullptr); + EXPECT_EQ(memcpyActivity->resourceId(), 0); +} + TEST_F(RocmActivityProfilerTest, GpuNCCLCollectiveTest) { // Set logging level for debugging purpose std::vector log_modules(