From 555140c1b0648b54b629be3681844668f3f87f2f Mon Sep 17 00:00:00 2001 From: Darshan Sanghani Date: Fri, 15 May 2026 14:32:04 -0700 Subject: [PATCH] Fix ROCm HtoD memcpy stream attribution Summary: **Problem** AMD traces can render HtoD memcpy activity as if every copy happened on stream 0, even while the nearby GPU work is clearly using real non-zero streams. That makes the trace misleading for the main thing users are trying to understand: whether host-to-device copies overlap with active GPU work, and which stream context issued the copy. **Why** The ROCm async memory copy record can arrive without a useful queue id, so Kineto receives the copy as queue 0. At the same time, the HIP runtime stream context around the same work can still identify the real stream that issued the operation. Before this change, Kineto published the unusable queue 0 value directly, so the viewer collapsed those HtoD copies onto stream 0. **Fix** This change repairs only the safe case. The shared ROCm stream/queue helper learns an unambiguous HIP stream to non-zero ROCm queue mapping from correlated GPU activity, then uses that mapping to backfill HtoD memcpy rows that would otherwise render as stream 0. Existing non-zero memcpy queues are preserved, and ambiguous mappings stay on stream 0 instead of guessing. **Caveat [from Michael Wootton]** This is a Kineto rendering/backfill fix, not proof that an SDMA copy physically executed on the compute queue shown in the UI. Some memory copies may be serviced by SDMA without a queue id, and some may be serviced by a blit kernel. Mapping a queue-less copy onto the HIP stream's GPU queue is therefore virtual attribution: useful for making the trace readable and preserving stream-context overlap, but not a replacement for ROCm reporting an explicit copy queue or for a future dedicated pseudo-queue representation for queue-less copies. Reviewed By: scotts Differential Revision: D103952982 --- libkineto/src/RocmActivityProfiler.h | 2 + libkineto/src/RocmStreamQueue.h | 131 +++++++++++++ libkineto/src/RocprofActivityApi.cpp | 14 ++ libkineto/src/RoctracerActivityApi.cpp | 24 +++ libkineto/test/RocmActivityProfilerTest.cpp | 200 ++++++++++++++++++-- 5 files changed, 360 insertions(+), 11 deletions(-) create mode 100644 libkineto/src/RocmStreamQueue.h diff --git a/libkineto/src/RocmActivityProfiler.h b/libkineto/src/RocmActivityProfiler.h index 1021db3ab..11c59b9a9 100644 --- a/libkineto/src/RocmActivityProfiler.h +++ b/libkineto/src/RocmActivityProfiler.h @@ -12,6 +12,8 @@ // need this guard in the header file. #ifdef HAS_ROCTRACER +#include + #include "GenericActivityProfiler.h" #include "RocLogger.h" #ifndef ROCTRACER_FALLBACK diff --git a/libkineto/src/RocmStreamQueue.h b/libkineto/src/RocmStreamQueue.h new file mode 100644 index 000000000..797b10c95 --- /dev/null +++ b/libkineto/src/RocmStreamQueue.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#ifdef HAS_ROCTRACER + +#include +#include +#include +#include + +#include "RocLogger.h" + +namespace KINETO_NAMESPACE { +namespace detail { + +inline uint64_t streamIdFromHipStream(hipStream_t stream) { + return static_cast(reinterpret_cast(stream)); +} + +inline uint64_t runtimeStreamId(const rocprofBase* item) { + if (item->type == ROCTRACER_ACTIVITY_KERNEL) { + return streamIdFromHipStream(reinterpret_cast(item)->stream); + } + if (item->type == ROCTRACER_ACTIVITY_COPY) { + return streamIdFromHipStream(reinterpret_cast(item)->stream); + } + return 0; +} + +struct StreamQueueMaps { + std::unordered_map runtimeStreamByCorrelation; + std::unordered_map asyncQueueByRuntimeStream; + // Prefer preserving stream 0 over guessing when one HIP stream maps to + // multiple ROCm async queues in the same trace. + std::unordered_set ambiguousRuntimeStreams; +}; + +inline void rememberCorrelationQueue(std::unordered_map& asyncQueueByCorrelation, + std::unordered_set& ambiguousCorrelations, + uint64_t correlationId, + uint64_t queue) { + if (ambiguousCorrelations.count(correlationId) > 0) { + return; + } + const auto [queueIt, inserted] = asyncQueueByCorrelation.emplace(correlationId, queue); + if (!inserted && queueIt->second != queue) { + asyncQueueByCorrelation.erase(queueIt); + ambiguousCorrelations.insert(correlationId); + } +} + +inline void rememberQueueForRuntimeStream(StreamQueueMaps& maps, uint64_t stream, uint64_t queue) { + if (maps.ambiguousRuntimeStreams.count(stream) > 0) { + return; + } + const auto [queueIt, inserted] = maps.asyncQueueByRuntimeStream.emplace(stream, queue); + if (!inserted && queueIt->second != queue) { + maps.asyncQueueByRuntimeStream.erase(queueIt); + maps.ambiguousRuntimeStreams.insert(stream); + } +} + +inline StreamQueueMaps buildStreamQueueMaps(const std::vector& rows) { + StreamQueueMaps maps; + std::unordered_map asyncQueueByCorrelation; + std::unordered_set ambiguousCorrelations; + + for (const auto* item : rows) { + const uint64_t streamId = runtimeStreamId(item); + if (streamId != 0) { + maps.runtimeStreamByCorrelation[item->id] = streamId; + } + + if (item->type == ROCTRACER_ACTIVITY_ASYNC) { + const auto* async = reinterpret_cast(item); + if (async->queue != 0) { + rememberCorrelationQueue(asyncQueueByCorrelation, ambiguousCorrelations, async->id, async->queue); + } + } + } + + for (const auto* item : rows) { + const uint64_t streamId = runtimeStreamId(item); + if (streamId == 0 || ambiguousCorrelations.count(item->id) > 0) { + continue; + } + const auto queue = asyncQueueByCorrelation.find(item->id); + if (queue != asyncQueueByCorrelation.end()) { + rememberQueueForRuntimeStream(maps, streamId, queue->second); + } + } + + return maps; +} + +template +void backfillAsyncCopyStreams(std::vector& rows, IsAsyncCopy isAsyncCopy) { + const auto maps = buildStreamQueueMaps(rows); + if (maps.runtimeStreamByCorrelation.empty() || maps.asyncQueueByRuntimeStream.empty()) { + return; + } + for (auto* item : rows) { + if (item->type != ROCTRACER_ACTIVITY_ASYNC) { + continue; + } + auto* async = reinterpret_cast(item); + if (!isAsyncCopy(*async) || async->queue != 0) { + continue; + } + const auto stream = maps.runtimeStreamByCorrelation.find(async->id); + if (stream == maps.runtimeStreamByCorrelation.end() || maps.ambiguousRuntimeStreams.count(stream->second) > 0) { + continue; + } + const auto queue = maps.asyncQueueByRuntimeStream.find(stream->second); + if (queue != maps.asyncQueueByRuntimeStream.end()) { + async->queue = queue->second; + } + } +} + +} // namespace detail +} // namespace KINETO_NAMESPACE + +#endif // HAS_ROCTRACER diff --git a/libkineto/src/RocprofActivityApi.cpp b/libkineto/src/RocprofActivityApi.cpp index 0ae529166..aa61d5ecc 100644 --- a/libkineto/src/RocprofActivityApi.cpp +++ b/libkineto/src/RocprofActivityApi.cpp @@ -10,11 +10,15 @@ #include #include +#include #include #include +#include +#include #include "ApproximateClock.h" #include "Demangle.h" #include "Logger.h" +#include "RocmStreamQueue.h" #include "ThreadUtil.h" #include "output_base.h" @@ -22,6 +26,12 @@ using namespace std::chrono; namespace KINETO_NAMESPACE { +namespace { +bool isAsyncCopy(const rocprofAsyncRow& async) { + return async.domain == ROCPROFILER_BUFFER_TRACING_MEMORY_COPY; +} +} // namespace + RocprofActivityApi& RocprofActivityApi::singleton() { static RocprofActivityApi instance; return instance; @@ -117,6 +127,10 @@ int RocprofActivityApi::processActivities( // much better job. auto toffset = getTimeOffset(); + if (isLogged(ActivityType::GPU_MEMCPY)) { + detail::backfillAsyncCopyStreams(d->rows_, isAsyncCopy); + } + // All Runtime API Calls for (auto& item : d->rows_) { bool filtered = false; diff --git a/libkineto/src/RoctracerActivityApi.cpp b/libkineto/src/RoctracerActivityApi.cpp index 709a89dd1..575f2694a 100644 --- a/libkineto/src/RoctracerActivityApi.cpp +++ b/libkineto/src/RoctracerActivityApi.cpp @@ -10,11 +10,15 @@ #include #include +#include #include #include +#include +#include #include "ApproximateClock.h" #include "Demangle.h" #include "Logger.h" +#include "RocmStreamQueue.h" #include "ThreadUtil.h" #include "output_base.h" @@ -22,6 +26,22 @@ using namespace std::chrono; namespace KINETO_NAMESPACE { +namespace { +bool isAsyncCopy(const rocprofAsyncRow& async) { + switch (async.kind) { + case HIP_OP_COPY_KIND_DEVICE_TO_HOST_: + case HIP_OP_COPY_KIND_HOST_TO_DEVICE_: + case HIP_OP_COPY_KIND_DEVICE_TO_DEVICE_: + case HIP_OP_COPY_KIND_DEVICE_TO_HOST_2D_: + case HIP_OP_COPY_KIND_HOST_TO_DEVICE_2D_: + case HIP_OP_COPY_KIND_DEVICE_TO_DEVICE_2D_: + return true; + default: + return false; + } +} +} // namespace + RoctracerActivityApi& RoctracerActivityApi::singleton() { static RoctracerActivityApi instance; return instance; @@ -115,6 +135,10 @@ int RoctracerActivityApi::processActivities( // much better job. auto toffset = getTimeOffset(); + if (isLogged(ActivityType::GPU_MEMCPY)) { + detail::backfillAsyncCopyStreams(d->rows_, isAsyncCopy); + } + // All Runtime API Calls for (auto& item : d->rows_) { bool filtered = false; diff --git a/libkineto/test/RocmActivityProfilerTest.cpp b/libkineto/test/RocmActivityProfilerTest.cpp index 06a3f87db..c8a3975dd 100644 --- a/libkineto/test/RocmActivityProfilerTest.cpp +++ b/libkineto/test/RocmActivityProfilerTest.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include @@ -30,6 +32,7 @@ #include "include/time_since_epoch.h" #include "src/ActivityTrace.h" #include "src/RocmActivityProfiler.h" +#include "src/RocmStreamQueue.h" #ifdef ROCTRACER_FALLBACK #include "src/RoctracerActivityApi.h" @@ -88,6 +91,24 @@ void createTempTraceFile(char* filename) { ASSERT_GE(fd, 0) << "mkstemps failed for " << filename; close(fd); } + +bool isAsyncCopy(const rocprofAsyncRow& async) { +#ifdef ROCTRACER_FALLBACK + switch (async.kind) { + case HIP_OP_COPY_KIND_DEVICE_TO_HOST_: + case HIP_OP_COPY_KIND_HOST_TO_DEVICE_: + case HIP_OP_COPY_KIND_DEVICE_TO_DEVICE_: + case HIP_OP_COPY_KIND_DEVICE_TO_HOST_2D_: + case HIP_OP_COPY_KIND_HOST_TO_DEVICE_2D_: + case HIP_OP_COPY_KIND_DEVICE_TO_DEVICE_2D_: + return true; + default: + return false; + } +#else + return async.domain == ROCPROFILER_BUFFER_TRACING_MEMORY_COPY; +#endif +} } // namespace // Provides ability to easily create a test CPU-side ops @@ -133,7 +154,8 @@ struct MockRocLogger { uint32_t cid, int64_t start_ns, int64_t end_ns, - int64_t correlation) { + int64_t correlation, + uint64_t stream = 0) { rocprofKernelRow* row = new rocprofKernelRow( correlation, RUNTIME_DOMAIN, @@ -151,7 +173,7 @@ struct MockRocLogger { 0, 0, 0, - 0); + reinterpret_cast(stream)); activities_.push_back(row); } @@ -177,7 +199,9 @@ struct MockRocLogger { uint32_t cid, int64_t start_ns, int64_t end_ns, - int64_t correlation) { + int64_t correlation, + hipMemcpyKind kind = hipMemcpyHostToHost, + uint64_t stream = 0) { rocprofCopyRow* row = new rocprofCopyRow( correlation, RUNTIME_DOMAIN, @@ -189,15 +213,16 @@ struct MockRocLogger { nullptr, nullptr, 1, - hipMemcpyHostToHost, - static_cast(0)); + kind, + reinterpret_cast(stream)); activities_.push_back(row); } void addKernelActivity( int64_t start_ns, int64_t end_ns, - int64_t correlation) { + int64_t correlation, + uint64_t queue = 1) { #ifdef ROCTRACER_FALLBACK rocprofAsyncRow* row = new rocprofAsyncRow( correlation, @@ -205,7 +230,7 @@ struct MockRocLogger { HIP_OP_DISPATCH_KIND_KERNEL_, 0, 0, - 1, + queue, start_ns, end_ns, std::string("kernel")); @@ -216,7 +241,7 @@ struct MockRocLogger { 0, 0, 0, - 1, + queue, start_ns, end_ns, std::string("kernel")); @@ -227,7 +252,8 @@ struct MockRocLogger { void addMemcpyH2DActivity( int64_t start_ns, int64_t end_ns, - int64_t correlation) { + int64_t correlation, + uint64_t queue = 2) { #ifdef ROCTRACER_FALLBACK rocprofAsyncRow* row = new rocprofAsyncRow( correlation, @@ -235,7 +261,7 @@ struct MockRocLogger { HIP_OP_COPY_KIND_HOST_TO_DEVICE_, 0, 0, - 2, + queue, start_ns, end_ns, std::string()); @@ -246,7 +272,7 @@ struct MockRocLogger { 0, ROCPROFILER_MEMORY_COPY_HOST_TO_DEVICE, 0, - 2, + queue, start_ns, end_ns, std::string()); @@ -321,6 +347,7 @@ class MockRocActivities : public RocprofActivityApi { } externalCorrelations.clear(); } + detail::backfillAsyncCopyStreams(activityLogger->activities_, isAsyncCopy); for (auto& item : activityLogger->activities_) { handler(item); ++count; @@ -467,6 +494,157 @@ TEST_F(RocmActivityProfilerTest, SyncTrace) { #endif } +TEST_F( + RocmActivityProfilerTest, + HtoDMemcpyUsesRuntimeStreamWhenAsyncQueueIsZero) { + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 300; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + + auto gpuOps = std::make_unique(); + gpuOps->addRuntimeKernelActivity( + HIP_LAUNCH_KERNEL, start_time_ns + 10, start_time_ns + 15, 2, 7); + gpuOps->addKernelActivity(start_time_ns + 16, start_time_ns + 19, 2, 23); + gpuOps->addRuntimeCopyActivity( + HIP_MEMCPY, + start_time_ns + 20, + start_time_ns + 30, + 1, + hipMemcpyHostToDevice, + 7); + gpuOps->addMemcpyH2DActivity(start_time_ns + 40, start_time_ns + 50, 1, 0); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + const ITraceActivity* memcpyActivity = nullptr; + for (const auto& activity : *trace.activities()) { + if (activity->name() == "Memcpy HtoD (Host -> Device)") { + memcpyActivity = activity; + break; + } + } + + ASSERT_NE(memcpyActivity, nullptr); + EXPECT_EQ(memcpyActivity->resourceId(), 23); + +#ifdef __linux__ + char filename[] = "/tmp/libkineto_testXXXXXX.json"; + createTempTraceFile(filename); + trace.save(filename); + + std::ifstream file(filename); + ASSERT_TRUE(file.is_open()); + std::string jsonStr( + (std::istreambuf_iterator(file)), std::istreambuf_iterator()); + nlohmann::json jsonData = nlohmann::json::parse(jsonStr); + + bool foundMemcpy = false; + for (const auto& event : jsonData["traceEvents"]) { + if (event.value("name", "") == "Memcpy HtoD (Host -> Device)") { + foundMemcpy = true; + EXPECT_EQ(event["args"]["stream"].get(), 23); + } + } + EXPECT_TRUE(foundMemcpy); +#endif +} + +TEST_F(RocmActivityProfilerTest, HtoDMemcpyKeepsNonzeroAsyncQueue) { + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 300; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + + auto gpuOps = std::make_unique(); + gpuOps->addRuntimeKernelActivity( + HIP_LAUNCH_KERNEL, start_time_ns + 10, start_time_ns + 15, 2, 7); + gpuOps->addKernelActivity(start_time_ns + 16, start_time_ns + 19, 2, 23); + gpuOps->addRuntimeCopyActivity( + HIP_MEMCPY, + start_time_ns + 20, + start_time_ns + 30, + 1, + hipMemcpyHostToDevice, + 7); + gpuOps->addMemcpyH2DActivity(start_time_ns + 40, start_time_ns + 50, 1, 42); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + const ITraceActivity* memcpyActivity = nullptr; + for (const auto& activity : *trace.activities()) { + if (activity->name() == "Memcpy HtoD (Host -> Device)") { + memcpyActivity = activity; + break; + } + } + + ASSERT_NE(memcpyActivity, nullptr); + EXPECT_EQ(memcpyActivity->resourceId(), 42); +} + +TEST_F( + RocmActivityProfilerTest, + HtoDMemcpyStaysOnZeroWhenRuntimeStreamMapsToMultipleQueues) { + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 300; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + + auto gpuOps = std::make_unique(); + gpuOps->addRuntimeKernelActivity( + HIP_LAUNCH_KERNEL, start_time_ns + 10, start_time_ns + 15, 2, 7); + gpuOps->addKernelActivity(start_time_ns + 16, start_time_ns + 19, 2, 23); + gpuOps->addRuntimeKernelActivity( + HIP_LAUNCH_KERNEL, start_time_ns + 20, start_time_ns + 25, 3, 7); + gpuOps->addKernelActivity(start_time_ns + 26, start_time_ns + 29, 3, 24); + gpuOps->addRuntimeCopyActivity( + HIP_MEMCPY, + start_time_ns + 30, + start_time_ns + 40, + 1, + hipMemcpyHostToDevice, + 7); + gpuOps->addMemcpyH2DActivity(start_time_ns + 50, start_time_ns + 60, 1, 0); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + const ITraceActivity* memcpyActivity = nullptr; + for (const auto& activity : *trace.activities()) { + if (activity->name() == "Memcpy HtoD (Host -> Device)") { + memcpyActivity = activity; + break; + } + } + + ASSERT_NE(memcpyActivity, nullptr); + EXPECT_EQ(memcpyActivity->resourceId(), 0); +} + TEST_F(RocmActivityProfilerTest, GpuNCCLCollectiveTest) { // Set logging level for debugging purpose std::vector log_modules(